从源代码分析Universal-Image-Loader中的线程池

一般来讲一个网络访问就需要App创建一个线程来执行,但是这也导致了当网络访问比较多的情况下,线程的数目可能积聚增多,虽然Android系统理论上说可以创建无数个线程,但是某一时间段,线程数的急剧增加可能导致系统OOM。在UIL中引入了线程池这种技术来管理线程。合理利用线程池能够带来三个好处。第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

前面我们有讲到ImageLoader.displayImage(…)函数中的图片处理流程,但当时有意忽略了线程方面的额处理。UIL中将线程池相关的东西封装在ImageLoaderEngine类中了。让我们回到图片下载的源代码中,也就是ImageLoader.displayImage(…)函数。

 1     public void displayImage(String uri, ImageAware imageAware, DisplayImageOptions options,
 2             ImageLoadingListener listener, ImageLoadingProgressListener progressListener) {
 3         //检查UIL的配置是否被初始化
 4         checkConfiguration();
 5         if (imageAware == null) {
 6             throw new IllegalArgumentException(ERROR_WRONG_ARGUMENTS);
 7         }
 8         if (listener == null) {
 9             listener = emptyListener;
10         }
11         if (options == null) {
12             options = configuration.defaultDisplayImageOptions;
13         }
14
15         if (TextUtils.isEmpty(uri)) {
16             engine.cancelDisplayTaskFor(imageAware);
17             listener.onLoadingStarted(uri, imageAware.getWrappedView());
18             if (options.shouldShowImageForEmptyUri()) {
19                 imageAware.setImageDrawable(options.getImageForEmptyUri(configuration.resources));
20             } else {
21                 imageAware.setImageDrawable(null);
22             }
23             listener.onLoadingComplete(uri, imageAware.getWrappedView(), null);
24             return;
25         }
26         //计算Bitmap的大小,以便后面解析图片时用
27         ImageSize targetSize = ImageSizeUtils.defineTargetSizeForView(imageAware, configuration.getMaxImageSize());
28         String memoryCacheKey = MemoryCacheUtils.generateKey(uri, targetSize);
29         engine.prepareDisplayTaskFor(imageAware, memoryCacheKey);
30
31         listener.onLoadingStarted(uri, imageAware.getWrappedView());
32         //Bitmap是否缓存在内存?
33         Bitmap bmp = configuration.memoryCache.get(memoryCacheKey);
34         if (bmp != null && !bmp.isRecycled()) {
35             L.d(LOG_LOAD_IMAGE_FROM_MEMORY_CACHE, memoryCacheKey);
36
37             if (options.shouldPostProcess()) {
38                 ImageLoadingInfo imageLoadingInfo = new ImageLoadingInfo(uri, imageAware, targetSize, memoryCacheKey,
39                         options, listener, progressListener, engine.getLockForUri(uri));
40                 //处理并显示图片
41                 ProcessAndDisplayImageTask displayTask = new ProcessAndDisplayImageTask(engine, bmp, imageLoadingInfo,
42                         defineHandler(options));
43                 if (options.isSyncLoading()) {
44                     displayTask.run();
45                 } else {
46                     engine.submit(displayTask);
47                 }
48             } else {
49                 //显示图片
50                 options.getDisplayer().display(bmp, imageAware, LoadedFrom.MEMORY_CACHE);
51                 listener.onLoadingComplete(uri, imageAware.getWrappedView(), bmp);
52             }
53         } else {
54             if (options.shouldShowImageOnLoading()) {
55                 imageAware.setImageDrawable(options.getImageOnLoading(configuration.resources));
56             } else if (options.isResetViewBeforeLoading()) {
57                 imageAware.setImageDrawable(null);
58             }
59
60             ImageLoadingInfo imageLoadingInfo = new ImageLoadingInfo(uri, imageAware, targetSize, memoryCacheKey,
61                     options, listener, progressListener, engine.getLockForUri(uri));
62             //启动一个线程,加载并显示图片
63             LoadAndDisplayImageTask displayTask = new LoadAndDisplayImageTask(engine, imageLoadingInfo,
64                     defineHandler(options));
65             if (options.isSyncLoading()) {
66                 displayTask.run();
67             } else {
68                 engine.submit(displayTask);
69             }
70         }
71     }

注意上面代码块中的第68行,当需要加载显示图片的时候,displayTask对象通过engine.submit(...)函数提交,那么这背后发生了什么呢?engine是ImageLoaderEngine类的一个实例,他主要用来响应displayTask的执行。

