Executor-ThreadPoolExecutor实现

1、ThreadPoolExecutor的主要作用

在Oracle中对ThreadPoolExecutor的作用进行了说明:1、在大量的异步任务到达的情况下,使用线程池能够提升性能;2、提供一种资源管理和调度的方法。

一般通过Executors的工厂方法来生成一个线程池对象,Executors提供了多种方法来构造不同的线程池:1、带有缓存性质的线程池 Executors.newCachedThreadPool(),线程池的大小不固定,并且会随着使用情况自动调整线程池的大小;2、固定大小的线程池Executors.newFixedThreadPool(int);3、单一线程池,只有一个后台线程,Executors.newSingleThreadExecutor()

2、线程池控制属性

在使用ThreadPoolExecutor可以指定一些参数来对线程池进行控制。

当前线程数 poolSize

线程池中当前线程的个数

核心线程数  corePoolSize

线程池中核心线程的个数,也就是能够长期存活的线程个数

最大线程数 maximumPoolSize

corePoolSize和maximumPoolSize二者之间共同合作来管理线程池中线程的数量,同时能够控制线程池的行为。当一个任务被提交到线程池中,根据当前线程池中线程数目的不同,线程池的处理也不一样。当前线程数用poolSize表示

最大线程数 largestPoolSize

largestPoolSize并不参与线程池的控制,它用来记录,线程池中曾经同时运行的最大线程数目。

等待时间 keepAliveTime

等待时间是用来控制超过corePoolSize部分的线程的空闲时间。当线程池中的线程数超过了corePoolSize时,超出部分的线程在空闲时间达到了keepAliveTime指定的时候之后,就会被终止,这样当对线程池使用不是很多的时候,可以节省机器资源。

3、线程池的状态

RUNNING 可以接受新任务

SHUTDOWN 不接受新任务,正在排队的已提交任务会去执行

STOP 不接受新任务,不执行已提交的正在排队的任务,同时中断所有正在执行的任务

TERMINATED  除了所有任务已经终止外,其它表象和STOP相同。

4、线程池的调度

当一个任务进入线程时, 根据线程池中当前线程的数据和配置的corePoolSize以及maximumPoolSize的值,线程池执行的动作也不一样。

a、poolSize < corePoolSize

当线程池中的数目小于指定的核心线程数时,线程池会为新提交的任务创建一个线程来执行任务。

b、poolSize>=corePoolSize

如果可以放入等待队列中,则放在等待队列中,等待其它线程执行完任务之后来执行此任务

如果无法放入等待队列中,则检查poolSize是否小于maximum,小于的话则新建一个线程来执行任务。如果已经达到线程数上限了,就会提示用户拒绝任务提交。

5、实现

首先来看ThreadPoolSize中定义的几个属性

private final BlockingQueue<Runnable> workQueue;

workQueue是已经提交到线程池,但还未来得及执行的任务。

private final HashSet<Worker> workers = new HashSet<Worker>();

workers中存放线程池中当前正在工作的线程信息。ThreadPoolExecutor使用Worker来对任务进行了包装。

private volatile long  keepAliveTime ;

private volatile int   corePoolSize ;

private volatile int   maximumPoolSize ;

private volatile int   poolSize ;

private int largestPoolSize;

还有一些属性,对于理解整个线程池的运作原理不那么重要,这里就没有列出。

任务进入线程池

在《Executor实现----AbstractExecutorService实现分析》中分析过,在submit提交任务中,最终是通过线程池的execute方法来实现Executors框架对任务的执行。

下面来看ThreadPoolExecutor的具体实现

public void execute (Runnable
command) {

if (command
== null)

throw new NullPointerException();

if (poolSize >= corePoolSize ||
!addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command))
{

if (runState != RUNNING || poolSize ==
0)

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))

reject(command); // is shutdown or
saturated

}

}

通过if (poolSize >= corePoolSize ||
!addIfUnderCorePoolSize(command)) 来判断当前是否创建一个线程来执行任务。

如果线程池中线程数还没有达到核心线程数,就直接调用addIfUnderCorePoolSize来创建线程执行任务。

如果线程池中线程的个数已经达到了核心线程数,就使用workQueue.offer(command)把任务放入等待队列中。

如果workQueue.offer操作返回false,表示等待队列已满,无法放入更多任务,调用 addIfUnderMaximumPoolSize来检查当前是否已经达到最大线程数,是否还可以创建新的线程来执行任务。

addIfUnderCorePoolSize和addIfUnderMaximumPoolSize的代码如下

private boolean addIfUnderCorePoolSize (Runnable
firstTask) {

Thread t = null;

final ReentrantLock
mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < corePoolSize && runState == RUNNING)

t = addThread(firstTask);

finally {

mainLock.unlock();

}

return t
!= null ;

}

