线程池
线程池是可以控制线程创建、释放、并通过某种策略尝试复用线程去执行任务的一种管理框架,从而实现线程资源与任务之间的一种平衡。
类图
Executor
Executor是最基本的执行接口:“执行者”接口,只提供了一个方法:可以用来执行已经提交的Runnable任务对象,这个接口提供了一种将“任务提交”与“任务执行”解耦的方法。
public interface Executor { void execute(Runnable command); }
ExecutorService
接口继承了Executor接口,“执行者服务”接口,在Executor上做了一些扩展,可以说是真正的线程池接口,工厂类Executors创建的各种规格的线程池返回的对象都是ExecutorService的子类。
public interface ExecutorService extends Executor { /** * 启动一次有序的关闭,之前提交的任务执行,但不接受新任务 */ void shutdown(); /** * 试图停止所有正在执行的任务,暂停处理正在等待的任务,返回一个等待执行的任务列表 * 这个方法不会等待正在执行的任务终止 */ List<Runnable> shutdownNow(); // 如果已经被shutdown,返回true boolean isShutdown(); // 如果所有任务都已经被终止,返回true boolean isTerminated(); // 在一个shutdown请求后,阻塞的等待所有任务执行完毕 // 或者到达超时时间,或者当前线程被中断 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交一个有返回值的任务,并返回一个Future代表等待的任务执行的结果, 等到任务成功执行,Future#get()方法会返回任务执行的结果 <T> Future<T> submit(Callable<T> task); // 提交一个可以执行的任务,返回一个Future代表这个任务, 等到任务执行结束,Future#get()方法会返回这个给定的result <T> Future<T> submit(Runnable task, T result); // 提交一个可执行的任务,返回一个Future代表这个任务, 等到任务成功执行,Future#get()方法会返回null Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService
抽象类实现了ExecutorSerivce接口中的大部分方法,实现的方法有如下:
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
沿着Executor,ExecutorService,AbstractExecutorService和ThreadPoolExecutor可以看出这是一个模板方法设计模式。
模板方法模式:定义一个操作中算法的框架(比如上面的submit),而将一些步骤延迟到子类中(比如execute)。模板方法模式使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
模板方法模式是一种基于继承的代码复用技术,它是一种类行为型模式。
模板方法模式是结构最简单的行为型设计模式,在其结构中只存在父类与子类之间的继承关系。通过使用模板方法模式,可以将一些复杂流程的实现步骤封装在一系列基本方法中,在抽象父类中提供一个称之为模板方法的方法来定义这些基本方法的执行次序,而通过其子类来覆盖某些步骤,从而使得相同的算法框架可以有不同的执行结果。模板方法模式提供了一个模板方法来定义算法框架,而某些具体步骤的实现可以在其子类中完成。
TheadPoolExecutor
继承了AbstractExecutorService,是线程池的具体实现:
public class ThreadPoolExecutor extends AbstractExecutorService { }
ScheduledExecutorService
接口继承了ExecutorService接口,提供了带"周期执行"功能ExecutorService;
public interface ScheduledExecutorService extends ExecutorService { /** * 在给定延时后,创建并执行一个一次性的Runnable任务,任务执行完毕后,ScheduledFuture#get()方法会返回null */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * 在给定延时后,创建并执行一个ScheduledFutureTask,ScheduledFuture 可以获取结果或取消任务 */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期,也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执 行,接着在 initialDelay + 2 * period 后执行,依此类推 * 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止,如果任何执行的任务超过了周期,随后的执行会延时,不会并发执行 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟 * 如果执行任务发生异常,随后的任务将被禁止,否则任务只会在被取消或者Executor被终止后停止 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
ScheduledThreadPoolExecutor
既继承了TheadPoolExecutor线程池,也实现了ScheduledExecutorService接口,是带"周期执行"功能的线程池:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { }
Executors
是线程池的静态工厂,其提供了快捷创建线程池的静态方法:
public static ExecutorService newCachedThreadPool(); public static ExecutorService newFixedThreadPool(int nThreads); public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize); public static ExecutorService newSingleThreadExecutor();
(1)newCachedThreadPool: 创建一个可缓存工作线程的线程池,默认存活时间是60s,线程数量可达到Integer.MAX_VALUE,内部使用SynchronousQueue作为阻塞队列;
在没有任务执行时,当线程的空闲时间超过keepAliveTime,则工作线程将会终止,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销。
(2)newFixedThreadPool: 创建一个指定工作线程的线程池,其中参数corePoolSize和maximumPoolSize相等,阻塞队列基于LinkedBlockingQueue。线程池空闲时也不会释放工作线程,且阻塞队列大小是Integer.MAX_VALUE。
(3)newScheduledThreadPool: 初始化的线程池可以在指定的时间内周期性的执行所提交的惹怒,在实际的业务场景中可以使用该线程池定期的同步数据。
(4)newSingleThreadExecutor: 初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行,阻塞队列是LinkedBlockingQueue,且大小为Integer.MAX_VALUE。
ThreadPoolExecutor
ThreadPoolExecutor及其交互类
RejectedExecutionHandler(拒绝策略或饱和策略)
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4中策略:
(1)AbortPoliy: 直接抛出异常,默认策略;
(2)CallerrunsPoliy: 用调用者所在的线程来执行任务;
(3)DiscardOldestPoliy: 丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPoliy: 直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义拒绝策略,如:记录日志或持久化存储不能处理的任务等。
ThreadFactory
线程工厂负责创建线程实例,ThreadPoolExecutor使用Executors.DefaultThreadFactory工厂类创建线程:
public interface ThreadFactory { Thread newThread(Runnable r); }
很显然这个工厂模式(或工厂方法模式)的应用。工厂方法模式:定义一个用于创建对象的接口(如ThreadFactory),然子类决定将哪一个类实例化(比如Executors.DefaultThreadFactory),让一个类的实例化延迟到其子类。
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
从默认工厂中产生的Thread,命名规则是pool-数值-thread-数值,且是非daemon线程,级别为NORMAl。
构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
(1)corePoolSize: 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;
如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程;
(2)maximumPoolSize: 线程池中国允许的最大线程数。如果氮气阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize
(3)keepAliveTime: 线程空闲的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,改参数只在线程数大于corePoolSize时才有用
(4)workQueue: 必须是BlockingQueue阻塞队列,当线程池中的线程数超过它的corePoolSize的时候,线程会进入阻塞队列进行阻塞等待,通过workQueue,线程池上线额阻塞功能;
(5)unit: keepAliveTime的超时单位;
(6)threadFactory: 线程工厂,负责创建线程;
(7)handler: 拒绝策略
几种排队的策略
不排队,直接提交
将任务直接给线程处理而不保持它们,可使用SynchronousQueue如果不存在可用于立刻运行任务的线程(即线程池中的线程都在工作)则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。newCachedThreadPool()采用的就是这种策略。
无界队列
可以使用LinkedBlockingQueue(基于链表的无界队列,FIFO),理论上该队列可以对无限多的任务排队。
将导致所有corePoolSize线程都工作的情况下将新任务加入到队列中,这样创建的线程就不会超过corePoolSize,也因此,maximumPoolSize的值也就无效了。
备注:如果LinkedBlockingQueue构造函数指定容量大小,则是有界队列,否则队列大小是Integer.MAX_VALUE
有界队列
可以使用ArrayBlockingQueue(基于数组的有界队列,FIFO)并指定队列的最大长度,使用有界队列可以防止资源耗尽,但也会造成超过队列大小和maximumPoolSize后,提交的任务被拒绝的问题。
ThreadPoolExecutor线程池执行流程
概述
当通过execute方法将一个Runnable任务添加到线程池中,按照如下顺序来处理:
(1)如果线程池中线程数量小于corePoolSize,就创建新的线程来执行新添加的任务;
(2)如果线程池中的线程数量大于等于corePoolSize,但队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的顺序依次等待执行(线程池中有线程空闲出来后依次将队列中的任务交付给空闲的线程执行)
(3)如果线程池中的线程数量大于等于corePoolSize,且队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;
(4)如果线程池中的线程数量等于maximumPoolSize,就会拒绝处理。
总结:当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓存队列workQueue是否满,最后看线程中的线程数量是否大于maximumPoolSize。另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池。
如下为流程图:其中wc:代表线程池中线程数量,wq:标示阻塞队列,addWorker标示创建新线程,addQueue标示加入阻塞队列,reject标示拒绝服务。
线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
其中ctl这个AtomicInteger对象,其告3位标示线程池的运行状态,低29位维护线程池中线程数量。
COUNT_BITS = Integer.SIZE-1 = 32 - 1 = 29
CAPACITY = (1 - COUNT_BITS) -1 = 00011111111111111111111111111111
~CAPACITY=11100000000000000000000000000000
(1)RUNNING: -1 << COUNT_BITS,即11100000000000000000000000000000,高3位全1,低29位全0,该状态的线程池会接受新任务,也会处理阻塞队列中等待处理的任务;
(2)SHUTDOWN: 0<<COUNT_BITS,即00000000000000000000000000000000,高3位全0,低29位全0,该状态的线程池不会接受新任务,但是还会处理已经提交到阻塞队列中等待处理的任务;
(3)STOP:1 << COUNT_BITS,即00100000000000000000000000000000,高3位是001,低29位全0,该状态的线程池不会接受新任务,不会处理阻塞队列中的任务,而且还会中断正在运行的任务。
(4)TIDYING:2 << COUNT_BITS,即01000000000000000000000000000000,高3位是010,低29位全0,该状态下所有任务都被终止了,workerCount为0,为此装填时还将调用terminated()方法
(5)TERMINATED:3 << COUNT_BITS,即01100000000000000000000000000000,高3位是011,低29位全0,terminated()方法调用完成后变成此状态。
runStateOf(int c):由于~CAPACITY只有高3位是全1,所以&操作比较高3位获取线程池状态
workerCountOf(int c):由于CAPACITY只有低29位全是1,素有&操作比较低29位获取线程数量
ctlOf(int rs, int wc) 通过rs高3位运行状态|wc低29位线程数量,合并打包并成ctl
这些状态均由int型标示,大小关系为RUNNING<SHUTDOWN<STOP<TIDYING<TERMINATED
execute方法
public void execute(Runnable command) { if (command == null) // 不允许提交空任务 throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 如果wc < corePoolSize则直接创建新任务执行 if (addWorker(command, true)) // 如果提交成功则直接退出,失败的原因有:(1)线程池已经shutdown(2)wc < corePoolSize之后,由于并发导致wc>=corePoolSize return; c = ctl.get();// 凡是需要再次使用ctl做判断时,都会再次调用ctl.get()获取最新值 } if (isRunning(c) && workQueue.offer(command)) { // 如果corePoolSize < wc < maximumPoolSize && 运行中,则入队列;如果队列满,则可能入队失败 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 如果线程池不是running状态,应该拒绝添加新任务,出队列;双重检测,回滚入队列;删除队列元素失败的场景可能是:刚好有一个线程执行了该任务。 reject(command); else if (workerCountOf(recheck) == 0) // 如果线程池中无活动的线程,则增加一个线程,可能不断从队列中获取任务并执行。 addWorker(null, false); // 添加一个线程 } else if (!addWorker(command, false)) // 如果队列满了,则创建新线程运行任务, reject(command); // 如果失败,则说明线程池shutdown或者饱和了 }
具体流程图如下:
addWorker
从execute的实现来看,addWorker是关键的一个方法。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 外层循环,负责判断线程池状态 int c = ctl.get(); int rs = runStateOf(c); <span style="white-space:pre"> </span> // 线程池状态值的大小为:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED // 有如下几种场景是可以直接拒绝增加新线程的:rs至少是SHUTDOWN状态,以下3个条件任意一个是false(1)rs==SHUTDOWN,false的情况是:线程池已经超过了SHUTDOWn状态,可能是STOP,TIDYING,TERMINATED其中之一,即线程已经终止了(2)firstTask==null,隐含rs==SHUTDOWN,false:firstTask!=null,场景是线程池已经shutdown,还要添加新的任务,拒绝(3)!workQueue.isEmpty(),隐含rs==SHUTDOWN && firstTask==null,false:wq为空,当firstTask为空是为了创建一个没有任务的线程,从wq中获取任务,如果wq为空,就没有添加新线程的必要了。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 内层循环,负责workerCount + 1 int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 如果wc > 线程池最大上限CAPACITY,或者wc > corePoolSize(maximumPoolSize) return false; if (compareAndIncrementWorkerCount(c)) // CAS操作,使得wc + 1,成功则跳出retry循环,标示累加成功 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 如果之前的CAS操作失败,需要从新获取线程池状态,如果与之前不一致,跳出内循环,继续去外层循环判断 continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; // 线程是否启动成功 boolean workerAdded = false; // 线程是否增加成功 Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 如果线程池在运行,或者线程池已经shutdown且firstTask==null(可能是wq中有未执行完成的任务,攒国家没有初始化的worker去执行队列中的任务) if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 如果添加成功,则启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 如果启动失败 addWorkerFailed(w); } return workerStarted; }
firstTask: worker线程的初始化任务,可以为空
core: true:workerCount与corePoolSize比较大小,false:workerCount与maximumPoolSize比较大小
addWorker()方法有4中传参数的方式:
(1)addWorker(cmd, true)
(2)addWorker(cmd,false)
(3)addWorker(null, false)
(4)addWorker(null, true)
其实在execute里面已经使用了前3种:
(1)线程数小于corePoolSize时,放一个需要处理的task进入线程池,如果线程池里面线程的长度超过corePoolSize,就返回false;
(2)当队列被放满时,就尝试将这个新来的task加入线程池,此时池中线程数量与maximumPoolSize比较大小,如果大于的话,返回false;
(3)放一个空的task进入线程池,长度限制是maximumPoolSize,这样的一个task为null的worker线程去任务队列中获取任务。
(4)这个场景在prestartAllCoreThreads方法里面,相当于预先初始化corePoolSize个空闲线程。
addWorker流程图,如下:
内部类Worker
Worker包装了task,管理着运行线程的中断状态和一些指标。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } }
Worker类大体上管理着运行线程的中断状态和一些指标,Worker继承了AbstractQueuedSynchronizer来简化在任务执行时的获取、释放锁,这样防止了中断在运行中的任务,
Worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁,此外,为了让线程真正开始后才可以中断,初始化lock状态为-1,在开始runWoker时将state设置为零,而state>=0才可以中断。
Worker类本身即实现了Runnable,又继承了AbstractQueuedSynchronizer(AQS),所以既是一个可执行的任务,又可以达到锁的效果。
到这里几个Runnable和Thread要理清楚:Worker,Worker内部的thread,Worker内部的firstTask
首先new Worker(Runnable),利用传入的Runnable,初始化firstTask,然后通过DefaultThreadFactory来初始化thread,其中newThread(this),里面的参数是Worker,
而在addWorker()方法中调用的是Worker.thread.start(),所以启动的是Worker内部的thread线程对象,但是执行的是Worker的run(),而run执行runWorker(this),runWorker内部执行firstTask.run()真正的任务。
所以Worker是线程池中的线程,而firstTask虽然是Runnable,但是并没有真正执行,只是被Worker调用了run方法。
Worker控制中断主要有以下几方面:
1、初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断
不允许中断体现在:
A、shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state==-1时tryLock()失败,没发interrupt()
B、shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>0才能interrupt的逻辑
2、为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程
Worker实现的AQS为不可重入锁,为了是在获得worker锁的情况下再进入其它一些需要加锁的方法
runWorker(Worker)
每个线程在启动时都会调用runWorker方法,并传入一个Worker对象
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 此处将state状态设置为0,允许中断 boolean completedAbruptly = true; // true:异常退出,false:正常退出 try { while (task != null || (task = getTask()) != null) { // 如果firstTask == null,则从队列中获取。 w.lock();// 不是为了防止并发,而是为了在shutdown状态下,不能终止正在运行的worker(worker是不可重入锁) if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();// 任务执行 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; // 完成任务 + 1 w.unlock(); // 解锁 } } completedAbruptly = false;// 正常执行 } finally { processWorkerExit(w, completedAbruptly); // 处理worker退出 } }
(1)执行任务之前,首先worker.unlock()将AQS的state设置为0,允许中断当前worker线程;
(2)在执行任务之前worker.lock(),在任务执行完之后解锁,为了防止在任务运行时被线程池一些中断操作中断;
(3)在任务执行前后,可以自定义beforeExecute()和afterExecute()方法;
(4)无论在beforeExecute,task.run,afterExecute发生异常上抛,都会导致worker线程终止,进入processWorkerExit()处理worker退出的流程;
(5)如正常执行完当前task后,会通过getTask从阻塞队列中获取新任务,当队列中没有任务,且获取任务超时,那么当前worker也会进入退出流程。
getTask()
getTask从堵塞队列中获取等待执行的任务。
private Runnable getTask() { boolean timedOut = false; // 调用最新的poll()是否超时? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // (1)如果线程池状态为>=stop(2)线程池状态为SHUTDOWN并且队列为空 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 默认allowCoreThreadTimeOut=false,所以当workerCount > corePoolSize时 // 这个分支包括4种情况:(1)wc > maximumPoolSize && wc > 1 (2) wc > maximumPoolSize && workQueue.isEmpty() // (3)timed && timedOut && wc > 1 (4) timed && timedOut && workQueue.isEmpty() if ((wc > maximumPoolSize || (timed && timedOut)) // wc > && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果workerCount > corePoolSize,则超时获取任务,否则阻塞获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) // 如果r == null肯定是超时获取的任务 return r; timedOut = true; // 只要超时获取那一直都是这个状态 } catch (InterruptedException retry) { timedOut = false; // 如果在阻塞过程中,被interrrupt,重置timedOut为false } } }
有如下几种场景会返回null:
(1)通过调用setMaximumPoolSize导致worker数量大于maximumPoolSize
(2)线程池已经stop
(3)线程池是SHUTDOWN,并且队列为空
(4)worker等待task超时,超时的worker将被终止,
1、首先判断是否可以满足从workQueue中获取任务的条件,不满足return null
A、线程池状态是否满足:
(a)shutdown状态 + workQueue为空 或 stop状态,都不满足,因为被shutdown后还是要执行workQueue剩余的任务,但workQueue也为空,就可以退出了
(b)stop状态,shutdownNow()操作会使线程池进入stop,此时不接受新任务,中断正在执行的任务,workQueue中的任务也不执行了,故return null返回
B、线程数量是否超过maximumPoolSize 或 获取任务是否超时
(a)线程数量超过maximumPoolSize可能是线程池在运行时被调用了setMaximumPoolSize()被改变了大小,否则已经addWorker()成功不会超过maximumPoolSize
(b)如果 当前线程数量>corePoolSize,才会检查是否获取任务超时,这也体现了当线程数量达到maximumPoolSize后,如果一直没有新任务,会逐渐终止worker线程直到corePoolSize
2、如果满足获取任务条件,根据是否需要定时获取调用不同方法:
A、workQueue.poll():如果在keepAliveTime时间内,阻塞队列还是没有任务,返回null
B、workQueue.take():如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务
3、在阻塞从workQueue中获取任务时,可以被interrupt()中断,代码中捕获了InterruptedException,重置timedOut为初始值false,再次执行第1步中的判断,满足就继续获取任务,不满足return null,会进入worker退出的流程
processWorkerExit(Worker w, boolean completedAbruptly)线程退出
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果true说明worker是异常退出,如果是false标示worker是正常退出并且worker没有可执行的task,不用-1,因为在getTask里面已经-1 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 统计 completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate();// 在对线程池有负效益的操作时,都需要尝试终止线程池。 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 如果状态是running和shutdown if (!completedAbruptly) { // 如果线程是异常退出,则直接addWorker, int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // allowCoreThreadTimeOut默认是false,即min默认为corePoolSize if (min == 0 && ! workQueue.isEmpty()) // allowCoreThreadTimeOut=false标示核心线程即使空闲也要保持存活,如果=true,则标示使用keepAliveTime标示存活时间 min = 1; // 如果队列不空,那么至少也要保持一个线程存在 if (workerCountOf(c) >= min) // 如果线程池中线程数量>=min,则直接返回,否则需要addWorker return; // replacement not needed } addWorker(null, false); } }
processWorkerExit(Worker w, boolean completedAbruptly)
参数:
worker: 要结束的worker
completedAbruptly: 是否突然完成(是否因为异常退出)
执行流程:
1、worker数量-1
A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
2、从Workers Set中移除worker,删除时需要上锁mainlock
3、tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:
判断线程池是否满足终止的状态
A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
B、没有线程了,更新状态为tidying->terminated
4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程
线程池状态是running 或 shutdown
A、如果当前线程是突然终止的,addWorker()
B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
终止线程池
shutdown()平滑终止线程池
// 终止线程池,之前提交的任务会执行完,但是新提交的任务会拒绝,重复调用无影响,该方法不会等待线程池真正的终止。 public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN);// 设置线程状态,该方法只允许设置stop和shutdown,tidying和terminated状态在tryTerminate()方法中设置,内部是利用CAS+循环设置线程状态 interruptIdleWorkers(); // 中断所有的空闲worker onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); // 尝试终止线程池 }
interruptIdleWorkers()中断空闲worker
/ onlyOne如果为true,最多interrupt一个worker;空闲线程:等待任务的线程,即中断在等待任务的线程(没有上锁),中断唤醒继续循环,会判断线程池状态退出获取task private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { // 运行中的worker,是占用锁的,而worker是非重入锁,即运行中的worker不能中断。 try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
tryTerminate()尝试终止线程池
// 以下场景线程池变为terminated,(1)shutdown && wc == 0 && wq.isEmpty()(2)stop状态 // 这个方法必须在任何可能导致线程池终止的情况下被调用,如减少worker数量等。 final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 3种情况不需要中断线程池(1)running(2)状态是tidying或terminated(3)SHUTDOWN并且队列不空 return; if (workerCountOf(c) != 0) { // 只有SHUTDOWN并且队列空或者stop状态能到这里 interruptIdleWorkers(ONLY_ONE); // 中断一个正在等待任务的空闲worker return; } // 如果SHUTDOWN队列空,运行的worker也没有了,则可以terminated了 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // tidying状态 try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0));// terminated状态 termination.signalAll();//真正的终止了线程池,唤醒因为调用了awaitTermination()方法而阻塞的线程。 } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
shutdownNow()不友好的终止线程池
// 粗鲁的终止线程池,并返回等待执行的任务列表 public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); // 中断所有线程,包括运行的。 tasks = drainQueue(); // 返回等待处理的任务 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
interrruptWorkers()中断所有worker
// 中断所有worker private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // state是否大于零,即worker是否已经开始运行并且还未interrupt try { t.interrupt(); } catch (SecurityException ignore) { } } }
注意:运行中的线程调用Thread.interrupt()并不能保证线程被终止,firstTask.run内部可能捕获了InterruptException异常,没有上抛,导致线程一直无法结束。
awaitTermination()等待线程池终止
// 等待线程池终止 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; // 终止返回true if (nanos <= 0) // 超时返回false return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
3种情况发生之前,awaitTermination()都会被阻塞:
(1)所有任务完成执行,在tryTerminated()中更新状态为terminated之后调用termination.signalAll(),
(2)达到超时时间
(3)当前线程被中断
终止线程池并需要知道其是否终止可以使用如下形式:
exec.shutdown(); try{ while(!exec.awaitTermination(500, TimeUnit.MILLISECONDS)) { LOGGER.debug("Waiting for terminate"); } } catch (InterruptedException e) { //中断处理 }