ThreadPoolExecutor任务提交过程源码浅析

线程池是一种重复利用既有线程的池化技术 ,它大量减少了线程的创建初始化过程,也可以防止海量线程创建占尽资源的风险。

任务提交过程

学习使用线程池的使用,我们都大概知道这样一个过程,如图:

这个是一个Runnable实例提交到线程池的过程,大体分为4个步骤:

1)判断当前线程数量是否小于核心线程数量,如果小于则创建一个新的线程去执行该任务;

2)如果线程数已经超过了核心线程数,那么就提交到等待工作队列(等待队列的任务将会被既有的线程获取并处理)。

3)如果等待队列已经满了,无法再提交任务,那么将会判断当前线程数是否超过最大线程数。如果没有超过,那么将创建一个新的线程去执行该任务。

4)如果线程数已经超过了最大线程数,那么将调用拒绝策略的处理器(默认处理器实现是抛出一个拒绝的异常)。

DEMO入口

下面,我们以一个DEMO作为入口,了解一下ThreadPoolExecutor的任务提交过程源码实现,如下:

DEMO中,第一行代码创建了一个ThreadPoolExecutor实例对象,构造过程传入了几个参数:

1)corePoolSize:核心线程数,一般线程池中存活的最大线程,即使闲置也不会被回收;

2)maximumPoolSize:最大线程数,线程池中允许最大存活的线程数量;

3)keepAliveTime:闲置线程的存活时间,如果没有工作任务,超过存活时间后闲置线程将会被回收;

4)TimeUnit:存活时间的单位;

5)LinkedBlockQueue:一个阻塞队列,存放待处理任务;

ThreadPoolExecutor构造方法源码

我们点开ThreadPoolExecutor实例的构造方法,代码如下:

我们看到,该构造方法调用了另外一个构造方法,同时参数增加了两个,一个是ThreadFactory的实例(用于创建Thread对象),另一个是RejectExecutionHandler的实例(拒绝任务提交时调用的处理器)。点击进入另一个构造方法,如:

这里一开始进行了参数值校验,后面的部分就是给成员变量赋值(keepAliveTime转成成了纳秒),构造方法即执行结束了。

submit提交方法的源码

接下来我们看看submit提交任务的源码,如:

submit方法内,显示将Runnable实例包装成了一个RunnableFuture(该接口继承了Runnable和Future)的实现类FutureTask的实例,执行FutureTask也就会调用Runnable。

下面,我们看看execute这个核心方法,点击进入该方法

注意:ThreadPoolExecutor通过一个AtomicInteger类型的ctl变量存储了runState(线程池状态)和workerCount(线程数)两个变量。具体实现可以看另外一个博文说明:https://www.cnblogs.com/lay2017/p/10946928.html

execute代码的核心步骤如下:

1、获得workerCount,判断workerCount是否小于corePoolSize,如果小于那么调用addWorker方法添加一个worker(这里的worker可以暂时理解为线程,后面我们会打开addWorker方法看看)执行该任务。

2、如果workerCount大于等于corePoolSize,或者addWorker方法添加worker失败(失败会重新获取ctl值),那么进入第二重判断。

3、第二重判断先是判断了线程池的状态,如果不是运行中则进行第三重判断,如果是运行中,那么尝试着吧任务提交到workQueue(工作队列中),如果提交任务失败也一样进入第三重判断。如果提交成功了,那么会再获取一次ctl并进行二次校验线程池状态(因为在offer到队列以后有可能线程池shutdown了),如果不是running状态,那么将会从队列中移除并调用拒绝策略。如果还是运行状态,但是当前线程数为0,那么为了把已经提交的任务处理掉会调用addWorker启动一个新的线程确保该任务有线程会处理。

4、在第二重判断中,如果线程池不是running状态或者提交到workQueue失败都会进入第三重判断。第三重判断中,如果不是running状态addWorker方法不会调用成功则直接调用reject方法,该方法将调用RejectExecutionHandler处理策略。如果是因为workQueue提交失败,如果线程数不大于MaximumPoolSize的话,addWorker方法将尝试创建一个新的线程去处理,否则调用reject拒绝任务提交。

我们看到,execute方法的执行逻辑基本和我们一开始所说的任务提交过程图是一致的。

addWorker方法添加一个线程

我们通过execute方法的逻辑大体了解了任务提交的源码过程,下面我们打开addWorker方法看看如何添加一个线程的。

addWorker方法主要分为两步:

1、在for循环中通过CAS乐观锁把workerCount数量+1

2、线程数增加成功以后,再实际地去创建一个worker实例对象,worker对象中包含任务对象和一个Thread线程对象。worker对象构建完成以后添加到WorkerSet集合容器里面,并调用Thread的start方法,当Thread执行以后会调用worker中的runWorker方法去处理任务(如果当前任务处理完了,会持续不断地从workQueue中获取任务并处理)。