接下来让我们看看ImageLoaderEngine中相关的字段和方法。

 1 class ImageLoaderEngine {
 2
 3     final ImageLoaderConfiguration configuration;
 4
 5     private Executor taskExecutor;
 6     private Executor taskExecutorForCachedImages;
 7     private Executor taskDistributor;
 8
 9     private final Map<Integer, String> cacheKeysForImageAwares = Collections
10             .synchronizedMap(new HashMap<Integer, String>());
11     private final Map<String, ReentrantLock> uriLocks = new WeakHashMap<String, ReentrantLock>();
12
13     private final AtomicBoolean paused = new AtomicBoolean(false);
14     private final AtomicBoolean networkDenied = new AtomicBoolean(false);
15     private final AtomicBoolean slowNetwork = new AtomicBoolean(false);
16
17     private final Object pauseLock = new Object();
18
19     ImageLoaderEngine(ImageLoaderConfiguration configuration) {
20         this.configuration = configuration;
21
22         taskExecutor = configuration.taskExecutor;
23         taskExecutorForCachedImages = configuration.taskExecutorForCachedImages;
24
25         taskDistributor = DefaultConfigurationFactory.createTaskDistributor();
26     }
27
28     /** Submits task to execution pool */
29     void submit(final LoadAndDisplayImageTask task) {
30         taskDistributor.execute(new Runnable() {
31             @Override
32             public void run() {
33                 File image = configuration.diskCache.get(task.getLoadingUri());
34                 boolean isImageCachedOnDisk = image != null && image.exists();
35                 initExecutorsIfNeed();
36                 if (isImageCachedOnDisk) {
37                     taskExecutorForCachedImages.execute(task);
38                 } else {
39                     taskExecutor.execute(task);
40                 }
41             }
42         });
43     }
44
45     /** Submits task to execution pool */
46     void submit(ProcessAndDisplayImageTask task) {
47         initExecutorsIfNeed();
48         taskExecutorForCachedImages.execute(task);
49     }
50
51     private void initExecutorsIfNeed() {
52         if (!configuration.customExecutor && ((ExecutorService) taskExecutor).isShutdown()) {
53             taskExecutor = createTaskExecutor();
54         }
55         if (!configuration.customExecutorForCachedImages && ((ExecutorService) taskExecutorForCachedImages)
56                 .isShutdown()) {
57             taskExecutorForCachedImages = createTaskExecutor();
58         }
59     }
60
61     private Executor createTaskExecutor() {
62         return DefaultConfigurationFactory
63                 .createExecutor(configuration.threadPoolSize, configuration.threadPriority,
64                 configuration.tasksProcessingType);
65     }
66     //省略部分代码....
67 }

注意到第29行submit(final LoadAndDisplayImageTask task)函数,我们发现这个函数根据isImageCachedOnDisk判断文件是否有缓存在磁盘中,通过不同的taskExecutor来执行。taskExecutorForCachedImages、taskExecutor、taskDistributor这三个对象其实是Executor接口的的实例。

Executor接口执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():

Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

taskExecutorForCachedImages和taskExecutor都是在ImageLoaderEngine.createTaskExecutor()中创建,经过分析我们发现他在DefaultConfigurationFactory.createExecutor中被初始化成ThreadPoolExecutor类型的对象(这是默认情况)。让我们看看ThreadPoolExecutor的使用。

创建一个ThreadPoolExecutor需要的参数:

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
    • 当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

接下来,再让我们分析taskDistributor的创建过程。分析发现,taskDistributor在DefaultConfigurationFactory.createTaskDistributor()中被创建,它通过Executors这个工厂创建,其实也是一个ThreadPoolExecutor类型的对象。

用Executors静态工厂方法创建的线程池类型

a) newFixedThreadPool:创建一个定长的线程池。达到最大线程数后,线程数不再增长。如果一个线程由于非预期Exception而结束,线程池会补充一个新的线程。

b) newCachedThreadPool:创建一个可缓存的线程池。当池长度超过处理需求时,可以回收空闲的线程。

c) newSingleThreadPool:创建一个单线程executor。

d) newScheduledThreadPool:创建一个定长的线程池,而且支持定时的以及周期性的任务执行。类似于Timer。但是,Timer是基于绝对时间,对系统时钟的改变是敏感的,而ScheduledThreadPoolExecutor只支持相对时间。

1) Timer是创建唯一的线程来执行所有的timer任务。如果一个任务超时了,会导致其他的TimerTask时间准确性出问题。

2)如果TimerTask抛出uncheck 异常,Timer将会产生无法预料的行为。因此,ScheduledThreadPoolExecutor可以完全代替Timer。

从上面ImageLoaderEngine.submit(...)分析,可以得知:

taskDistributor用来尝试读取磁盘中是否有图片缓存,因为涉及磁盘操作,需要用线程来执行。根据是否有对应的图片缓存,将图片加载的任务分发到对应的执行器。如果图片已经缓存在磁盘,则通过taskExecutorForCachedImages执行,如果图片没有缓存在磁盘,则通过taskExecutor执行。我们注意到这三个都实现了Executor接口,那么为什么要将任务细分在三个线程池中进行呢?这其实这跟线程池的调优有关,如果我们将所有的任务都放在同一个线程池中运行当然是可以的,但是这样的话所有的任务就都只能采取同一种任务优先级和运行策略。显然果要有更好的性能,在线程数比较多并且线程承担的任务不同的情况下,App中最好还是按任务的类别来划分线程池。

上面的分析又引出一个问题,我们究竟应该如何配置自己的线程池。

合理的配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

  1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
  2. 任务的优先级:高,中和低。
  3. 任务的执行时间:长,中和短。
  4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。