private boolean addIfUnderMaximumPoolSize(Runnable
firstTask) {

Thread t = null;

final ReentrantLock
mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < maximumPoolSize && runState == RUNNING)

t = addThread(firstTask);

finally {

mainLock.unlock();

}

return t
!= null ;

}

检查poolSize是否满足创建线程的条件,然后调用addThread来创建线程去执行任务。下面来看下addThread方法的具体实现

private Thread addThread(Runnable
firstTask) {

Worker w = new Worker(firstTask);

Thread t = threadFactory.newThread(w);

boolean workerStarted
false;

if (t
!= null ) {

if (t.isAlive()) //
precheck that t is startable

throw new IllegalThreadStateException();

w. thread =
t;

workers.add(w);

int nt
= ++poolSize ;

if (nt
> largestPoolSize )

largestPoolSize =
nt;

try {

t.start();

workerStarted = true;

}

finally {

if (!workerStarted)

workers.remove(w);

}

}

return t;

}

首先threadFactory.newThread创建一个新线程,然后把任务添加workers中。把任务封装成一个Worker对象,然后把这个Worker对象封装成一个Thread对象。

然后启动这个线程。注意这里对largestPoolSize的设置,从这里可以看出,只要当前线程池中线程数大于largestPoolSize,就更新largestPoolSize字段,也就是用largestPoolSize来记录线程池中历史线程最大数。

从addThread中可以看出,任务的执行最终是通过Wokrer来实现的。那一个问题是,当前这个任务执行完成后,线程是销毁,还是继续执行?如果执行的话,又如何取任务?在线程池中线程数已经超过corePoolSize的情况下,线程池又是如何控制线程池中线程的数目的?

这些都需要通过Worker来了解,下面来看Worker类的具体实现。

private final class Worker implements Runnable

Worker实现了Runnable接口,所以才能被封装到线程中( Thread t = threadFactory .newThread(w);就是做的这样的封装),在线程启动的时候,执行run方法。

private Runnable firstTask;

Thread thread;

volatile boolean hasRun = false;

这是Worker的三个属性(还有其它几个属性没有列出来)。firstTask初始化Worker对象时执行的任务,也就是worker启动时要执行的任务。thread是worker任务执行所在的线程,每个worker都属于一个线程,在这个线程中执行。hasRun是当前任务的执行状态,表示任务是不是已经执行完成

开门见山,直接看Worker的run方法

public void run()
{

try {

hasRun = true;

Runnable task = firstTask;

firstTask = null ;

while (task
!= null || (task = getTask()) != null)
{

runTask(task);

task = null;

}

finally {

workerDone( this);

}

}

}

注意这里的while循环,取任务--执行任务--取任务---执行任务 正常情况下,只要能正常获取到任务,这个线程就会一直执行下去。

所以,当线程池中提交的任务源源不断时,线程池中的线程数就不会减少。那么当线程池中提交的任务减少,while循环的条件不满足getTask返回null,或者抛出异常,就会到workerDone,来实现线程池中多余线程的销毁,节省资源。

OK,这里就有个问题,如果这样的话,要如何实现线程池中corePoolSize线程数的长期存活呢?如果实现超出corePoolSize的部分才会在空闲一段时间后销毁?答案在getTask中

