ThreadPoolExecutor源码分析

引言



为什么引入线程池技术?

对于服务端的程序,经常面对的是执行时间较短、工作内容较为单一的任务,需要服务端快速处理并返回接口。假若服务端每次接收到一个任务,就创建一个线程,然后执行,这种方式在原型阶段是不错的选择,但是面对成千上万的任务提交进服务器时,这个时候将会创建数以万记的线程,这很明显不是一个好的选择。为什么呢?

- 第一,频繁的线程切换会使操作系统频繁的进行上下文切换,增加了系统的负载;

- 第二,线程的创建和销毁是需要耗费系统资源的,这样子很明显浪费了系统资源。

线程池技术很好的解决了这个问题,它预先创建一定数量的线程,用户不能直接控制线程的创建和销毁,重复使用固定或者较为固定数目的线程来完成任务的执行。这样做的好处:

- 消除了频繁创建和销毁线程的系统资源开销;

- 面对过量任务的提交能够平缓劣化。


ThreadPoolExcutor源码解析

在看具体的源码之前,先给一个线程池使用案例

  1. 创建线程池对象;
  2. executor.submit(Runnable task)提交10个任务;
  3. executor.submit(Callable<T> task)提交5个任务;
  4. 所有线程的管理都由线程池来原理,程序员不需要关注线程的创建销毁。

构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
  this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}

核心参数:

- corePoolSize:核心线程数,线程池里一直不会被销毁的线程数量;

- maximumPoolSize:最大线程数量;

- keepAliveTime:非核心线程空闲时的存活时间,该参数只有在线程数量 > corePoolSize情况下才有用;

- unit:keepAlive时间单位;

- workQueue:工作队列,JDK提供这几种工作队列:

  • ArrayBlockingQueue:基于数组的有界阻塞队列,任务以FIFO顺序排序;
  • LinkedBlockingQueue:基于链表的阻塞队列,任务以FIFO顺序排列,吞吐量优于ArrayBlockingQueue,在使用时需要注意,此阻塞队列在不设置大小的时候,默认的长度是Integer.MAX_VALUE
  • PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含任务的排序不是FIFO,而是依据任务的自然排序顺序或者是构造函数的Comparator决定的顺序;
  • SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的,典型的生产者-消费者模型,它不存储元素,每一次的插入必须要等另一个线程的移除操作完成。
    • threadFactory:创建线程工厂,可以自定义线程工厂给线程池里的线程设置一个自定义线程名。

    • handler:饱和策略,假如线程池已满,并且没有空闲的线程,这个时候不再允许提交任务到线程池,线程池提供了4中策略,至于具体采用哪种策略还是自定义策略,具体情况具体分析。
  • AbortPolicy:拒绝提交,直接抛出异常,也是默认的饱和策略;
  • CallerRunsPolicy:线程池还未关闭时,用调用者的线程执行任务;
  • DiscardPolicy:丢掉提交任务;
  • DiscardOldestPolicy:线程池还未关闭时,丢掉阻塞队列最久为处理的任务,并且执行当前任务。

线程池内部状态

线程池用ctl的低29位表示线程池中的线程数,高3位表示当前线程状态,后续假如想要增大这个值,可以将AtomicInteger改成AtomicLong。

- RUNNING:运行状态,高3位为111;

- SHUTDOWN:关闭状态,高3位为000,在此状态下,线程池不再接受新任务,但是仍然处理阻塞队列中的任务;

- STOP:停止状态,高3位为001,在此状态下,线程池不再接受新任务,也不会处理阻塞队列中的任务,正在运行的任务也会停止;

- TIDYING:高3位为010;

- TERMINATED:终止状态,高3位为011。

接下来就以submit方法入手,分析一下相关源码。

submit任务提交

public Future<?> submit(Runnable task) {
  //提交的task为null,抛出空指针异常
  if (task == null)
       throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //执行任务
    execute(ftask);
    return ftask;
}

整个任务的提交核心都在任务执行这部分,执行任务,拿到返回值。

任务执行execute
public void execute(Runnable command) {
  if (command == null)
     throw new NullPointerException();
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
  } else if (!addWorker(command, false))
      reject(command);
}

具体的执行流程如下:

- 通过workerCountOf计算出当前线程池的线程数,如果线程数小于corePoolSize,执行addWork方法创建新的线程执行任务;

- 如果当前线程池线程数大于coreSize,向队列里添加task,不继续增加线程;

- 当workQueue.offer失败时,也就是说现在队列已满,不能再向队列里放,此时工作线程大于等于corePoolSize,创建新的线程执行该task;

- 执行addWork失败,执行reject方法处理该任务。

总结一下,对于使用线程池的外部来说,线程池的机制是这样的:

1. 如果正在运行的线程数 < coreSize,马上创建线程执行该task,不排队等待;

2. 如果正在运行的线程数 >= coreSize,把该task放入队列;

3. 如果队列已满 && 正在运行的线程数 < maximumPoolSize,创建新的线程执行该task;

4. 如果队列已满 && 正在运行的线程数 >= maximumPoolSize,线程池调用handler的reject方法拒绝本次提交。

addWorker实现

从全局来看,ThreadPoolExcutor一定维护一个池:

addWorker的实质是向该HashSet里add一个worker,worker有一个线程,这个线程执行完成时,会从该HashSet里remove掉。

看一下addWorker的具体代码实现:

这只是addWorker的前半部分代码,首先,判断当前线程的状态是否符合条件,不符合条件不做处理直接返回;通过参数core判断当前线程是否为核心线程,如果是核心线程,跳出循环创建新的线程来执行该task,在break retry的时候会执行compareAndIncrementWorkerCount(c),利用CAS原则,将线程数量+1。

看看创建线程部分(addWorker的后半部分)代码实现:

