相比1.6,1.7有些变化:
1、 增加了一个TIDYING状态,这个状态是介于STOP和TERMINATED之间的,如果执行完terminated钩子函数后状态就变成TERMINATED了;
2、 内部类Worker继承了AQS类作为一个独享锁,在运行每个任务前会获取自己的锁;
3、 runState和poolSize两个字段被合并成一个原子字段ctl了,不再使用mainLock保护了。
原文转载:http://blog.csdn.net/yuenkin/article/details/51040001
一、成员变量介绍
[java] view plain copy
- public class ThreadPoolExecutor extends AbstractExecutorService {
- /**
- * ctl字段其实表示两个含义:runState和workerCount(近似1.6中的poolSize)
- * int类型,高3位表示runState,低29位表示workerCount。目前这个版本也就限
- * 制了线程个数不会超过2^29-1。
- * RUNNING: 能接受新的任务且能处理队列里的请求
- * SHUTDOWN: 不能接受新的任务但是能处理队列里的请求
- * STOP: 不能接受新的任务、不能处理队列里的请求,workers会被interrupt
- * TIDYING: 所有的线程都已经terminated了,正准备调用terminated()方法
- * TERMININATED: terminated()方法已经调用结束了
- *
- * RUNNING->SHUTDOWN: 调用shutdown方法
- * (RUNNING/SHUTDOWN)>STOP: 调用shutdownNow方法
- * SHUTDOWN->TIDYING: 当workers和queue都空的时候
- * STOP->TIDYING: 当workers为空的时候
- * TIDYING->TERMINATED: 当terminated方法调用结束的时候。
- * awaitTermination()直到状态为TERMINATED时才会返回。
- *
- */
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static final int COUNT_BITS = Integer.SIZE - 3;
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
- // runState is stored in the high-order bits
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
- // 取ctl的高三位,获取runState(运行状态)
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 取ctl的低29位,获取workerCount(worker的数量)
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 把runState和workerCount合并成ctl,上面两个函数的反操作
- private static int ctlOf(int rs, int wc) { return rs | wc; }
二、execute函数
[java] view plain copy
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- /*
- * 三步走:
- * 1. 如果RUNNING的线程数目小于corePoolSize,直接调用addWorker方法
- * 启动一个新线程。addWorker函数会检查runState和workerCount,如果不
- * 需要新建一个thread就会返回false了
- *
- * 2. 如果任务被成功的放入了workQueue,我们仍然需要做个double-check
- * 因为调用完isRunning(c)后池中的线程可能都退出了或者线程池被shut
- * down了。重新检查状态看是要remove掉新来的任务还是创建一个新线程来执
- * 行(如果没有活动的线程了)
- *
- * 3. 如果放入workQueue失败了,我们尝试创建一个新worker。如果失败了,
- * 说明线程池被关闭了或者饱和了(超过最大值了),就直接拒了。
- *
- */
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- // addWorker有可能会失败,失败后重新获取状态并继续往下走
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- // 如果isRunning(c)&&workQueue.offer中间并发发生了shutdown,需要remove
- // 掉刚放入workQueue的command任务。注意:此时如果有一个worker刚执行完一个task
- // 然后从workQueue获取下一个task时,这里的remove就会失败了。
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 如果是RUNNING状态但是没有可工作的线程,需要直接new一个
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
- }
execute函数大体思路和1.6一致,就三种情况:
① 当前线程池中线程数目小于corePoolSize,直接new一个thread;
② 当先线程池数据大于corePoolSize,则放入workQueue中;
③ 如果workQueue满了且线程池中线程数小于maximumPoolSize,则new一个thread。
[java] view plain copy
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 如果被shutdown了,一般就直接返回false。但是需要排除一个特例情况:当线程池状
- // 态是shutdown,但workQueue不空且workers空了,会调用addWorker(null,false)
- // 方法创建一个线程处理workQueue里的任务,这时不能直接返回false。
- if (rs >= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- // 如果当前workers数目大于CAPACITY或者大于用户设置了,直接返回false
- if (wc >= CAPACITY ||
- wc >= (core ? corePoolSize : maximumPoolSize))
- return false;
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- // 如果仅仅是workerCount变化了,那么继续内层的循环;如果连runState也变化了,
- // 则要重新继续外层的循环。
- if (runStateOf(c) != rs)
- continue retry;
- }
- }
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- final ReentrantLock mainLock = this.mainLock;
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- int rs = runStateOf(c);
- // 再次检查runState的状态,如果是RUNNING或者SHUTDOWN但是firstTask不空,则
- // 把new出来的worker放入workers中。
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- // 创建worker成功后直接启动线程了
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- // 创建失败要做清理操作
- addWorkerFailed(w);
- }
- return workerStarted;
- }
addWorker函数尝试新建一个thread来运行传递给它的task。当线程池被STOP或SHUTDOWN或threadFactory返 回null时或者OOM时,会返回false并做相应的清理。整个过程分为两步:1、尝试设置workerCount,成功了就到步骤2;2、尝试创建一 个worker并加入到workers里。
[java] view plain copy
- private void addWorkerFailed(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (w != null)
- workers.remove(w);
- decrementWorkerCount();
- tryTerminate();
- } finally {
- mainLock.unlock();
- }
- }
addWorkerFailed函数做些清理操作:1、把创建的worker从workers中删除;2、把workerCount减1;3、检查是否可以terminated线程池,防止这个worker的存在导致执行awaitTermination操作的客户端线程阻塞了。
[java] view plain copy
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- // 如果是以下三种情况直接返回:
- // 1.RUNNING状态; 2.runState>=TIDYING,说明有其他线程执行了tryTerminate操
- // 作; 3.SHUTDOWN状态且workQueue不空
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- // 如果workerCount大于0,则中断一个空闲的worker,就返回了。为啥只中断一个呢?
- // 因为worker线程退出时也会调用tryTerminate方法(一个接一个的传播)
- if (workerCountOf(c) != 0) { // Eligible to terminate
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 走到这里说明workers数量为0了,尝试把线程池状态改成TIDYING并调用terminated
- // 函数->状态再设置成TERMINATED。如果设置TIDYING失败,则继续循环。
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- terminated();
- } finally {
- // terminated函数抛异常也需要执行下面的操作。
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
tryTerminate函数尝试TERMINATED线程 池(当a、SHUTDOWN且queue和pool都空;b、STOP且queue为空了)。如果workers不为0,则中断任意一个空闲的 worker后直接返回。否则:首先,将线程池状态改成TIDYING;其次,调用用户的钩子函数terminated;最后,将状态设置成 TERMINATED。
[java] view plain copy
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- // 如果tryLock成功,就说明这个worker是空闲的。
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- // 如果只中断一个就break,只有tryTerminate函数中使用到这种情况。
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
interruptIdleWorkers函数根据onlyOne参数决定中断一个或所有空闲的workers(这些workers都阻塞在getTask方法中)。
三、shutdown函数
[java] view plain copy
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 检查调用者是否有权限执行shutdown
- checkShutdownAccess();
- // 将线程池的状态改成SHUTDOWN
- advanceRunState(SHUTDOWN);
- // 中断所有空闲的workers
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- // 尝试终止线程池
- tryTerminate();
- }
shutdown函数就执行几步:把状态改成SHUTDOWN,中断所有空闲的workers,调用onShutdown钩子函数,最后调用tryTerminate尝试终止线程池。
[java] view plain copy
- private void advanceRunState(int targetState) {
- for (;;) {
- int c = ctl.get();
- if (runStateAtLeast(c, targetState) ||
- ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
- break;
- }
- }
advanceRunState函数将线程池的状态改成指定状态值,如果现在状态值比target值大就直接返回。targeState的值是SHUTDOWN或者STOP,不能是TIDYING或者TERMINATED(这两种状态需要调用tryTerminate函数设置)。
四、shutdownNow函数
[java] view plain copy
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 检查调用者是否有权限执行关闭
- checkShutdownAccess();
- // 将线程池的状态改成STOP
- advanceRunState(STOP);
- // 和shutdown不同,这里中断所有的worker线程
- interruptWorkers();
- // 删除workQueue里的任务并返回任务列表
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
- // 尝试终止线程池
- tryTerminate();
- return tasks;
- }
shutdownNow函数会中断所有的worker线程,删除workQueue里的任务,最后尝试终止线程池并返回workQueue里的任务。
[java] view plain copy
- private void interruptWorkers() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 中断所有的worker线程
- for (Worker w : workers)
- w.interruptIfStarted();
- } finally {
- mainLock.unlock();
- }
- }
[java] view plain copy
- private List<Runnable> drainQueue() {
- BlockingQueue<Runnable> q = workQueue;
- List<Runnable> taskList = new ArrayList<Runnable>();
- q.drainTo(taskList);
- if (!q.isEmpty()) {
- for (Runnable r : q.toArray(new Runnable[0])) {
- if (q.remove(r))
- taskList.add(r);
- }
- }
- return taskList;
- }
五、Worker内部类
[java] view plain copy
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- /** Thread this worker is running in. Null if factory fails. */
- final Thread thread;
- /** Initial task to run. Possibly null. */
- Runnable firstTask;
- /** Per-thread task counter */
- volatile long completedTasks;
- /**
- * Creates with given first task and thread from ThreadFactory.
- * @param firstTask the first task (null if none)
- */
- Worker(Runnable firstTask) {
- // 初始值为-1,防止worker还没启动就被interrupt了;在start开始时会将状态改成0
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- // 参数1没有意义,是独占锁
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
Worker类主要维护着中断的管理和其他操作(runWorker函数),继承了AQS类实现了一个不可重入的Lock,在获取到一个任务后,准备执行前首先要获取这个锁。同时,在中断空闲的worker时也要先获取到这个锁。
[java] view plain copy
- public void run() {
- runWorker(this);
- }
[java] view plain copy
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- // 有时我们不想从workQueue取第一个任务,直接执行刚提交的任务
- Runnable task = w.firstTask;
- w.firstTask = null;
- // 把state设置成0,允许中断
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // 进入循环了
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // 如果是STOP状态,需要保证线程是被中断了的;
- // 如果不是需要清空中断状态,但是需要重新检查下状态防止在清除
- // 中断时发生了shutdownNow
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 执行前的钩子函数
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 执行后的钩子函数
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
runWorker函数循环从workQueue里获取 task并执行,但是需要注意以下几个问题:1.如果不想从workQueue里获取第一个任务执行,那就给worker.firstTask赋值。2、 如果getTask获取的值为null,或者你的task里抛异常了,那循环就退出了,然后worker线程也就退出了。3、在执行任务前先要获取 worker的锁,这里防止中断正在执行的线程。4、如果你的钩子函数beforeExecute函数抛异常了,那么你的任务就不会被执行 了,worker线程也会退出。5、如果task.run方法抛出Runtime或Error异常,会原样抛出,如果是Throwable,则会包装成一 个Error抛出,抛出异常前会执行afterExecute钩子函数,最后线程会退出。6、如果afterExecute钩子函数抛出异常,那么 worker线程也会退出。
[java] view plain copy
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 如果SHUTDOWN且workQueue为空,或者STOP了,worker线程直接退出
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- // 是否要回收这个worker线程?
- boolean timed; // Are workers subject to culling?
- for (;;) {
- int wc = workerCountOf(c);
- timed = allowCoreThreadTimeOut || wc > corePoolSize;
- // 如果还没有超时过(循环第一次执行到这里)直接break
- if (wc <= maximumPoolSize && ! (timedOut && timed))
- break;
- // 否则,如果线程数大于最大限制或者已经超时过了说明这个worker线程要准备退出了
- // 先设置workerCount-1,成功的话直接退出;否则,看下runState是否和rs一样,如
- // 果一样就在内部循环,不一样就要到外部循环
- if (compareAndDecrementWorkerCount(c))
- return null;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- try {
- // 无限阻塞或超时阻塞
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- // 没有获取到task肯定是超时了
- timedOut = true;
- } catch (InterruptedException retry) {
- // 如果被中断了,不能算作超时
- timedOut = false;
- }
- }
- }
getTask函数是从workQueue里获取一个task,有两种策略(无限阻塞或者超时,具体要看客户端的配置)。如果这个函数返回了null,那么worker线程就会退出了。退出的原因不外乎以下几种:
1. 当前线程池中worker数量大于maximumPoolSize了;
2. 线程池被STOP了(workQueue.poll/take时会捕获到InterruptedException异常);
3. 线程池被SHUTDOWN了且workQueue为空(workQueue.poll/take时会捕获到InterruptedException异常);
4. 获取task超时了(timedOut)&&(timed)。
[java] view plain copy
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 用户的函数抛异常了,需要调整workerCount的值,因为worker线程准备退出了
- if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted
- decrementWorkerCount();
- // 做些统计操作(bookkeeping)
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 尝试终止线程池
- tryTerminate();
- int c = ctl.get();
- // 如果是RUNNING或SHUTDOWN状态,要看下workQueue是否为空,
- // 不能直接退出。如果workQueue不空,至少要保留1或corePoolSize个
- // 线程(看allowCoreThreadTimeOut配置)。少于这个数目,就需要通过
- // addWorker(null,false)方法补充新的线程进来。
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }
processWorkerExit函数是在runWork 循环退出后做的清理和bookkeeping(应该就是指completedTaskCount等变量的操作吧)操作。 completedAbruptly参数的含义是指用户的函数是否抛异常了(before/after/run等)。注意下函数最后会根据线程池的状态和 配置决定是否新建一个worker线程。
原文地址:https://www.cnblogs.com/AndyAo/p/8135063.html