ThreadPoolExecutor源码解析

java doc中的解释是:

An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods.

一个使用线程池来执行提交的任务的ExecutorService子类,正常通过Executors工具类中的工厂方法进行配置。

那我们就先看一下比较熟悉的Executors中的几个方法的实现代码:

1.Executors.newCachedThreadPool (threadFactory);

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue<Runnable>(),

threadFactory);

}

2.Executors.newFixedThreadPool(nThreads , threadFactory);

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>(),

threadFactory);

}

3.Executors.newScheduledThreadPool(corePoolSize, threadFactory);

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>(),

threadFactory));

}

可以看到其实这些方法都是通过构造方法创建了ThreadPoolExecutor对象,我们来看下具体的构造方法实现

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

threadFactory, defaultHandler);

}

这里我们可以看到ThreadPoolExecutor中比较重要的一些参数,这些参数都是可以通过外部传入,对ThreadPoolExecutor内部进行控制。而ThreadPoolExecutor内部的工作机制究竟是怎样进行的呢?下面我们就揭开它的外衣,深入其中仔细探究。

1.ThreadPoolExecutor继承了AbstractExecutorService类

public class ThreadPoolExecutor extends AbstractExecutorService

2. ThreadPoolExecutor的重要变量参数

ctl:    用来标识线程池状态的重要参数,很多操作执行前都需要对线程池状态进行前置判断,以确定线程池状态是否正常

workQueue:    任务队列,用来在全部当前线程正在处理任务时存储提交来的任务

works:    存储所有工作线程

corePoolSize:    核心线程数

maximumPoolSize:    最大线程数

keepAliveTime:    空闲线程等待任务时间

threadFactory:    线程创建工厂

handler:    因线程池饱和或关闭触发的拒绝异常处理器

//标识线程池控制状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//线程池状态类型

 

//接受新的任务并处理队列中的任务

private static final int RUNNING = -1 << COUNT_BITS;

//不接受新任务但处理队列中的任务

private static final int SHUTDOWN = 0 << COUNT_BITS;

//不接受新任务也不处理队列中的任务,且中断正在进行的任务

private static final int STOP = 1 << COUNT_BITS;

//所有任务已经完结,工作线程数为0,并调用terminated方法

private static final int TIDYING = 2 << COUNT_BITS;

//terminated方法执行完成

private static final int TERMINATED = 3 << COUNT_BITS;

 

//任务队列,储存任务以提供给工作线程

private final BlockingQueue<Runnable> workQueue;

 

//主要锁,设置workers和相关数据记录调用

private final ReentrantLock mainLock = new ReentrantLock();

 

//存储所有工作线程,设置时需要加mainLock锁

private final HashSet<Worker> workers = new HashSet<Worker>();

 

//线程池已达到的最大数,设置时需要加mainLock锁

private int largestPoolSize;

 

//已完成任务数,设置时需要加mainLock锁

private long completedTaskCount;

 

//线程创建工厂

private volatile ThreadFactory threadFactory;

 

//因饱和或线程池关闭触发的拒绝异常处理器

private volatile RejectedExecutionHandler handler;

 

//空闲线程等待任务时间(单位:纳秒),到时则会被销毁

private volatile long keepAliveTime;

 

//默认为false,核心线程在空闲时一直存活

//如果为true,核心线程使用keepAliveTime参数来等待任务

private volatile boolean allowCoreThreadTimeOut;

 

//核心线程数

private volatile int corePoolSize;

 

//最大线程数

private volatile int maximumPoolSize;

 

//默认拒绝异常处理器

private static final RejectedExecutionHandler defaultHandler =

new AbortPolicy();

3.execute方法,用户通过该方法提交任务给线程池。处理任务分四种种情况:

(1)如果当前工作线程数小于核心线程数,则创建新的线程来处理任务

(2)如果当前工作线程等于核心线程数,新提交的任务存储到工作队列中

重新检测线程池状态是否正常,如果不是运行状态,则移除任务,并处理拒绝异常

如果线程池正常,工作线程数等于0,则增加工作线程

(3)当工作队列达到最大容量,工作线程数没有达到最大线程数,增加新的工作线程,并处理任务

(4)当工作线程数达到最大线程数,则使用拒绝异常处理器对任务进行处理

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

