一、 背景
线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池可以对线程进行统一的分配、调优和监控,并有以下好处:
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
第三:提高线程的可管理性。
二、线程池的架构
三、Executors
用于创建线程池
newFixedThreadPool(固定大小线程池)
初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作为阻塞队列,不过当线程池没有可执行任务时,也不会释放线程。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }
newCachedThreadPool(无界线程池,可以进行自动线程回收)
1、初始化一个可以缓存线程的线程池,默认缓存60s,使用SynchronousQueue作为阻塞队列;
2、和newFixedThreadPool创建的线程池不同,newCachedThreadPool在没有任务执行时,当线程的空闲时间超过keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;
所以,使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newSingleThreadExecutor(单个后台线程)
初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,内部使用LinkedBlockingQueue作为阻塞队列。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
newScheduledThreadPool
创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务。
通过如上配置的线程池的创建方法源代码,我们可以发现:
1> 除了CachedThreadPool使用的是直接提交策略的缓冲队列以外,其余两个采用的都是无界缓冲队列
2> 三个线程池采用的ThreadPoolExecutor构造方法都是同一个,使用的都是默认的ThreadFactory和handler:
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
提交任务用submit(),关闭线程池用shutdown()。
四、ExecutorService任务周期管理接口
Executor的实现通常都会创建线程来执行任务,但是使用异步方式来执行任务时,由于之前提交任务的状态不是立即可见的,所以如果要关闭应用程序时,就需要将受影响的任务状态反馈给应用程序。
为了解决执行服务的生命周期问题,Executor扩展了EecutorService接口,添加了一些用于生命周期管理的方法。如下:
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 省略部分方法 }
五、ThreadPoolExecutor
线程池的主要工作流程如下图:
类中定义的重要变量,如下:
1. private final BlockingQueue<Runnable> workQueue; // 阻塞队列 2. private final ReentrantLock mainLock = new ReentrantLock(); // 互斥锁 3. private final HashSet<Worker> workers = new HashSet<Worker>();// 线程集合.一个Worker对应一个线程 4. private final Condition termination = mainLock.newCondition();// 终止条件 5. private int largestPoolSize; // 线程池中线程数量曾经达到过的最大值。 6. private long completedTaskCount; // 已完成任务数量 7. private volatile ThreadFactory threadFactory; // ThreadFactory对象,用于创建线程。 8. private volatile RejectedExecutionHandler handler;// 拒绝策略的处理句柄 9. private volatile long keepAliveTime; // 线程池维护线程所允许的空闲时间 10. private volatile boolean allowCoreThreadTimeOut; 11. private volatile int corePoolSize; // 线程池维护线程的最小数量,哪怕是空闲的 12. private volatile int maximumPoolSize; // 线程池维护的最大线程数量
其中有几个重要的规则需要说明一下:
1> corePoolSize与maximumPoolSize
线程池将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute() 中提交时:
- 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
- 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;当队列满时才创建新线程去处理请求;
- 如果当前线程池中的线程数目达到maximumPoolSize,即队列已经满了,则通过handler所指定的任务拒绝策略来处理新请求;
- 如果线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;
也就是说,处理任务的优先级为:
- 1. 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
- 2. 当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。
2> workQueue
线程池所使用的缓冲队列,该缓冲队列的长度决定了能够缓冲的最大数量,缓冲队列有三种通用策略:
1) 直接提交。SynchronousQueue,它将任务直接提交给线程执行而不保存它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。
2) 无界队列。使用无界队列将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;
3) 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量.
3>ThreadFactory
使用 ThreadFactory 创建新线程。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
- public interface ThreadFactory {
- Thread newThread(Runnable r);
- }
而构造方法中的threadFactory对象,是通过 Executors.defaultThreadFactory()返回的。
4>RejectedExecutionHandler
当Executor已经关闭(即执行了executorService.shutdown()方法后),并且Executor将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute()中提交的新任务将被拒绝.
在以上述情况下,execute 方法将调用RejectedExecutionHandler.rejectedExecution() 方法。
下面提供了四种预定义的处理程序策略:
1) AbortPolicy 直接抛出异常 RejectedExecutionException;
2) CallerRunsPolicy 用调用者所在的线程来执行任务
3) DiscardPolicy 不能执行的任务将被删除;
4) DiscardOldestPolicy 如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
5>keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;
六、线程池的关闭
通过调用线程池的shutdown或shutdownNow方法来关闭线程池,但是它们的实现原理不同,shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定需要执行完,则可以调用shutdownNow。
七、线程池的配置
可以从以下几个角度来进行分析:
1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
2. 任务的优先级:高,中和低。
3. 任务的执行时间:长,中和短。
4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。
CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。
IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。
混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。
我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点