Runnable getTask() {

for (;;)
{

try {

int state
= runState ;

if (state
SHUTDOWN)

return null ;

Runnable r;

if (state
== SHUTDOWN)  //
Help drain queue

r = workQueue.poll();

else if (poolSize > corePoolSize || allowCoreThreadTimeOut )

r = workQueue.poll(keepAliveTime ,
TimeUnit.NANOSECONDS);

else

r = workQueue.take();

if (r
!= null )

return r;

if (workerCanExit())
{

if (runState >= SHUTDOWN) //
Wake up others

interruptIdleWorkers();

return null ;

}

// Else retry

catch (InterruptedException
ie) {

// On interruption, re-check runState

}

}

}

注意当poolSize>corePoolSize时,从任务队列中获取任务时,是通过poll,并指定了超时时间(keepAliveTime在这里起效),这样当线程池中任务较少时,这里会返回null,workerCanExit返回true,所以整个getTask就返回null,这样就使得while循环条件不为true,从而执行workerDone,结束这个超出核心线程数的线程。

而在poolSize<=corePoolSize时,take使得线程阻塞。直到有新任务到来为止。

通过这两种不同的情况,来实现根据当前线程池中任务的多少去进行超出corePoolSize的线程的动态调整。

下面是一个流程示意图

时间: 2024-07-29 23:13:12

Executor-ThreadPoolExecutor实现的相关文章

Java多线程之~~~使用ThreadPoolExecutor来创建线程

以前我们创建线程的时候都是主动的new一个Thread,然后调用他们的start方法,但是如果线程非常多,任务也非 常多的时候,这样写就会显得非常麻烦,当然可能效率也不是很高,Java给我们提供了叫线程创建器这个样概念的类, 他可以帮助我们管理这些线程,你做的就是编写好代码,然后交给他,她就会自动帮你运行. 当然,带cache的threadpool 对于死掉的线程重新调用,在性能上也会有非常好的表现,但是不能将太多的线程交 给他管理,否则就会把系统拖垮,下面我们来做一个例子. package c

java并发之线程执行器(Executor)

线程执行器和不使用线程执行器的对比(优缺点) 1.线程执行器分离了任务的创建和执行,通过使用执行器,只需要实现Runnable接口的对象,然后把这些对象发送给执行器即可. 2.使用线程池来提高程序的性能.当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务.避免了不断创建和销毁线程导致的性能开销. 3.执行器可以处理实现了Callable接口的任务.Callable接口类似于Runnable接口,却提供了两方面的增强: a.Callable主方法名称为call(),可以返回结果

Java并发编程-Executor框架之Callable和Future接口

在上一篇文章中我们已经了解了Executor框架进行线程管理,这篇文章将学习Executor框架的另一个特性,我们知道执行Runnable任务是没有返回值得,但Executor可以运行并发任务并获得返回值,Concurrent包提供下面两个接口实现这个功能: Callable接口:这个接口声明call(),类似于Runnable的run(),可以在这个方法里实现任务的具体逻辑操作.Callable是一个泛型接口,必须声明call()的返回类型. Future接口:这个接口声明了一下方法来获取Ca

ThreadPoolExecutor介绍

ThreadPoolExecutor的说明 ThreadPoolExecutor常见的操作主要有以下几个方法: getPoolSize():返回线程池实际的线程数. getActiveCount():返回在执行者中正在执行任务的线程数. getCompletedTaskCount():返回执行者完成的任务数. submit(): 提交一个线程给线程执行者,如果执行者有空余线程,则直接执行:否则等待直到有空闲线程.这里调用sumbit后,并不会阻塞调用线程.调用者所在的线程和执行的线程并发运行.

Java并发编程-Executor框架(转)

本文转自http://blog.csdn.net/chenchaofuck1/article/details/51606224 感谢作者 我们在传统多线程编程创建线程时,常常是创建一些Runnable对象,然后创建对应的Thread对象执行它们,但是如果程序需要并发执行大量的任务时,需要为每个任务都创建一个Thread,进行管理,这将会影响程序的执行效率,并且创建线程过多将会使系统负载过重. 在JDK 1.5之后通过了一套Executor框架能够解决这些问题,能够分解任务的创建和执行过程.该框架

[python] ThreadPoolExecutor线程池 python 线程池

初识 Python中已经有了threading模块,为什么还需要线程池呢,线程池又是什么东西呢?在介绍线程同步的信号量机制的时候,举得例子是爬虫的例子,需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行. 这就是线程池的思想(当然没这么简单),但是自己编

ThreadPoolExecutor线程池和ProcessPoolExecutor进程池

ProcessPoolExecutor线程池 1.为什么需要线程池呢,如果创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,所以线程池的思想就是:每个线程各分配一个任务,剩下的任务皮队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行 2.标准库concurrent.futures模块,它提供了 ProcessPoolExecutor和ThreadPoolExecutor两个类,实现了对threading和multi

爬虫获取mobike共享单车信息

背景:端午节假期的时候参加了学校的数学建模比赛,题目是关于共享单车的供需匹配问题,需要获得共享单车的数量和时空分布情况. 在苦苦找寻数据无果的情况下决定自己用爬虫对天津地区的mobike进行统计. 在网上找到了这篇爬虫的代码,本着少造轮子的基本原则,我选择了这个代码进行统计,这里记录一下历程,方便日后查阅. 先上原作者github地址:git clone https://github.com/derekhe/mobike-crawler.python3环境,爬取的是微信小程序,之前是可以爬手机客

Android异步消息处理机制(3)asyncTask基本使用

本文翻译自android官方文档,结合自己测试,整理如下. 概述 AsyncTask抽象类,翻译过来就是异步任务,能够合理并方便的使用UI线程.该类可以实现将后台操作结果显示在UI线程中,而不需要我们自己实现子线程或者handler(当然它内部也是借助这两者实现的). 虽然AsyncTask可以提供后台运行并将结果显示在UI上,但是理想情况应该是后台操作最多只能是几秒钟,若要执行长时间的操作强烈建议使用java中的Executor,ThreadPoolExecutor,FutureTask等.

源码分析--AsyncTask

查看文档 AsyncTask enables proper and easy use of the UI thread. This class allows to perform background operations and publish results on the UI thread without having to manipulate threads and/or handlers. AsyncTask使适当的和易用的用户界面线程.这个类允许执行后台操作,在UI线程上发布的结果