4.线程池是怎么增加一个新的线程的呢?接下来我们来看addWorker方法

(1)双重for循环检查线程池是否适合增加新的线程

(2)创建Worker对象并获得mainLock锁

(3)再次检查状态,防止线程工厂失败或线程池关闭

(4)works增加worker对象,并更新largestPoolSize,释放锁

(5)启用worker对象中的线程

(6)由于并发原因,可能会出现线程尚未执行,但线程池正在关闭,因此可能会出现线程池关闭时,错过中断当前线程,

因此再进行一次判断,如果线程池状态为关闭且当前线程未被中断,则手动中断它

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

 

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

 

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

 

Worker w = new Worker(firstTask);

Thread t = w.thread;

 

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int c = ctl.get();

int rs = runStateOf(c);

 

if (t == null ||

(rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null))) {

decrementWorkerCount();

tryTerminate();

return false;

}

 

workers.add(w);

 

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

} finally {

mainLock.unlock();

}

 

t.start();

// It is possible (but unlikely) for a thread to have been

// added to workers, but not yet started, during transition to

// STOP, which could result in a rare missed interrupt,

// because Thread.interrupt is not guaranteed to have any effect

// on a non-yet-started Thread (see Thread#interrupt).

if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())

t.interrupt();

 

return true;

}

5.在addWorker方法中,我们并没有看到任务具体执行的操作,但是可以很明显地猜测到应该是在调用t.start()方法时进行调用。而线程t是来自于Worker对象,我们来看下内部类Worker(删除了部分代码)。

(1)Worker类继承自AbstractQueuedSynchronizer,实现了Runnable接口

(2)new Worker()时,通过ThreadFactory的newThread方法创建了一个新的线程

(3)当调用addWorker中的t.start()时,其实触发的是run方法中的runWorker(this)

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

/** Thread this worker is running in. Null if factory fails. */

final Thread thread;

/** Initial task to run. Possibly null. */

Runnable firstTask;

/** Per-thread task counter */

volatile long completedTasks;

 

/**

* Creates with given first task and thread from ThreadFactory.

* @param firstTask the first task (null if none)

*/

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

 

/** Delegates main run loop to outer runWorker */

public void run() {

runWorker(this);

}

 

}

6.我们再来看runWorker究竟做了什么操作

(1)while循环保证了线程可以重复执行任务,如果firstTask执行完成后,通过getTask方法从任务队列中获取新的任务继续执行

(2)执行前和执行后分别调用beforExecute和afterExecute两个钩子方法,可以用来在子类中自己实现,比如用于线程池监控

(3)如果处理过程中出现意外情况,在finally中调用processWorkerExit进行处理,主要是对线程记录相关变量进行恢复,且处理当核心线程全部超时而任务队列中有新的任务时,重新增加新线程来处理任务

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

7.最后来看下getTask方法中是怎么获取任务队列中的任务的

(1)判断线程池状态是否正常,根据timed = allowCoreThreadTimeout || wc > corePoolSize来决定队列获取任务的方式是指定keepAliveTime时间进行等待还是阻塞式等待

(2)如果keepAliveTime超时,允许核心线程超时销毁或者当前线程池总量大于核心线程数,则getTask()返回null,回溯到runWorker方法中,则while循环结束,即线程执行完成,此线程将被销毁。

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

 

retry:

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

 

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount();

return null;

}

 

boolean timed; // Are workers subject to culling?

 