总结

线程池的设计非常地面向对象,ThreadPoolExecutor就像是一个工厂的车间,这个车间里面有固定的几个工作人员(worker)和一个工作台(workQueue)。worker们会从workQueue上获取任务(RunnableFuture)并处理该任务昼夜不歇。如果workQueue上的任务满了,那么会招来更多地worker(创建新的Thread)来帮助处理任务。如果workQueue上的任务处理完了,大家闲在那里无所事事的话多余的worker就会离开(GC回收)留下几个worker驻守在那里(corePoolSize限定个数)等待新的任务来临。

原文地址:https://www.cnblogs.com/lay2017/p/10957943.html

时间: 2024-11-13 22:42:43

ThreadPoolExecutor任务提交过程源码浅析的相关文章

Android M Launcher3主流程源码浅析

背景 关于Launcher是啥的问题我想这里就没必要再强调了.由于一些原因迫使最近开始需要研究一下Launcher3源码,为了不再像以前那么傻逼(研究Settings等代码没作笔记),故这里赶紧将阶段性的感悟整理成文章,方便日后回看.其实本文来源于我在项目组内部的一次分享活动的ppt. 在开始学习Launcher3源码之前请务必保证你已经具备如下图所示基础知识技能(相关权重系数已经饼状图标注),如下: 看懂Launcher3源码必须要先准备充足上面的知识点,每个知识点在Launcher3源码中的

yarn作业提交过程源码

记录源码细节,内部有中文注释 Client 端: //最终通过ApplicationClientProtocol协议提交到RM端的ClientRMService内 package org.apache.hadoop.mapred; jobclient包内 YarnRunner public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedExc

php-fpm执行流程源码浅析

作者:zhanhailiang 日期:2015-03-09 sapi\fpm\fpm\fpm_main.c:@main; fcgi_init()->fcgi_setup_signals(); 设置信号处理方法; sigaction(SIGUSR1, &new_sa, NULL); sigaction(SIGTERM, &new_sa, NULL); sigaction(SIGPIPE, NULL, &old_sa); sapi_startup(&cgi_sapi_mo

h2database源码浅析:事务、两阶段提交

http://blog.csdn.net/bluejoe2000/article/details/42437633 h2database源码浅析:事务.两阶段提交 2015-01-05 22:54 734人阅读 评论(0) 收藏 举报  分类: 源码故事(18)  版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] Transaction Isolation Transaction isolation is provided for all data manipulation

【Spark Core】任务执行机制和Task源码浅析2

引言 上一小节<任务执行机制和Task源码浅析1>介绍了Executor的注册过程. 这一小节,我将从Executor端,就接收LaunchTask消息之后Executor的执行任务过程进行介绍. 1. Executor的launchTasks函数 DriverActor提交任务,发送LaunchTask指令给CoarseGrainedExecutorBackend,接收到指令之后,让它内部的executor来发起任务,即调用空闲的executor的launchTask函数. 下面是Coars

Volley框架源码浅析(二)

尊重原创 http://write.blog.csdn.net/postedit/25921795 在前面的一片文章Volley框架浅析(一)中我们知道在RequestQueue这个类中,有两个队列:本地队列和网络队列 /** The cache triage queue. */ private final PriorityBlockingQueue<Request<?>> mCacheQueue = new PriorityBlockingQueue<Request<

JobTracker启动流程源码级分析

org.apache.hadoop.mapred.JobTracker类是个独立的进程,有自己的main函数.JobTracker是在网络环境中提交及运行MR任务的核心位置. main方法主要代码有两句: 1 //创建jobTracker对象 2 JobTracker tracker = startTracker(new JobConf()); 3 //启动各个服务,包括JT内部一些重要的服务或者线程 4 tracker.offerService(); 一.startTracker(new Jo

【Spark】Stage生成和Stage源码浅析

引入 上一篇文章<DAGScheduler源码浅析>中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在,这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同时介绍Stage的相关源码. Stage生成 Stage的调度是由DAGScheduler完成的.由RDD的有向无环图DAG切分出了Stage的有向无环图DAG.Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage执行,如果提交的St

TaskTracker启动过程源码级分析

TaskTracker也是作为一个单独的JVM来运行的,其main函数就是TaskTracker的入口函数,当运行start-all.sh时,脚本就是通过SSH运行该函数来启动TaskTracker的. TaskTracker是JobTracker和Task之间的桥梁:一方面,从JobTracker接收并执行各种命令:运行任务.提交任务.杀死任务等:另一方面,将本地节点上各个任务的状态通过心跳周期性汇报给JobTracker.TaskTracker与JobTracker和Task之间采用了RPC