简单原理
一个比较简单的线程池至少应包含线程池管理器、工作线程、任务队列、任务接口等部分。
- 线程池管理器(ThreadPool Manager)的作用是创建、销毁并管理线程池,将工作线程放入线程池中;
- 工作线程是一个可以循环执行任务的线程,在没有任务时进行等待;
- 任务队列的作用是提供一种缓冲机制,将没有处理的任务放在任务队列中;
- 任务接口是每个任务必须实现的接口,主要用来规定任务的入口、任务执行完后的收尾工作、任务的执行状态等,工作线程通过该接口调度任务的执行。
线程池的优点
- 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
- 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行;
- 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
例子
之前项目中用到的代码
public Integer repayUnprocessBorrowRepayTaskIds(boolean isMultiThread) {
int updateOverdueBadloanThreadPoolSize = BORROW_REPAY_THREAD_POOL_SIZE;
// 吐槽一下,对Redis的滥用啊,写到配置文件中不是更好么?
String updateOverdueBadloanThreadPoolSize_s = redisService.get(Constants.BORROW_REPAY_THREAD_POOL_SIZE);
if (StringUtil.isNotEmpty(updateOverdueBadloanThreadPoolSize_s)) {
updateOverdueBadloanThreadPoolSize = Integer.parseInt(updateOverdueBadloanThreadPoolSize_s);
}
log.info("BORROW_REPAY_THREAD_POOL_SIZE:" + updateOverdueBadloanThreadPoolSize);
// ------1 线程池初始化代码
ExecutorService executorService = Executors.newFixedThreadPool(updateOverdueBadloanThreadPoolSize);
int _counter = 0;
List<Integer> borrowRepayTaskIds = borrowRepayTaskService.getUnprocessBorrowRepayTaskIds(updateOverdueBadloanThreadPoolSize);
if (StringUtil.isNotEmpty(borrowRepayTaskIds)) {
log.info("repayUnprocessBorrowRepayTaskIds ready to process, count:" + borrowRepayTaskIds.size());
for (Integer borrowRepayTaskId : borrowRepayTaskIds) {
_counter++;
if (isMultiThread && updateOverdueBadloanThreadPoolSize > 1) { // 多线程
// --------2 线程池执行代码
executorService.execute(new BorrowRepayTaskThread(this, borrowRepayTaskId));
} else { // 单线程
repayByTask(borrowRepayTaskId);
}
}
} else {
log.info("repayUnprocessBorrowRepayTaskIds ready to process, count:0");
}
executorService.shutdown();
try {
while (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
log.info("Waiting repayUnprocessBorrowRepayTaskIds thread pool close..."); // Waiting thread pool close...
}
} catch (InterruptedException ex) { // should not reach here
ex.printStackTrace();
executorService.shutdownNow();
Thread.currentThread().interrupt();
} catch (Exception ex) { // should not reach here
ex.printStackTrace();
executorService.shutdownNow();
}
log.info("repayUnprocessBorrowRepayTaskIds done, count:" + _counter);
return _counter;
}
- Executors类中提供的几个静态方法来创建线程池;
- 这里使用了newFixedThreadPool线程池。newFixedThreadPool的作用是创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程;
- ExecutorService是线程池接口
几种常用线程池
newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
newFixedThreadPool(常用)
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
newCachedThreadPool
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
newScheduledThreadPool
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
newFixedThreadPool底层实现
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
this是调用的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数解释:
- corePoolSize - 池中所保存的线程数,包括空闲线程。 (通过应用参数传过来的)
- maximumPoolSize - 池中允许的最大线程数。 (与corePoolSize数量一致)
- keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 (这里是0L)
- unit - keepAliveTime 参数的时间单位。
- workQueue - 执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
(newFixedThreadPool使用的是LinkedBlockingQueue线程安全阻塞队列)
- handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
(newFixedThreadPool使用的是RejectedExecutionHandler)
详细说一下corePoolSize:
核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
线程池的主要工作流程
可以看到使用ThreadPoolExecutor方法实现的,深入研究一下了解具体的实现原理。
- 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
- 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
- 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。
线程池执行任务方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果线程数小于基本线程数,则创建线程并执行当前任务
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
// 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
工作线程
线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从Worker的run方法里看到这点。
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this); //当任务队列中没有任务时,进行清理工作
}
}
}
线程池配置与监控
下面这些方法可以了解一下,我觉得最好的方法就是通过压力测试,观察核心功能的处理能力,服务器CPU、IO和JVM的具体情况,进行调试是最好的。
合理配置线程池
要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:
1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
2. 任务的优先级:高,中和低。
3. 任务的执行时间:长,中和短。
4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。
5. 任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。
6. 建议使用有界队列。
线程池的监控
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用
* taskCount:线程池需要执行的任务数量。
* completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
* largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
* getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
* getActiveCount:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。
参考:
http://www.cnblogs.com/dolphin0520/p/3932921.html
http://ifeve.com/java-threadpool/
http://my.oschina.net/jielucky/blog/157250?fromerr=imwl7YeR 这个解释的比较详细。