for (;;) {

int wc = workerCountOf(c);

timed = allowCoreThreadTimeOut || wc > corePoolSize;

 

if (wc <= maximumPoolSize && ! (timedOut && timed))

break;

if (compareAndDecrementWorkerCount(c))

return null;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

 

try {

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

以上对ThreadPoolExecutor中的主要部分都进行了解析。相信大家应该对该类的实现有了大概的了解,下面对一些细节方面补充说明:

1.RejectExceptionHandler四种方式

(1)默认为AbortPolicy,任务直接被抛弃,抛出RejectedExecutionException异常

(2)DiscardPolicy,同AbortPolicy一样,只是不抛出异常

(3)DiscardOldestPolicy,将队列中最早的任务抛弃,然后执行当前任务

(4)CallerRunsPolicy,使用主线程执行任务,减缓任务提交,以等待线程池中的线程执行

来源: <http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html>

时间: 2024-10-20 10:57:58

ThreadPoolExecutor源码解析的相关文章

【Java并发编程】21、线程池ThreadPoolExecutor源码解析

一.前言 JUC这部分还有线程池这一块没有分析,需要抓紧时间分析,下面开始ThreadPoolExecutor,其是线程池的基础,分析完了这个类会简化之后的分析,线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.下面开始分析. 二.ThreadPoolExecutor数据结构 在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQu

Java 1.7 ThreadPoolExecutor源码解析

相比1.6,1.7有些变化: 1.        增加了一个TIDYING状态,这个状态是介于STOP和TERMINATED之间的,如果执行完terminated钩子函数后状态就变成TERMINATED了: 2.        内部类Worker继承了AQS类作为一个独享锁,在运行每个任务前会获取自己的锁: 3.        runState和poolSize两个字段被合并成一个原子字段ctl了,不再使用mainLock保护了. 原文转载:http://blog.csdn.net/yuenki

线程池之ThreadPoolExecutor源码解析

1.变量 ThreadPoolExecutor先定义了这几个常量,初看时一脸懵逼,其实它就是用int的二进制高三位来表示线程池的状态, 先回顾一下位运算: <<’左移:右边空出的位置补0,其值相当于乘以2. ‘>>’右移:左边空出的位,如果是正数则补0,若为负数则补0或1,取决于所用的计算机系统OS X中补1.其值相当于除以2. 负数二进制由它的绝对值取反后加1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RU

线程池技术之:ThreadPoolExecutor 源码解析

java中的所说的线程池,一般都是围绕着 ThreadPoolExecutor 来展开的.其他的实现基本都是基于它,或者模仿它的.所以只要理解 ThreadPoolExecutor, 就相当于完全理解了线程池的精髓. 其实要理解一个东西,一般地,我们最好是要抱着自己的疑问或者理解去的.否则,往往收获甚微. 理解 ThreadPoolExecutor, 我们可以先理解一个线程池的意义: 本质上是提供预先定义好的n个线程,供调用方直接运行任务的一个工具. 线程池解决的问题: 1. 提高任务执行的响应

Java ThreadPoolExecutor源码解析之Execute流程

execute方法示意图如上,workers为HashSet类型,存储初始化任务RUNNING或SHUTDOWN但firsttask为null的worker. workQueue为BlockingQueue,存储提交的执行任务. 原文地址:https://www.cnblogs.com/codegod/p/9062789.html

第十四章 Executors源码解析

前边两章介绍了基础线程池ThreadPoolExecutor的使用方式.工作机理.参数详细介绍以及核心源码解析. 具体的介绍请参照: 第十二章 ThreadPoolExecutor使用与工作机理 第十三章 ThreadPoolExecutor源码解析 1.Executors与ThreadPoolExecutor ThreadPoolExecutor 可以灵活的自定义的创建线程池,可定制性很高 想创建好一个合适的线程池比较难 使用稍微麻烦一些 实际中很少使用 Executors 可以创建4种线程池

JDK 源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池

零. 简介 Executors 是 Executor.ExecutorService.ThreadFactory.Callable 类的工厂和工具方法. 一. 源码解析 创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销.当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲.由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行.线程池中的线程会一直存在,除非明确地 shutdown 掉. public static Exec

Java源码解析 - ThreadPoolExecutor 线程池

1 线程池的好处 线程使应用能够更加充分合理地协调利用CPU.内存.网络.I/O等系统资源.线程的创建需要开辟虚拟机栈.本地方法栈.程序计数器等线程私有的内存空间;在线程销毁时需要回收这些系统资源.频繁地创建和销毁线程会浪费大量的系统资源,增加并发编程风险. 在服务器负载过大的时候,如何让新的线程等待或者友好地拒绝服务? 这些都是线程自身无法解决的;所以需要通过线程池协调多个线程,并实现类似主次线程隔离.定时执行.周期执行等任务. 线程池的作用包括:●利用线程池管理并复用线程.控制最大并发数等●

Android AsyncTask 源码解析

1. 官方介绍 public abstract class AsyncTask extends Object  java.lang.Object    ? android.os.AsyncTask<Params, Progress, Result> AsyncTask enables proper and easy use of the UI thread. This class allows to perform background operations and publish resul