
Volley Library Source Code Analysis (Part 1)

May 4, 2018

Volley Library Source Code Analysis (Part 1): http://blog.csdn.net/mba16c35/article/details/43944703

Volley Library Source Code Analysis (Part 2): http://blog.csdn.net/mba16c35/article/details/44589137

Overall Architecture

Volley is built on a small pool of dedicated threads: the main thread, a cache thread, and network threads.
There is exactly one main thread and one cache thread, while there can be several NetworkDispatcher threads, which is what lets requests be processed in parallel. The structure is shown in the following diagram:


Why does the cache module get a thread of its own? The naive approach would be to do the cache lookup on the main thread, checking the cache before issuing each request. But Volley is an extensible framework: the request issued from the main thread may be for an Image, for Json, or even for some client-defined data type, and the request-issuing code itself may be client code. If the cache lookup lived on the main thread, each of these varied call sites would have to duplicate the lookup logic. Google therefore decoupled the cache lookup into a dedicated thread.
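To make the decoupling concrete, here is a hypothetical miniature of the cache/network split in plain Java. This is not Volley code: the URLs, queue names, and string "responses" are all made up for illustration. One cache thread serves hits and forwards misses to a network queue, so no request-issuing code needs its own lookup logic.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // Pre-populated cache: /a is cached, /b is not.
        Map<String, String> cache = new ConcurrentHashMap<>();
        cache.put("http://example.com/a", "cached body of /a");

        BlockingQueue<String> cacheQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> networkQueue = new LinkedBlockingQueue<>();

        // One dedicated cache thread, in the spirit of Volley's CacheDispatcher.
        Thread cacheDispatcher = new Thread(() -> {
            try {
                while (true) {
                    String url = cacheQueue.take();
                    String hit = cache.get(url);
                    if (hit != null) {
                        System.out.println("cache hit: " + url);
                    } else {
                        networkQueue.put(url);   // miss: hand off to network threads
                        System.out.println("cache miss, forwarded: " + url);
                    }
                }
            } catch (InterruptedException ignored) { }
        });
        cacheDispatcher.setDaemon(true);
        cacheDispatcher.start();

        cacheQueue.put("http://example.com/a");  // hit
        cacheQueue.put("http://example.com/b");  // miss -> network queue
        Thread.sleep(200);                       // give the cache thread time to drain
        System.out.println("network queue size: " + networkQueue.size());
    }
}
```

Whatever kind of request the client code produces, it only has to enqueue a key; the lookup logic lives in exactly one place.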

The Core: NetworkDispatcher

Key Steps

The NetworkDispatcher thread (bottom-left in the diagram) loops through roughly these steps:

1. Take a request off the request queue:

    request = mQueue.take();

2. Perform the network request:

    NetworkResponse networkResponse = mNetwork.performRequest(request);

3. Parse the networkResponse into the expected type, such as Response<String>, Response<Json>, or Response<Bitmap>:

    Response<?> response = request.parseNetworkResponse(networkResponse);

4. Write the response to the cache:

    mCache.put(request.getCacheKey(), response.cacheEntry);

5. Post the response back to the main thread:

    mDelivery.postResponse(request, response);

Here is the main code of the NetworkDispatcher thread:

@Override
    public void run() {
        Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
        Request request;
        while (true) {
            try {
                // Take a request from the queue.
                request = mQueue.take(); // 1. Take a request off the queue; mQueue implements BlockingQueue<Request>
            } catch (InterruptedException e) {
                // We may have been interrupted because it was time to quit.
                if (mQuit) {
                    return;
                }
                continue;
            }

            try {
                request.addMarker("network-queue-take");

                // If the request was cancelled already, do not perform the
                // network request.
                if (request.isCanceled()) {
                    request.finish("network-discard-cancelled");
                    continue;
                }

                // Tag the request (if API >= 14)
                if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.ICE_CREAM_SANDWICH) {
                    TrafficStats.setThreadStatsTag(request.getTrafficStatsTag());
                }

                // Perform the network request.
                NetworkResponse networkResponse = mNetwork.performRequest(request); // 2. Perform the network request
                request.addMarker("network-http-complete");

                // If the server returned 304 AND we delivered a response already,
                // we're done -- don't deliver a second identical response.
                if (networkResponse.notModified && request.hasHadResponseDelivered()) {
                    request.finish("not-modified");
                    continue;
                }

                // Parse the response here on the worker thread.
                Response<?> response = request.parseNetworkResponse(networkResponse); // 3. Parse into the expected type
                request.addMarker("network-parse-complete");

                // Write to cache if applicable.
                // TODO: Only update cache metadata instead of entire record for 304s.
                if (request.shouldCache() && response.cacheEntry != null) {
                    mCache.put(request.getCacheKey(), response.cacheEntry); // 4. Write the response to cache
                    request.addMarker("network-cache-written");
                }

                // Post the response back.
                request.markDelivered();
                mDelivery.postResponse(request, response); // 5. Post the response back to the main thread
            } catch (VolleyError volleyError) {
                parseAndDeliverNetworkError(request, volleyError);
            } catch (Exception e) {
                VolleyLog.e(e, "Unhandled exception %s", e.toString());
                mDelivery.postError(request, new VolleyError(e));
            }
        }
    }

How Multi-threaded Downloading Is Implemented

To understand Volley's parallelism you must understand the PriorityBlockingQueue concurrency class. Notice that the NetworkDispatcher loop uses no explicit synchronization (no Lock, no synchronized blocks), because PriorityBlockingQueue's implementation is itself thread-safe. The class-to-class coupling that explicit wait()/notifyAll() would create is thereby eliminated: each class talks only to the BlockingQueue. See Thinking in Java, p. 713, on this point.

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. 
Multiple NetworkDispatcher threads can therefore take requests from the shared PriorityBlockingQueue concurrently without races, and that is how Volley downloads several images in parallel.
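The thread-safety claim is easy to demonstrate in plain Java. The sketch below is standalone, not Volley code: FakeRequest is a made-up stand-in for Volley's Request, which similarly compares by priority and then by a sequence number. Four consumer threads take() from one PriorityBlockingQueue with no explicit locking, just as multiple NetworkDispatchers share the request queue.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;

public class Main {
    // Stand-in for Volley's Request: ordered by priority, FIFO within a priority.
    static class FakeRequest implements Comparable<FakeRequest> {
        final int priority;   // lower value = more urgent
        final int sequence;   // insertion order, breaks ties
        FakeRequest(int priority, int sequence) {
            this.priority = priority;
            this.sequence = sequence;
        }
        @Override public int compareTo(FakeRequest other) {
            return priority == other.priority
                    ? Integer.compare(sequence, other.sequence)
                    : Integer.compare(priority, other.priority);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PriorityBlockingQueue<FakeRequest> queue = new PriorityBlockingQueue<>();
        ConcurrentLinkedQueue<Integer> handled = new ConcurrentLinkedQueue<>();
        int requests = 20;
        CountDownLatch done = new CountDownLatch(requests);

        // Four "NetworkDispatcher" threads consume the same queue, no explicit locking.
        for (int i = 0; i < 4; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                    try {
                        FakeRequest r = queue.take();   // blocks until a request is available
                        handled.add(r.sequence);
                        done.countDown();
                    } catch (InterruptedException e) {
                        return;                         // quit signal
                    }
                }
            });
            t.setDaemon(true);
            t.start();
        }

        // The "main thread" produces requests with mixed priorities.
        for (int s = 0; s < requests; s++) {
            queue.put(new FakeRequest(s % 3, s));
        }
        done.await();
        System.out.println("handled " + handled.size() + " requests");
    }
}
```

Every request is taken exactly once; the queue's internal lock guarantees no two consumers ever receive the same element.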

In fact, the BlockingQueue interface is designed for the producer-consumer problem. Its javadoc illustrates the pattern with one producer and two consumers:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class Producer implements Runnable {
    private final BlockingQueue<Object> queue;
    Producer(BlockingQueue<Object> q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
    }
    Object produce() { return new Object(); }
}

class Consumer implements Runnable {
    private final BlockingQueue<Object> queue;
    Consumer(BlockingQueue<Object> q) { queue = q; }
    public void run() {
        try {
            while (true) { consume(queue.take()); }
        } catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
    }
    void consume(Object x) { /* use x */ }
}

class Setup {
    public static void main(String[] args) {
        BlockingQueue<Object> q = new LinkedBlockingQueue<>();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

In Volley, then, the request is the product, the main thread is the producer, and the NetworkDispatcher threads are the consumers.

Note also that NetworkDispatcher is an example of the strategy design pattern:
/** The queue of requests to service. */
    private final BlockingQueue<Request> mQueue;
    /** The network interface for processing requests. */
    private final Network mNetwork;
    /** The cache to write to. */
    private final Cache mCache;
    /** For posting responses and errors. */
    private final ResponseDelivery mDelivery;
    /** Used for telling us to die. */
    private volatile boolean mQuit = false;

    /**
     * Creates a new network dispatcher thread.  You must call {@link #start()}
     * in order to begin processing.
     *
     * @param queue Queue of incoming requests for triage
     * @param network Network interface to use for performing requests
     * @param cache Cache interface to use for writing responses to cache
     * @param delivery Delivery interface to use for posting responses
     */
    public NetworkDispatcher(BlockingQueue<Request> queue,
            Network network, Cache cache,
            ResponseDelivery delivery) {
        mQueue = queue;
        mNetwork = network;
        mCache = cache;
        mDelivery = delivery;
    }

All of NetworkDispatcher's constructor parameters are interfaces, and run() implements the skeleton of the algorithm in terms of these strategy objects. Some concrete strategies are supplied by the framework; others are left to the developer. For example, the ImageCache used as the L1 cache is implemented by the developer, who therefore controls the caching policy, and Volley's documentation suggests backing it with LruCache.
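On Android the usual L1 cache is android.util.LruCache, but the underlying eviction strategy is easy to reproduce in plain Java. The sketch below is not the Android class, just an illustrative equivalent built on LinkedHashMap's access-order mode; the "bitmap" values are placeholder strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Main {
    // Minimal LRU cache in the spirit of android.util.LruCache:
    // a LinkedHashMap in access order evicts the least recently used entry.
    static class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
        private final int maxEntries;
        SimpleLruCache(int maxEntries) {
            super(16, 0.75f, true);   // true = order entries by access, not insertion
            this.maxEntries = maxEntries;
        }
        @Override protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries;   // evict when over capacity
        }
    }

    public static void main(String[] args) {
        SimpleLruCache<String, String> cache = new SimpleLruCache<>(2);
        cache.put("a", "bitmap-a");
        cache.put("b", "bitmap-b");
        cache.get("a");                 // touch "a" so "b" becomes the eldest entry
        cache.put("c", "bitmap-c");     // over capacity: "b" is evicted
        System.out.println(cache.keySet());   // prints [a, c]
    }
}
```

A developer-supplied ImageCache following this policy keeps recently displayed images hot while bounding memory use.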


Finally, an array of NetworkDispatchers forms the worker pool inside the RequestQueue class, which starts and stops them as a unit:
/**
     * Creates the worker pool. Processing will not begin until {@link #start()} is called.
     *
     * @param cache A Cache to use for persisting responses to disk
     * @param network A Network interface for performing HTTP requests
     * @param threadPoolSize Number of network dispatcher threads to create
     * @param delivery A ResponseDelivery interface for posting responses and errors
     */
    public RequestQueue(Cache cache, Network network, int threadPoolSize,
            ResponseDelivery delivery) {
        mCache = cache;
        mNetwork = network;
        mDispatchers = new NetworkDispatcher[threadPoolSize];
        mDelivery = delivery;
    }
	
	/**
     * Starts the dispatchers in this queue.
     */
    public void start() {
        stop();  // Make sure any currently running dispatchers are stopped.
        // Create the cache dispatcher and start it.
        mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
        mCacheDispatcher.start();

        // Create network dispatchers (and corresponding threads) up to the pool size.
        for (int i = 0; i < mDispatchers.length; i++) {
            NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
                    mCache, mDelivery);
            mDispatchers[i] = networkDispatcher;
            networkDispatcher.start();
        }
    }
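The shutdown side of this lifecycle can be sketched in plain Java. This is a standalone sketch, not Volley code: each dispatcher carries a volatile quit flag and is interrupted out of its blocking take(), which is exactly why the run() loop shown earlier re-checks mQuit after catching InterruptedException.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    static class Dispatcher extends Thread {
        private final BlockingQueue<String> queue;
        private volatile boolean quit = false;
        Dispatcher(BlockingQueue<String> queue) { this.queue = queue; }

        void quitSafely() {
            quit = true;
            interrupt();            // wakes the thread out of queue.take()
        }

        @Override public void run() {
            while (true) {
                String item;
                try {
                    item = queue.take();
                } catch (InterruptedException e) {
                    if (quit) return;   // interrupted because it is time to quit
                    continue;           // otherwise keep serving the queue
                }
                System.out.println("processed " + item);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Dispatcher[] pool = new Dispatcher[3];   // like mDispatchers
        for (int i = 0; i < pool.length; i++) {
            pool[i] = new Dispatcher(queue);
            pool[i].start();
        }
        queue.put("request-1");
        Thread.sleep(100);                       // let one dispatcher consume it
        for (Dispatcher d : pool) d.quitSafely();
        for (Dispatcher d : pool) d.join();
        System.out.println("all dispatchers stopped");
    }
}
```

Because the quit flag is checked only after the interrupt, an in-flight request is never abandoned halfway: the thread exits at a queue boundary.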

How Responses Are Posted Back to the Main Thread

After the PriorityBlockingQueue has decoupled the threads, the final results must still be delivered back to the main thread. The method used here is Android's ubiquitous Handler + Looper mechanism.
mDelivery.postResponse(request, response);

mDelivery is the ResponseDelivery interface, implemented by ExecutorDelivery. Here is ExecutorDelivery's implementation of postResponse:

@Override
    public void postResponse(Request<?> request, Response<?> response) {
        postResponse(request, response, null);
    }

    @Override
    public void postResponse(Request<?> request, Response<?> response, Runnable runnable) {
        request.markDelivered();
        request.addMarker("post-response");
        mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, runnable));
    }

Let's see how mResponsePoster.execute() is implemented:

 /** Used for posting responses, typically to the main thread. */
    private final Executor mResponsePoster;

    /**
     * Creates a new response delivery interface.
     * @param handler {@link Handler} to post responses on
     */
    public ExecutorDelivery(final Handler handler) {
        // Make an Executor that just wraps the handler.
        mResponsePoster = new Executor() {
            @Override
            public void execute(Runnable command) {
                handler.post(command);
            }
        };
    }

Exactly: the response is posted through a Handler. Since it must reach the main thread, the Handler here is bound to the main thread's Looper:

 public RequestQueue(Cache cache, Network network, int threadPoolSize) {
        this(cache, network, threadPoolSize,
                new ExecutorDelivery(new Handler(Looper.getMainLooper())));
    }
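Handler and Looper exist only on Android, but the trick of wrapping a message loop in an Executor can be simulated in plain Java, with a queue standing in for the main thread's Looper. This is a sketch with made-up names, not the Android classes:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // The "main looper": a queue the main thread drains, like Looper.loop().
        BlockingQueue<Runnable> messageQueue = new LinkedBlockingQueue<>();

        // An Executor that just enqueues, like the anonymous Executor in
        // ExecutorDelivery that forwards to handler.post(command).
        Executor responsePoster = messageQueue::add;

        Thread worker = new Thread(() -> {
            // Worker produces a "response", then posts its delivery back
            // to the loop thread instead of touching UI state itself.
            String response = "parsed response";
            responsePoster.execute(() ->
                    System.out.println(Thread.currentThread().getName()
                            + " delivers: " + response));
        });
        worker.start();
        worker.join();

        // Drain one message on the current ("main") thread.
        messageQueue.take().run();
    }
}
```

The delivery Runnable executes on the thread that drains the queue, which is precisely why handing ExecutorDelivery a Handler on the main Looper makes every response callback run on the main thread.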