创建线程部分最核心的操作就是:new一个新的worker,add进HashSet,然后启动woker里的Thread。

从源码可以看到,在执行add之前先活取了mainLock锁,该锁是一个公用的可重入锁:

private final ReentrantLock mainLock = new ReentrantLock();

addWorker的4种调用方式

  1. addWorker(command, true)

    线程数 < coreSize时,将task放入workers,如果线程数 >= coreSize,返回false;

  2. addWorker(command, false)

    当阻塞对列已满,尝试将新的task放入workers,如果线程数 >= maximumPoolSize,返回false;

  3. addWorker(null, false)

    放入一个空的task到workers,此时线程数的限制是maximumPoolSize,相当于创建一个新的线程,没立马分配任务;

  4. addWorker(null, true)

    放入一个空的task到workers,线程数 < coreSize。实际的使用是在prestartCoreThread()等方法,有兴趣的读者可以自行阅读,在此不做详细赘述。

Worker具体实现

在addWorker中,t.start()使线程就绪,我们来看看Worker类的具体设计。

- Worker继承AbstractQueuedSynchronizer,方便实现工作线程的中止等操作;

- Worker实现Runnable接口,将自身作为一个task在工作线程中执行;

addWoker中的t.start()实质上是执行Worker的run()方法:

public void run() {
    runWorker(this);
}

run方法主要干了一件事,调用runWorker(this),接下来我们来看看runWorker的具体实现。

runWorker具体实现

  1. 线程启动后,释放锁,设AQS状态为0;
  2. 获取firstTask任务并执行,执行任务前后可定制beforeExecuteafterExecute;
  3. 如果worker自己的task为null,调用getTask从阻塞队列获取等待任务执行,否则,阻塞该方法。

getTask具体实现

private Runnable getTask() {
    boolean timedOut = false;
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //必要情况下需要检查workQueue是否为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //如果线程池允许线程超时或者当前线程数大于核心线程数,则会进行超时处理
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                  return null;
            continue;
        }
        try {
            Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
            if (r != null)
                return r;
           timedOut = true;
        } catch (InterruptedException retry) {
           timedOut = false;
        }
    }
}

整个getTask循环实现:

- workQueue.poll:如果在keepAliveTime时间内阻塞队列有任务,返回该任务并执行;

- workQueue.take:如果阻塞队列为空,当前线程阻塞,当队列有任务时,线程被唤醒,执行take返回的任务。

时间: 2024-10-10 16:14:25

ThreadPoolExecutor源码分析的相关文章

Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析

Java中使用线程池技术一般都是使用Executors这个工厂类,它提供了非常简单方法来创建各种类型的线程池: public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static Scheduled

Java多线程 -- JUC包源码分析11 -- ThreadPoolExecutor源码分析

在JUC包中,线程池部分本身有很多组件,可以说是前面所分析的各种技术的一个综合应用.从本文开始,将综合前面的知识,逐个分析线程池的各个组件. -Executor/Executors -ThreadPoolExecutor使用介绍 -ThreadPoolExecutor实现原理 –ThreadPoolExecutor的中断与优雅关闭 shutdown + awaitTermination –shutdown的一个误区 Executor/Executors Executor是线程池框架最基本的几个接

通过ThreadPoolExecutor源码分析线程池实现原理

为什么要用线程池 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性.使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度(当一个任务到达时,不需要重新创建线程来为之服务,重用已有线程),还可以通过线程池控制线程资源统一分配和监控等. 线程池工厂Executors JDK 提供了创建线程池的工厂类 Executors,该类提供了创建线程池的静态方法: public static ExecutorService newFixedThreadP

线程池ThreadPoolExecutor源码分析(一)

1.线程池简介 1.1 线程池的概念: 线程池就是首先创建一些线程,它们的集合称为线程池.使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务. 1.2 线程池的工作机制 在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程. 一

【Java并发编程】21、线程池ThreadPoolExecutor源码解析

一.前言 JUC这部分还有线程池这一块没有分析,需要抓紧时间分析,下面开始ThreadPoolExecutor,其是线程池的基础,分析完了这个类会简化之后的分析,线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.下面开始分析. 二.ThreadPoolExecutor数据结构 在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQu

《Java源码分析》:线程池 ThreadPoolExecutor

<Java源码分析>:线程池 ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一张实现,但是是间接实现. ThreadPoolExecutor是继承AbstractExecutorService.而AbstractExecutorService实现了ExecutorService接口. 在介绍细节的之前,先介绍下ThreadPoolExecutor的结构 1.线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行

线程池的使用及ThreadPoolExecutor的execute和addWorker源码分析

说明:本作者是文章的原创作者,转载请注明出处:本文地址:http://www.cnblogs.com/qm-article/p/7821602.html 一.线程池的介绍 在开发中,频繁的创建和销毁一个线程,是很耗资源的,为此找出了一个可以循环利用已经存在的线程来达到自己的目的,线程池顾名思义,也就是线程池的集合,通过线程池执行的线程任务,可以很有效的去规划线程的使用.在java中大致有这几种线程池      newScheduledThreadPool  创建一个定长线程池,支持定时及周期性任

源码分析—ThreadPoolExecutor线程池三大问题及改进方案

前言 在一次聚会中,我和一个腾讯大佬聊起了池化技术,提及到java的线程池实现问题,我说这个我懂啊,然后巴拉巴拉说了一大堆,然后腾讯大佬问我说,那你知道线程池有什么缺陷吗?我顿时哑口无言,甘拜下风,所以这次我再回来思考一下线程池的实现原理 源码分析 ThreadPoolExecutor构造器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, Blo

ThreadPoolExecutor简介与源码分析

1.ThreadPoolExecutor的初始化参数介绍 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize <