ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析

转自:http://www.tuicool.com/articles/rmqYjq

前面一篇文章从Executors中的工厂方法入手,已经对ThreadPoolExecutor的构造和使用做了一些整理。而这篇文章,我们将接着前面的介绍, 从源码实现上对ThreadPoolExecutor在任务的提交、执行,线程重用和线程数维护等方面做下分析。

0.    ThreadPoolExecutor类的声明属性变量分析

public class ThreadPoolExecutor extends AbstractExecutorService

从这个类声明中我们可以看到java.util.ThreadPoolExecutor是继承于AbstractExecutorService的,而之前的文章我也提到过,AbstractExecutorService已经实现了一些任务提交处理的方法,如submit()方法都是在这个抽象类中实现的。但submit()方法,最后也是会调用ThreadPoolExecutor的execute()方法。

打开SunJDK中的ThreadPoolExecutor类源码,除了上篇文章提到的一些和构造方法中参数对应的属性之外,让我们看看还有什么:

  • mainLock 对整个ThreadPoolExecutor对象的锁
  • workers  存储工作线程对应Worker对象的HashSet
  • termination 线程池ThreadPoolExecutor对象的生命周期终止条件,和mainLock相关
  • largestPoolSize 线程池跑过的最大线程数
  • completedTaskCount 完成任务数
  • ctl 执行器ThreadPoolExecutor的生命周期状态和活动状态的worker数封装

稍微需要说一下最后一个, ctl是一个AtomicInteger对象,以位运算的方式打包封装了当前线程池ThreadPoolExecutor对象的状态和活动线程数两个数据

1.    执行器状态

ExecutorService中已经指定了这个接口对应的类要实现的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了状态的含义,并包含其于ctl属性中。

ThreadPoolExecutor对象有五种状态,如下:

  • RUNNING 在ThreadPoolExecutor被实例化的时候就是这个状态
  • SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成
  • STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程
  • TIDYING 线程池为空,就会到达这个状态,执行terminated()方法
  • TERMINATED terminated()执行完毕,就会到达这个状态,ThreadPoolExecutor终结

2.    Worker内部类

它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )。

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。

final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

这个类还提供了interruptIfStarted()这样一个方法,里面做了(getState()>= 0)的判断。与此呼应,Worker的构造方法里对state设置了-1,避免在线程执行前被停掉。

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

3. 提交任务

上篇文章已经提到了,提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。

这段源码逻辑如下,不细说了。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

4. addWorker()的实现

在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

代码较长,我们可以分两大部分看:

第一段从第3行到第26行,是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。

第二段从第28行开始到结尾,把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。

5. 任务的执行runWorker()

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这段代码实际上就是执行提交给线程池执行的Runnable任务的实际内容。其中,值得注意的有以下几点:

  • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
  • 在任务执行前后,执行beforeExecute()和afterExecute()方法
  • 记录任务执行中的异常后,继续抛出
  • 每个任务完成后,会记录当前线程完成的任务数
  • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
  • 线程退出,执行processWorkerExit(w, completedAbruptly)处理

5. Worker线程的复用和任务的获取getTask()

在上一段代码中,也就是runWorker()方法,任务的执行过程是嵌套在while循环语句块中的。每当一个任务执行完毕,会从头开始做下一次循环执行,实现了空闲线程的复用。而要执行的任务则是来自于getTask()方法:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                     break;
                if (compareAndDecrementWorkerCount(c))
                     return null;
                c = ctl.get();
                // Re-read ctl
                if (runStateOf(c) != rs)
                     continue retry;
                // else CAS failed due to workerCount change; retry inner loop
             }
             try {
                 Runnable r = timed ?
                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                     workQueue.take();
                 if (r != null)
                     return r;
                 timedOut = true;
             } catch (InterruptedException retry) {
                 timedOut = false;
             }
         }
     }

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。

回头看看是否计时是如何确定的。

int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

即判断当前线程池的线程数是否超出corePoolSize,如果超出这个值并且空闲时间多于keepAlive则当前线程退出。

另外一种情况就是allowCoreThreadTimeOut为true,就是允许核心在空闲超时的情况下停掉。

6. 线程池线程数的维护和线程的退出处理

刚刚也提到了,我们再看下processWorkerExit()方法。这个方法最主要就是从workers的Set中remove掉一个多余的线程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
         if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted
             decrementWorkerCount();
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             completedTaskCount += w.completedTasks;
             workers.remove(w);
         } finally {
             mainLock.unlock();
         }
         tryTerminate();
         int c = ctl.get();
         if (runStateLessThan(c, STOP)) {
             if (!completedAbruptly) {
                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                 if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                 if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1 。

之后,加锁统计完成的任务数,并从workers这个集合中移除当前worker。

执行tryTerminate(),这个方法后面会详细说,主要就是尝试将线程池推向TERMINATED状态。

最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。

以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。

时间: 2024-10-13 23:51:32

ThreadPoolExecutor的应用和实现分析(中)—— 任务处理相关源码分析的相关文章

Django中CBV和Restful API中的APIView源码分析

Django中CBV和Restful API中的APIView源码分析 python的Django框架的视图处理可以用FBV, 也可以采用CBV.首先定义一个CBV视图: from django.views import Viewfrom django.http import JsonResponseclass Book(View):    def get(self, request):        ll = [{'key':value}]        return JsonResponse

Spring中Bean命名源码分析

Spring中Bean命名源码分析 一.案例代码 首先是demo的整体结构 其次是各个部分的代码,代码本身比较简单,不是我们关注的重点 配置类 /** * @Author Helius * @Create 2019-10-25-20:16 */ @Configuration @ComponentScan(basePackages = {"service"}) public class SpringConfiguration { } 接口的实现类 public interface Use

Cocos2d-X3.0 刨根问底(八)----- 场景(Scene)、层(Layer)相关源码分析

本章节我们重点分析Cocos2d-x3.0与 场景.层相关的源码.这部分源码集中在 libcocos2d –> layers_scenes_transitions_nodes目录下面 我先发个截图大家了解一下都有哪些文件.红色框里面的就是我们今天要分析的文件. 从命名上可以了解,这个文件夹里的文件主要包含了  场景,层,变换这三种类型的文件. 下面我们先分析Scene类 打开CCScene.h文件 /** @brief Scene is a subclass of Node that is us

Netty中的ChannelPipeline源码分析

ChannelPipeline在Netty中是用来处理请求的责任链,默认实现是DefaultChannelPipeline,其构造方法如下: 1 private final Channel channel; 2 private final ChannelFuture succeededFuture; 3 private final VoidChannelPromise voidPromise; 4 final AbstractChannelHandlerContext head; 5 final

【朝花夕拾】Android自定义View篇之(六)Android事件分发机制(中)从源码分析事件分发逻辑及经常遇到的一些“诡异”现象

前言 转载请注明,转自[https://www.cnblogs.com/andy-songwei/p/11039252.html]谢谢! 在上一篇文章[[朝花夕拾]Android自定义View篇之(五)Android事件分发机制(上)Touch三个重要方法的处理逻辑][下文简称(五),请先阅读完(五)再阅读本文],我们通过示例和log来分析了Android的事件分发机制.这些,我们只是看到了现象,如果要进一步了解事件分发机制,这是不够的,我们还需要透过现象看本质,去研究研究源码.本文将从源码(基

[转] jQuery源码分析-如何做jQuery源码分析

jQuery源码分析系列(持续更新) jQuery的源码有些晦涩难懂,本文分享一些我看源码的方法,每一个模块我基本按照这样的顺序去学习. 当我读到难度的书或者源码时,会和<如何阅读一本书>结合起来进行学习.推荐读读这本书,你可以从这里和这里下载. 第一部分:检视阅读 1. 收集参考资料:官方文档.书籍.百度/谷歌,专题/博客等,快速的浏览,对涉及的知识点.范围.深度.是否有参考意义等有大致的了解和判断,知道这些文章的作者想要解释或解决什么问题. 第二部分:分析阅读 2. 细读官方文档,官方有非

Java并发编程中线程池源码分析及使用

当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率. 参考http://www.cnblogs.com/dolphin0520/p/3932921.html 由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误. API : jdk1.8.0_144 ThreadPoolExecutor类 Java中线

JDK1.7 中的HashMap源码分析

一.源码地址: 源码地址:http://docs.oracle.com/javase/7/docs/api/ 二.数据结构 JDK1.7中采用数组+链表的形式,HashMap是一个Entry<K,V>[] table数组,JDK1.8采用数组+链表/红黑树实现,当链表长度超过阈值,将链表转为红黑树. Entry代码如下: /** Entry是单向链表. * 它是 “HashMap链式存储法”对应的链表. *它实现了Map.Entry 接口,即实现getKey(), getValue(), se

avalon 中require.config源码分析

/********************************************************************* * 配置系统 在系统运行的开始就需要读取系统中require.config()这个方法中所配置的项目 * **********************************************************************/ //这里写在前面是为了更加方便阅读代码,在实际运行中,这几段代码必须放在下面 kernel.debug =