线程池源码解析

ThreadPoolExecutor的几个重要属性

  • BlockingQueue workQueue

    阻塞队列。存放将要执行的任务

  • HashSet workers

    当前线程池的线程集合。下文会重点介绍Worker这个内部类

  • corePoolSize

    核心线程数

  • maximumPoolSize

    最大线程数

  • keepAliveTime

    非核心线程保持空闲的最长时间

  • allowCoreThreadTimeOut

    核心线程是否被回收。默认是不回收核心线程的

  • RejectedExecutionHandler defaultHandler = new AbortPolicy()

    默认拒绝策略。可以看到默认是抛异常

      public static class AbortPolicy implements RejectedExecutionHandler {
    
          public AbortPolicy() { }
    
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              throw new RejectedExecutionException("Task " + r.toString() +
                                                   " rejected from " +
                                                   e.toString());
          }
      }

源码分析

execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //当前线程数 < 核心线程数
    if (workerCountOf(c) < corePoolSize) {
        //新建线程并执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //(上一步addWorker失败了 || 当前线程数 >= 核心线程数) && 阻塞队列未满 && 线程池运行中
    if (isRunning(c) && workQueue.offer(command)) {
        //为了再次校验线程状态
        int recheck = ctl.get();
        //线程池不是运行中 && 将任务移除阻塞队列成功
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //所有线程都被回收了 但是之前workQueue已经接收了任务
        else if (workerCountOf(recheck) == 0)
            //这里为什么传null?
            addWorker(null, false);
    }

    // 阻塞队列满了
    // 当前线程数 < 最大线程数 新建线程会成功
    else if (!addWorker(command, false))
        //当前线程数 >= 最大线程数 执行拒绝策略
        reject(command);
}

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 很恶心的判断。就当线程池被搞了吧。正常情况下不会进来
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 根据core参数来判断能不能新建线程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //改线程数+1。后续失败会对这个操作回滚
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //这之前的操作其实就是一些校验,相当于预创建线程
    //现在才开始真正的创建线程并执行
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //将任务封装为Worker。new出来的时候内部就新建了一个线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //获取全局锁 一个个来执行
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                //再进行一系列的校验
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //已经被执行了抛异常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    //记录下这个线程池整个生命周期里的最大的线程数(这东西没什么卵用)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 这里其实调的是Worker#runWorker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //擦屁股。因为之前的预创建在还没正真执行的时候就将工作线程数+1了,所以这里回滚。再从workers中移除
            addWorkerFailed(w);
    }
    return workerStarted;
}

Worker.runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //之前的伏笔 task = null的时候 走 task = getTask()
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //如果线程池状态大于等于STOP,那么意味着该线程也要中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //钩子方法。由开发者重写扩展
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //钩子方法。由开发者重写扩展
                    afterExecute(task, thrown);
                }
            } finally {
                //置空task,准备getTask获取下一个任务
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //将这个worker移除
        //运行状态小于STOP的情况下
        //allowCoreThreadTimeOut为false && 当前线程数小于核心线程数 新建一个worker
        processWorkerExit(w, completedAbruptly);
    }
}

getTask

