理解ThreadPoolExecutor源码(一)线程池的corePoolSize、maximumPoolSize和poolSize

我们知道,受限于硬件、内存和性能,我们不可能无限制的创建任意数量的线程,因为每一台机器允许的最大线程是一个有界值。也就是说ThreadPoolExecutor管理的线程数量是有界的。线程池就是用这些有限个数的线程,去执行提交的任务。然而对于多用户、高并发的应用来说,提交的任务数量非常巨大,一定会比允许的最大线程数多很多。为了解决这个问题,必须要引入排队机制,或者是在内存中,或者是在硬盘等容量很大的存储介质中。J.U.C提供的ThreadPoolExecutor只支持任务在内存中排队,通过BlockingQueue暂存还没有来得及执行的任务。

任务的管理是一件比较容易的事,复杂的是线程的管理,这会涉及线程数量、等待/唤醒、同步/锁、线程创建和死亡等问题。ThreadPoolExecutor与线程相关的几个成员变量是:keepAliveTime、allowCoreThreadTimeOut、poolSize、corePoolSize、maximumPoolSize,它们共同负责线程的创建和销毁。

corePoolSize:

线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。

maximumPoolSize:

线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是largestPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么说是曾经呢?因为线程池创建之后,可以调用setMaximumPoolSize()改变运行的最大线程的数目。

poolSize:

线程池中当前线程的数量,当该值为0的时候,意味着没有任何线程,线程池会终止;同一时刻,poolSize不会超过maximumPoolSize。

现在我们通过ThreadPoolExecutor.execute()方法,看一下这3个属性的关系,以及线程池如何处理新提交的任务。以下源码基于JDK1.6.0_37版本。

public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
		if (runState == RUNNING && workQueue.offer(command)) {
			if (runState != RUNNING || poolSize == 0)
				ensureQueuedTaskHandled(command);
		}
		else if (!addIfUnderMaximumPoolSize(command))
			reject(command); // is shutdown or saturated
	}
}

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

新提交一个任务时的处理流程很明显:

1、如果线程池的当前大小还没有达到基本大小(poolSize < corePoolSize),那么就新增加一个线程处理新提交的任务;

2、如果当前大小已经达到了基本大小,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);

3、如果队列容量已达上限,并且当前大小poolSize没有达到maximumPoolSize,那么就新增线程来处理任务;

4、如果队列已满,并且当前线程数目也已经达到上限,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增

的任务,取决于线程池的饱和策略RejectedExecutionHandler。

接下来我们看下allowCoreThreadTimeOut和keepAliveTime属性的含义。在压力很大的情况下,线程池中的所有线程都在处理新提交的任务或者是在排队的任务,这个时候线程池处在忙碌状态。如果压力很小,那么可能很多线程池都处在空闲状态,这个时候为了节省系统资源,回收这些没有用的空闲线程,就必须提供一些超时机制,这也是线程池大小调节策略的一部分。通过corePoolSize和maximumPoolSize,控制如何新增线程;通过allowCoreThreadTimeOut和keepAliveTime,控制如何销毁线程。

allowCoreThreadTimeOut:

该属性用来控制是否允许核心线程超时退出。If false,core threads stay alive
even when idle.If true, core threads use keepAliveTime to time out waiting for work。如果线程池的大小已经达到了corePoolSize,不管有没有任务需要执行,线程池都会保证这些核心线程处于存活状态。可以知道:该属性只是用来控制核心线程的。

keepAliveTime:

如果一个线程处在空闲状态的时间超过了该属性值,就会因为超时而退出。举个例子,如果线程池的核心大小corePoolSize=5,而当前大小poolSize =8,那么超出核心大小的线程,会按照keepAliveTime的值判断是否会超时退出。如果线程池的核心大小corePoolSize=5,而当前大小poolSize
=5,那么线程池中所有线程都是核心线程,这个时候线程是否会退出,取决于allowCoreThreadTimeOut。

Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

(poolSize > corePoolSize || allowCoreThreadTimeOut)这个条件,就是用来判断是否允许当前线程退出。workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);就是借助阻塞队列,让空闲线程等待keepAliveTime时间之后,恢复执行。这样空闲线程会由于超时而退出。

时间: 2024-08-04 02:44:59

理解ThreadPoolExecutor源码(一)线程池的corePoolSize、maximumPoolSize和poolSize的相关文章

通过ThreadPoolExecutor源码分析线程池实现原理

为什么要用线程池 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性.使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度(当一个任务到达时,不需要重新创建线程来为之服务,重用已有线程),还可以通过线程池控制线程资源统一分配和监控等. 线程池工厂Executors JDK 提供了创建线程池的工厂类 Executors,该类提供了创建线程池的静态方法: public static ExecutorService newFixedThreadP

nginx源码分析——线程池

源码: nginx 1.13.0-release 一.前言 nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响.但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA).其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的.下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误.不足还请大家指出,谢谢)

理解ThreadPoolExecutor源码(二)execute函数的巧妙设计和阅读心得

ThreadPoolExecutor.execute()源码提供了大量注释来解释该方法的设计考虑.下面的源码来自jdk1.6.0_37 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUN

分析线程池源码测试线程池

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 线程池测试类 */ public class TestThreadPool { public static void main(String[] args) { // 实例化线程池对象 corePoolSize--线程池

Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析

Java中使用线程池技术一般都是使用Executors这个工厂类,它提供了非常简单方法来创建各种类型的线程池: public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static Scheduled

【Java并发编程】21、线程池ThreadPoolExecutor源码解析

一.前言 JUC这部分还有线程池这一块没有分析,需要抓紧时间分析,下面开始ThreadPoolExecutor,其是线程池的基础,分析完了这个类会简化之后的分析,线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.下面开始分析. 二.ThreadPoolExecutor数据结构 在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQu

JAVA 笔记(五) ThreadPoolExecutor 源码剖析

基本概念 Thread t = new Thread(); t.start(); 上面的代码我们再熟悉不过了,因为我们通常在需要开启一个线程的时候都会这样做. 但使用这样的方式,有时候也会照成困扰.例如如果程序中存在大量的并发线程,这样做会带来什么缺陷? 答案很明显,会造成编写工作繁杂,降低系统效率,线程难以管理等等问题. 在这种情况下,有没有一种方式能够让我们避免这些困扰呢?有,也就是我们这里研究的线程池(ThreadPoolExecutor). 线程池的思想理解起来其实也很简单,我们可以看做

深入理解OkHttp源码(一)——提交请求

本篇文章主要介绍OkHttp执行同步和异步请求的大体流程.主要流程如下图: 主要分析到getResponseWidthInterceptorChain方法,该方法为具体的根据请求获取响应部分,留着后面的博客再介绍. Dispatcher类 Dispatcher类负责异步任务的请求策略.首先看它的部分定义: public final class Dispatcher { private int maxRequests = 64; private int maxRequestsPerHost = 5

ThreadPoolExecutor源码详解

我之前一篇文章谈到了ThreadPoolExecutor的作用(http://my.oschina.net/xionghui/blog/494004),这篇文章介绍下它的原理,并根据原理分析下它的实现源码. 我们先来查看一下ThreadPoolExecutor API,看看它能实现什么功能,然后看看它是怎么实现这些功能的. ThreadPoolExecutor API ThreadPoolExecutor API比较长,这里列出几个关键点: 核心和最大池大小:如果运行的线程少于 corePool