Volley使用了线程池来作为基础结构,主要分为主线程,cache线程和network线程。
主线程和cache线程都只有一个,而NetworkDispatcher线程可以有多个,这样能解决比并行问题。如下图:
其中左下角是NetworkDispatcher线程,大致步骤是:
1.不断从请求队列中取出请求
request = mQueue.take();
2.发起网络请求
NetworkResponse networkResponse = mNetwork.performRequest(request);
3.把这个networkResponse转化为期望的数据类型,比如Response<String>,Response<Json>,Response<Bitmap>
Response<?> response = request.parseNetworkResponse(networkResponse);
4.将网络响应加入缓存
mCache.put(request.getCacheKey(), response.cacheEntry);
下面是NetworkDispatcher线程的主要代码:
@Override public void run() { Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); Request request; while (true) { try { // Take a request from the queue. request = mQueue.take();//1.从请求队列中取出一个网络请求,mQueue是BlockingQueue<Request>的实现类 } catch (InterruptedException e) { // We may have been interrupted because it was time to quit. if (mQuit) { return; } continue; } try { request.addMarker("network-queue-take"); // If the request was cancelled already, do not perform the // network request. if (request.isCanceled()) { request.finish("network-discard-cancelled"); continue; } // Tag the request (if API >= 14) if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.ICE_CREAM_SANDWICH) { TrafficStats.setThreadStatsTag(request.getTrafficStatsTag()); } // Perform the network request. NetworkResponse networkResponse = mNetwork.performRequest(request);//2.发起网络请求 request.addMarker("network-http-complete"); // If the server returned 304 AND we delivered a response already, // we're done -- don't deliver a second identical response. if (networkResponse.notModified && request.hasHadResponseDelivered()) { request.finish("not-modified"); continue; } // Parse the response here on the worker thread. Response<?> response = request.parseNetworkResponse(networkResponse);//3.把这个networkResponse转化为期望的数据类型 request.addMarker("network-parse-complete"); // Write to cache if applicable. // TODO: Only update cache metadata instead of entire record for 304s. if (request.shouldCache() && response.cacheEntry != null) { mCache.put(request.getCacheKey(), response.cacheEntry);//4.将网络响应加入缓存 request.addMarker("network-cache-written"); } // Post the response back. request.markDelivered(); mDelivery.postResponse(request, response); } catch (VolleyError volleyError) { parseAndDeliverNetworkError(request, volleyError); } catch (Exception e) { VolleyLog.e(e, "Unhandled exception %s", e.toString()); mDelivery.postError(request, new VolleyError(e)); } } }
要理解Volley的并行性实现,必需理解PriorityBlockingQueue并发类。注意到我们在NetworkDispatcher循环中并没有使用显式的同步(使用Lock或者使用synchronize),因为PriorityBlockingQueue的实现是线程安全的。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合可以因此消除,因为每个类只和BlockingQueue通信。这个知识点可以参考《Java编程思想》P713.
BlockingQueue
implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.
所以可以有多个NetworkDispatcher线程同时从请求队列PriorityBlockingQueue中取出请求而不会产生线程冲突,那就是Volley支持多线程下载图片的方式。
实际上,BlockingQueue接口是用于解决生产者-消费者问题的。
class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { queue.put(produce()); } } catch (InterruptedException ex) { ... handle ...} } Object produce() { ... } } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while (true) { consume(queue.take()); } } catch (InterruptedException ex) { ... handle ...} } void consume(Object x) { ... } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }}
所以在Volley中,request就是产品,主线程是生产者,NetworkDispatcher线程是消费者。
另外注意到,NetworkDispatcher的实现其实是策略设计模式:
/** The queue of requests to service. */ private final BlockingQueue<Request> mQueue; /** The network interface for processing requests. */ private final Network mNetwork; /** The cache to write to. */ private final Cache mCache; /** For posting responses and errors. */ private final ResponseDelivery mDelivery; /** Used for telling us to die. */ private volatile boolean mQuit = false; /** * Creates a new network dispatcher thread. You must call {@link #start()} * in order to begin processing. * * @param queue Queue of incoming requests for triage * @param network Network interface to use for performing requests * @param cache Cache interface to use for writing responses to cache * @param delivery Delivery interface to use for posting responses */ public NetworkDispatcher(BlockingQueue<Request> queue, Network network, Cache cache, ResponseDelivery delivery) { mQueue = queue; mNetwork = network; mCache = cache; mDelivery = delivery; }
NetworkDispatcher构造函数的几个参数都是接口,而run方法则使用这些策略类方法实现了算法的主体流程,具体实现有些留给了开发者,有些则是框架实现。比如ImageCache作为一级缓存的Cache方法留给了开发者实现,由开发者控制具体的缓存策略,当然Volley建议我们使用LRUCache作为L1缓存的实现。
最后,NetworkDispatcher的数组则构成了RequestQueue类中线程池,由RequestQueue统一启动和停止:
/** * Creates the worker pool. Processing will not begin until {@link #start()} is called. * * @param cache A Cache to use for persisting responses to disk * @param network A Network interface for performing HTTP requests * @param threadPoolSize Number of network dispatcher threads to create * @param delivery A ResponseDelivery interface for posting responses and errors */ public RequestQueue(Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery) { mCache = cache; mNetwork = network; mDispatchers = new NetworkDispatcher[threadPoolSize]; mDelivery = delivery; } /** * Starts the dispatchers in this queue. */ public void start() { stop(); // Make sure any currently running dispatchers are stopped. // Create the cache dispatcher and start it. mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery); mCacheDispatcher.start(); // Create network dispatchers (and corresponding threads) up to the pool size. for (int i = 0; i < mDispatchers.length; i++) { NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery); mDispatchers[i] = networkDispatcher; networkDispatcher.start(); } }
关于volley的网络请求部分可以看博客:http://www.cnblogs.com/bvin/p/3291611.html
网络请求中有几个转换解析请求获取响应结果的地方:
1.HttpStack接口的performRequest()方法
public HttpResponse performRequest(Request<?> request, Map<String, String> additionalHeaders)
2.Network接口的performRequest()方法
public NetworkResponse performRequest(Request<?> request)
3.Request类的parseNetworkResponse()抽象方法
abstract protected Response<T> parseNetworkResponse(NetworkResponse response);
可以很鲜明得看出第一个是对原生Http请求的解析,解析出来是一个Apach的HttpResponse 实例,这个结果就是通过上述两个HttpStack的实现类HttpClientStack和HurlStack执行获取的获取的。
而第二个解析出来是框架自定的NetworkResponse,这是通过Network的实现类BasicNetwork来获取的。
第三个就是第二个得出来NetworkResponse解析成用户期望Response<T> 了,这个Response和Request是对应的,有String型,json型还有Image型的。然后在通过ResponseDelivery把解析好的结果发送到主线程。
从Request到Response就是这样一步步走来的。
再详细看下上面第二个方法performRequest(),这个方法是NetworkDispatcher中主要步骤的第二步,其中真正网络请求的工作委托给mHttpStack实现,通过HttpStack实现类就是上图的两个HttpClientStack和HurlStack执行。这两个类的区别在于HttpClientStack是直接用HttpClient的execute()方法执行一个Http请求,而HurlStack就是直接用URL.openConnection()进行连接,这种方式在2.3以上是不能用的,所以分开了这两种方式。
/** * @title performRequest执行各种Request请求并以NetworkResponse的形式返回结果 * @param Request * @return NetworkResponse * @throws VolleyError * 定义:{@link Network#performRequest(Request)} * 被调:{@link NetworkDispatcher#run()} * */ @Override//NetworkDispatcher的run()方法中调用 public NetworkResponse performRequest(Request<?> request) throws VolleyError { long requestStart = SystemClock.elapsedRealtime();//开始请求时间 while (true) { HttpResponse httpResponse = null;//apache的请求结果 byte[] responseContents = null;//请求的内容 Map<String, String> responseHeaders = new HashMap<String, String>();//响应结果头部信息 try { // Gather headers. Map<String, String> headers = new HashMap<String, String>();//保存缓存数据 addCacheHeaders(headers, request.getCacheEntry());//先获取缓存数据 httpResponse = mHttpStack.performRequest(request, headers);//去调用mHttpStack的实现方法执行请求 StatusLine statusLine = httpResponse.getStatusLine();//获取http状态线 int statusCode = statusLine.getStatusCode();//获取状态码 responseHeaders = convertHeaders(httpResponse.getAllHeaders()); // Handle cache validation.//处理缓存验证 if (statusCode == HttpStatus.SC_NOT_MODIFIED) {//返回缓存数据 return new NetworkResponse(HttpStatus.SC_NOT_MODIFIED, request.getCacheEntry().data, responseHeaders, true); } //把HttpEntity转化为byte[]数据 responseContents = entityToBytes(httpResponse.getEntity()); // if the request is slow, log it.//如果请求很慢,就打印出来看一下 long requestLifetime = SystemClock.elapsedRealtime() - requestStart; logSlowRequests(requestLifetime, request, responseContents, statusLine);//打印 //连接正常但是返回无内容,抛出IO异常 if (statusCode != HttpStatus.SC_OK && statusCode != HttpStatus.SC_NO_CONTENT) { throw new IOException(); } return new NetworkResponse(statusCode, responseContents, responseHeaders, false); } catch (SocketTimeoutException e) {//读取超时,重试 attemptRetryOnException("socket", request, new TimeoutError()); } catch (ConnectTimeoutException e) {//连接超时,重试 attemptRetryOnException("connection", request, new TimeoutError()); } catch (MalformedURLException e) {//Bad URL throw new RuntimeException("Bad URL " + request.getUrl(), e); } catch (IOException e) {//IO异常 int statusCode = 0; NetworkResponse networkResponse = null; if (httpResponse != null) { statusCode = httpResponse.getStatusLine().getStatusCode(); } else {//如果没有返回httpResponse,就说明没连接 throw new NoConnectionError(e); } VolleyLog.e("Unexpected response code %d for %s", statusCode, request.getUrl()); if (responseContents != null) {//返回数据不为空 networkResponse = new NetworkResponse(statusCode, responseContents, responseHeaders, false);//创建响应体 if (statusCode == HttpStatus.SC_UNAUTHORIZED || statusCode == HttpStatus.SC_FORBIDDEN) {//认证失败异常,重试 attemptRetryOnException("auth", request, new AuthFailureError(networkResponse)); } else {//服务器异常 // TODO: Only throw ServerError for 5xx status codes. throw new ServerError(networkResponse);//只有状态码为5XX才抛出服务器异常 } } else {//网络异常 throw new NetworkError(networkResponse); } } } }
A:先是通过mHttpStack把请求执行并且获取它的响应结果,根据HttpStatus做出各种判断。
B:然后再把httpResponse的Entity转化为ByteArray,并处理各种发生的异常。
C:最后的过程是这样的:通过Volley创建一个RequestQueue请求队列,当这个队列开始运作的时候会启动NetworkDispatcher这个工作线程,而BasicNetwork的performRequest()的方法就在NetworkDispatcher线程run()方法中调用,然后通过mHttpStack的performRequest()方法获取一个networkResponse,在NetworkDispatcher线程把这个networkResponse转化为期望的数据类型,比如Response<String>,Response<Json>,Response<Bitmap>。