接下来,让我们看看UIL中线程池的配置。

让我们来分析一下,taskDistributor由于在每创建一个新的线程的时候都需要读取一下磁盘,属于IO操作。需要图片缓存的应用一般在需要加载图片的时候,同时创建很多(>5)线程,这些线程一般来得猛去的也快,存活时间不必太长。taskDistributor和taskExecutorForCachedImages涉及网络和磁盘的读取和写入操作,比较耗时。主线程数默认为3,感觉定的低了,实际上IO密集的操作应该定得高一点,以便合理利用CPU的。线程优先级(10为最高,1为最低)为4是比较合理的,因为这些操作只需要后台完成即可,优先级太高可能让界面失去响应。

参考链接

聊聊并发(三)——JAVA线程池的分析和使用

线程池的介绍及简单实现

java Executor

时间: 2024-11-03 21:22:00

从源代码分析Universal-Image-Loader中的线程池的相关文章

Java中的线程池

综述 在我们的开发中经常会使用到多线程.例如在Android中,由于主线程的诸多限制,像网络请求等一些耗时的操作我们必须在子线程中运行.我们往往会通过new Thread来开启一个子线程,待子线程操作完成以后通过Handler切换到主线程中运行.这么以来我们无法管理我们所创建的子线程,并且无限制的创建子线程,它们相互之间竞争,很有可能由于占用过多资源而导致死机或者OOM.所以在Java中为我们提供了线程池来管理我们所创建的线程. 线程池的使用 采用线程池的好处 在这里我们首先来说一下采用线程池的

《Java并发编程的艺术》 第9章 Java中的线程池

第9章 Java中的线程池 在开发过程中,合理地使用线程池能带来3个好处: 降低资源消耗.通过重复利用已创建的线程 降低线程创建和销毁造成的消耗. 提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行. 提高线程的可管理性.线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配.调优和监控. 9.1 线程池的实现原理 当提交一个新任务到线程池时,线程池的处理流程如下: 1)线程池判断核心线程池里的线程是否都在执行任务.如果不是,则创建

分析AsyncTask中的线程池

问题由来: 之前看到一篇博文,说AsyncTask不适合运行多任务, 多个任务不会异步执行, 当时只是印象里记住了一下也不确定, 今天把代码看了看, 把原因写出来. 问题的代码演示: 1 public class AsyncTaskDemo extends AsyncTask<String, Integer, String>{ 2 private final static String TAG = "AsyncTaskTest"; 3 4 @Override 5 prote

Java中的线程池ExecutorService

示例 import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; public class Ch09_Executor { private static void run(ExecutorService threadPool) { for(int i = 1; i < 5; i++)

JDK1.8中的线程池

上面这段代码一直在用,面试的时候也经常被问到,却从未深究过,不知道线程池到底是怎么回事,今天看看源代码,一探其究竟 线程池主要控制的状态是ctl,它是一个原子的整数,其包含两个概念字段: workerCount:有效的线程数量 runState:线程池的状态 为了在一个整型值里面包含这两个字段,我们限制workerCount最多2的29次方减1 runState的值有这样几种: RUNNING: 接受新的任务,并处理队列中的任务 SHUTDOWN:不接受新的任务,继续处理队列中的任务 STOP:

Java中的线程池——ThreadPoolExecutor的原理

1 线程池的处理流程向线程池提交一个任务后,它的主要处理流程如下图所示一个线程从被提交(submit)到执行共经历以下流程: 线程池判断核心线程池里是的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务.如果核心线程池里的线程都在执行任务,则进入下一个流程线程池判断工作队列是否已满.如果工作队列没有满,则将新提交的任务储存在这个工作队列里.如果工作队列满了,则进入下一个流程.线程池判断其内部线程是否都处于工作状态.如果没有,则创建一个新的工作线程来执行任务.如果已满了,则交给饱和策

Linux中epoll+线程池实现高并发

服务器并发模型通常可分为单线程和多线程模型,这里的线程通常是指"I/O线程",即负责I/O操作,协调分配任务的"管理线程",而实际的请求和任务通常交由所谓"工作者线程"处理.通常多线程模型下,每个线程既是I/O线程又是工作者线程.所以这里讨论的是,单I/O线程+多工作者线程的模型,这也是最常用的一种服务器并发模型.我所在的项目中的server代码中,这种模型随处可见.它还有个名字,叫"半同步/半异步"模型,同时,这种模型也是生

Spring中的线程池ThreadPoolTaskExecutor

1.直接调用Spring框架中的ThreadPoolTaskExecutor ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor(); //线程池所使用的缓冲队列 poolTaskExecutor.setQueueCapacity(200); //线程池维护线程的最少数量 poolTaskExecutor.setCorePoolSize(5); //线程池维护线程的最大数量 poolTaskExecutor.s

Tomcat中的线程池StandardThreadExecutor

之所以今天讨论它,因为在motan的的NettyServer中利用它这个线程池可以作为业务线程池,它定制了一个自己的线程池.当然还是基于jdk中的ThreadExecutor中的构造方法和execute方法,然后在外边包装一层. public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,