private Runnable getTask() {
        boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 线程池被搞了
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 是否要回收线程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //keepAliveTime的作用
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

# 总结

  • 当前线程数 < 核心线程数:直接新建一个线程并执行任务
  • 当前线程数 >= 核心线程数 && 阻塞队列未满:将任务放入到阻塞队列
  • 核心线程数 <= 当前线程数 < 最大线程数 && 阻塞队列已满:新建线程并执行任务
  • 当前线程数 >= 最大线程数 && 阻塞队列已满:执行拒绝策略

--

当阻塞队列已经接收了任务,但此时所有线程被回收了,此时的任务将如何处理?

else if (workerCountOf(recheck) == 0)
    //这里为什么传null?
    addWorker(null, false);

新建一个线程去阻塞队列里获取任务并执行。

--

扩展

fixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

核心线程数=最大线程数。

阻塞队列默认的用LinkedBlockingQueue且容量是Integer.MAX_VALUE。

那么问题来了。只要阻塞队列不满,这个线程池就一直会接收任务。到达一定数量,还未到Integer.MAX_VALUE的时候机器肯定爆了。显然我们实际项目中不应该直接用fixedThreadPool

cachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

最大线程数是Integer.MAX_VALUE。只要有任务过来,不断的新建线程。

--

这两种默认的线程池说白了就是无限接受任务。所以我们实际项目中应该自己构造线程池来解决实际需求。

原文地址:https://www.cnblogs.com/chenshengyue/p/11558648.html

时间: 2024-10-12 11:11:30

线程池源码解析的相关文章

nginx线程池源码解析

周末看了nginx线程池部分的代码,顺手照抄了一遍,写成了自己的版本.实现上某些地方还是有差异的,不过基本结构全部摘抄. 在这里分享一下.如果你看懂了我的版本,也就证明你看懂了nginx的线程池. 本文只列出了关键数据结构和API,重在理解nginx线程池设计思路.完整代码在最后的链接里. 1.任务节点 typedef void (*CB_FUN)(void *); //任务结构体 typedef struct task { void *argv; //任务函数的参数(任务执行结束前,要保证参数

面试官:你分析过线程池源码吗?

线程池源码也是面试经常被提问到的点,我会将全局源码做一分析,然后告诉你面试考啥,怎么答. 为什么要用线程池? 简洁的答两点就行. 降低系统资源消耗. 提高线程可控性. 如何创建使用线程池? JDK8提供了五种创建线程池的方法: 1.创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待. 1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(

手撕ThreadPoolExecutor线程池源码

这篇文章对ThreadPoolExecutor创建的线程池如何操作线程的生命周期通过源码的方式进行详细解析.通过对execute方法.addWorker方法.Worker类.runWorker方法.getTask方法.processWorkerExit从源码角度详细阐述,文末有彩蛋. exexcte方法 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); in

Java并发编程中线程池源码分析及使用

当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率. 参考http://www.cnblogs.com/dolphin0520/p/3932921.html 由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误. API : jdk1.8.0_144 ThreadPoolExecutor类 Java中线

java线程池源码解读

java线程池的顶级类是Executors 内置了几种线程池 1.newFixedThreadPool  并且重载了两个此方法  有固定线程数的线程池 当达到设置的线程数时 多余的任务会排队,当处理完一个马上就会去接着处理排队中的任务 源码如下 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit

java线程池源码的理解

线程池 所谓线程池,就是有一个池子,里面存放着已经创建好的线程,当有任务提交到线程池执行时,池子中的某个线程会主动执行该任务. 新建线程和切换线程的开销太大了,使用线程池可以节省系统资源. 线程池的关键类:ThreadPoolExecutor 主要流程 execute() –> addWorker() –>runWorker() -> getTask() 重要参数及变量 控制状态的变量 ctl: ctl是一个AtomicInteger原子操作类,能够保证线程安全. ctl变量定义如下:

java线程池源码的简单分析

工作中用过线程池来进行多线程的操作,但是也仅仅是停留在使用方面,没有深入研究,现在通过源码来仔细研究下java的线程池.关于线程池的优缺点就不研究了,直接通过一个源码来看看java中线程池的原理. 使用ThreadPoolExecutor来创建一个线程池 public class MultipleThread { public static void main(String[] args) { /** * 通过ThreadPoolExecutor来创建一个线程池 * * 我们创建的线程池 核心线

Java线程池源码阅读

简单介绍 线程池是池化技术的一种,对线程复用.资源回收.多任务执行有不错的实践.阅读源码,可以学习jdk的大师对于线程并发是怎么池化的,还有一些设计模式.同时,它也能给我们在使用它的时候多一种感知,出了什么问题可以马上意识到哪里的问题. 使用范例 我们使用一个线程池,直接通过jdk提供的工具类直接创建.使用如下api创建一个固定线程数的线程池. ExecutorService pool = Executors.newFixedThreadPool(5); 使用如下api创建一个会不断增长线程的线

分析线程池源码测试线程池

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 线程池测试类 */ public class TestThreadPool { public static void main(String[] args) { // 实例化线程池对象 corePoolSize--线程池