面试官:你分析过线程池源码吗?

线程池源码也是面试经常被提问到的点,我会将全局源码做一分析,然后告诉你面试考啥,怎么答。

为什么要用线程池?

简洁的答两点就行。

  1. 降低系统资源消耗。
  2. 提高线程可控性。

如何创建使用线程池?

JDK8提供了五种创建线程池的方法:

1.创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

1 public static ExecutorService newFixedThreadPool(int nThreads) {
2     return new ThreadPoolExecutor(nThreads, nThreads,
3                                   0L, TimeUnit.MILLISECONDS,
4                                   new LinkedBlockingQueue<Runnable>());
5 }

2.(JDK8新增)会根据所需的并发数来动态创建和关闭线程。能够合理的使用CPU进行对任务进行并发操作,所以适合使用在很耗时的任务。

注意返回的是ForkJoinPool对象。

1 public static ExecutorService newWorkStealingPool(int parallelism) {
2     return new ForkJoinPool
3         (parallelism,
4          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
5          null, true);
6 }

什么是ForkJoinPool:

 1 public ForkJoinPool(int parallelism,
 2                         ForkJoinWorkerThreadFactory factory,
 3                         UncaughtExceptionHandler handler,
 4                         boolean asyncMode) {
 5         this(checkParallelism(parallelism),
 6              checkFactory(factory),
 7              handler,
 8              asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
 9              "ForkJoinPool-" + nextPoolId() + "-worker-");
10         checkPermission();
11     }

使用一个无限队列来保存需要执行的任务,可以传入线程的数量;不传入,则默认使用当前计算机中可用的cpu数量;使用分治法来解决问题,使用fork()和join()来进行调用。

3.创建一个可缓存的线程池,可灵活回收空闲线程,若无可回收,则新建线程。

1 public static ExecutorService newCachedThreadPool() {
2     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                   60L, TimeUnit.SECONDS,
4                                   new SynchronousQueue<Runnable>());
5 }

4.创建一个单线程的线程池。

1 public static ExecutorService newSingleThreadExecutor() {
2     return new FinalizableDelegatedExecutorService
3         (new ThreadPoolExecutor(1, 1,
4                                 0L, TimeUnit.MILLISECONDS,
5                                 new LinkedBlockingQueue<Runnable>()));
6 }

5.创建一个定长线程池,支持定时及周期性任务执行。

1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
2     return new ScheduledThreadPoolExecutor(corePoolSize);
3 }

上层源码结构分析

Executor结构:

Executor

一个运行新任务的简单接口

1 public interface Executor { 2 3 void execute(Runnable command); 4 5 }

ExecutorService

扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法

AbstractExecutorService

对ExecutorService接口的抽象类实现。不是我们分析的重点。

ThreadPoolExecutor

Java线程池的核心实现。

ThreadPoolExecutor源码分析

属性解释

 1 // AtomicInteger是原子类  ctlOf()返回值为RUNNING;
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 // 高3位表示线程状态
 4 private static final int COUNT_BITS = Integer.SIZE - 3;
 5 // 低29位表示workerCount容量
 6 private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 7
 8 // runState is stored in the high-order bits
 9 // 能接收任务且能处理阻塞队列中的任务
10 private static final int RUNNING    = -1 << COUNT_BITS;
11 // 不能接收新任务,但可以处理队列中的任务。
12 private static final int SHUTDOWN   =  0 << COUNT_BITS;
13 // 不接收新任务,不处理队列任务。
14 private static final int STOP       =  1 << COUNT_BITS;
15 // 所有任务都终止
16 private static final int TIDYING    =  2 << COUNT_BITS;
17 // 什么都不做
18 private static final int TERMINATED =  3 << COUNT_BITS;
19
20 // 存放任务的阻塞队列
21 private final BlockingQueue<Runnable> workQueue;

值的注意的是状态值越大线程越不活跃。

线程池状态的转换模型:

构造器

 1 public ThreadPoolExecutor(int corePoolSize,//线程池初始启动时线程的数量
 2                           int maximumPoolSize,//最大线程数量
 3                           long keepAliveTime,//空闲线程多久关闭?
 4                           TimeUnit unit,// 计时单位
 5                           BlockingQueue<Runnable> workQueue,//放任务的阻塞队列
 6                           ThreadFactory threadFactory,//线程工厂
 7                           RejectedExecutionHandler handler// 拒绝策略) {
 8     if (corePoolSize < 0 ||
 9         maximumPoolSize <= 0 ||
10         maximumPoolSize < corePoolSize ||
11         keepAliveTime < 0)
12         throw new IllegalArgumentException();
13     if (workQueue == null || threadFactory == null || handler == null)
14         throw new NullPointerException();
15     this.acc = System.getSecurityManager() == null ?
16             null :
17             AccessController.getContext();
18     this.corePoolSize = corePoolSize;
19     this.maximumPoolSize = maximumPoolSize;
20     this.workQueue = workQueue;
21     this.keepAliveTime = unit.toNanos(keepAliveTime);
22     this.threadFactory = threadFactory;
23     this.handler = handler;
24 }

在向线程池提交任务时,会通过两个方法:execute和submit。

本文着重讲解execute方法。submit方法放在下次和Future、Callable一起分析。

execute方法:

 1 public void execute(Runnable command) {
 2     if (command == null)
 3         throw new NullPointerException();
 4     // clt记录着runState和workerCount
 5     int c = ctl.get();
 6     //workerCountOf方法取出低29位的值,表示当前活动的线程数
 7     //然后拿线程数和 核心线程数做比较
 8     if (workerCountOf(c) < corePoolSize) {
 9         // 如果活动线程数<核心线程数
10         // 添加到
11         //addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断
12         if (addWorker(command, true))
13             // 如果成功则返回
14             return;
15         // 如果失败则重新获取 runState和 workerCount
16         c = ctl.get();
17     }
18     // 如果当前线程池是运行状态并且任务添加到队列成功
19     if (isRunning(c) && workQueue.offer(command)) {
20         // 重新获取 runState和 workerCount
21         int recheck = ctl.get();
22         // 如果不是运行状态并且
23         if (! isRunning(recheck) && remove(command))
24             reject(command);
25         else if (workerCountOf(recheck) == 0)
26             //第一个参数为null,表示在线程池中创建一个线程,但不去启动
27             // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize
28             addWorker(null, false);
29     }
30     //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize
31     else if (!addWorker(command, false))
32         //如果失败则拒绝该任务
33         reject(command);
34 }

总结一下它的工作流程:

  1. workerCount &lt; corePoolSize,创建线程执行任务。
  2. workerCount &gt;= corePoolSize&&阻塞队列workQueue未满,把新的任务放入阻塞队列。
  3. workQueue已满,并且workerCount &gt;= corePoolSize,并且workerCount &lt; maximumPoolSize,创建线程执行任务。
  4. 当workQueue已满,workerCount &gt;= maximumPoolSize,采取拒绝策略,默认拒绝策略是直接抛异常。

通过上面的execute方法可以看到,最主要的逻辑还是在addWorker方法中实现的,那我们就看下这个方法:

addWorker方法

主要工作是在线程池中创建一个新的线程并执行

参数定义:

  • firstTask the task the new thread should run first (or null if none). (指定新增线程执行的第一个任务或者不执行任务)
  • core if true use corePoolSize as bound, else maximumPoolSize.(core如果为true则使用corePoolSize绑定,否则为maximumPoolSize。 (此处使用布尔指示符而不是值,以确保在检查其他状态后读取新值)。)
 1 private boolean addWorker(Runnable firstTask, boolean core) {
 2     retry:
 3     for (;;) {
 4
 5         int c = ctl.get();
 6         //  获取运行状态
 7         int rs = runStateOf(c);
 8
 9         // Check if queue empty only if necessary.
10         // 如果状态值 >= SHUTDOWN (不接新任务&不处理队列任务)
11         // 并且 如果 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空)
12         if (rs >= SHUTDOWN &&
13             ! (rs == SHUTDOWN &&
14                firstTask == null &&
15                ! workQueue.isEmpty()))
16             // 返回false
17             return false;
18
19         for (;;) {
20             //获取线程数wc
21             int wc = workerCountOf(c);
22             // 如果wc大与容量 || core如果为true表示根据corePoolSize来比较,否则为maximumPoolSize
23             if (wc >= CAPACITY ||
24                 wc >= (core ? corePoolSize : maximumPoolSize))
25                 return false;
26             // 增加workerCount(原子操作)
27             if (compareAndIncrementWorkerCount(c))
28                 // 如果增加成功,则跳出
29                 break retry;
30             // wc增加失败,则再次获取runState
31             c = ctl.get();  // Re-read ctl
32             // 如果当前的运行状态不等于rs,说明状态已被改变,返回重新执行
33             if (runStateOf(c) != rs)
34                 continue retry;
35             // else CAS failed due to workerCount change; retry inner loop
36         }
37     }
38
39     boolean workerStarted = false;
40     boolean workerAdded = false;
41     Worker w = null;
42     try {
43         // 根据firstTask来创建Worker对象
44         w = new Worker(firstTask);
45         // 根据worker创建一个线程
46         final Thread t = w.thread;
47         if (t != null) {
48             // new一个锁
49             final ReentrantLock mainLock = this.mainLock;
50             // 加锁
51             mainLock.lock();
52             try {
53                 // Recheck while holding lock.
54                 // Back out on ThreadFactory failure or if
55                 // shut down before lock acquired.
56                 // 获取runState
57                 int rs = runStateOf(ctl.get());
58                 // 如果rs小于SHUTDOWN(处于运行)或者(rs=SHUTDOWN && firstTask == null)
59                 // firstTask == null证明只新建线程而不执行任务
60                 if (rs < SHUTDOWN ||
61                     (rs == SHUTDOWN && firstTask == null)) {
62                     // 如果t活着就抛异常
63                     if (t.isAlive()) // precheck that t is startable
64                         throw new IllegalThreadStateException();
65                     // 否则加入worker(HashSet)
66                     //workers包含池中的所有工作线程。仅在持有mainLock时访问。
67                     workers.add(w);
68                     // 获取工作线程数量
69                     int s = workers.size();
70                     //largestPoolSize记录着线程池中出现过的最大线程数量
71                     if (s > largestPoolSize)
72                         // 如果 s比它还要大,则将s赋值给它
73                         largestPoolSize = s;
74                     // worker的添加工作状态改为true
75                     workerAdded = true;
76                 }
77             } finally {
78                 mainLock.unlock();
79             }
80             // 如果worker的添加工作完成
81             if (workerAdded) {
82                 // 启动线程
83                 t.start();
84                 // 修改线程启动状态
85                 workerStarted = true;
86             }
87         }
88     } finally {
89         if (! workerStarted)
90             addWorkerFailed(w);
91     }
92     // 返回线启动状态
93     return workerStarted;
为什么需要持有mainLock?

因为workers是HashSet类型的,不能保证线程安全。

w = new Worker(firstTask);如何理解呢

Worker.java

1 private final class Worker
2     extends AbstractQueuedSynchronizer
3     implements Runnable

可以看到它继承了AQS并发框架还实现了Runnable。证明它还是一个线程任务类。那我们调用t.start()事实上就是调用了该类重写的run方法。

Worker为什么不使用ReentrantLock来实现呢?

tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。

线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。

1 public void run() { 2 runWorker(this); 3 }

run方法又调用了runWorker方法:

 1 final void runWorker(Worker w) {
 2     // 拿到当前线程
 3     Thread wt = Thread.currentThread();
 4     // 拿到当前任务
 5     Runnable task = w.firstTask;
 6     // 将Worker.firstTask置空 并且释放锁
 7     w.firstTask = null;
 8     w.unlock(); // allow interrupts
 9     boolean completedAbruptly = true;
10     try {
11         // 如果task或者getTask不为空,则一直循环
12         while (task != null || (task = getTask()) != null) {
13             // 加锁
14             w.lock();
15             // If pool is stopping, ensure thread is interrupted;
16             // if not, ensure thread is not interrupted.  This
17             // requires a recheck in second case to deal with
18             // shutdownNow race while clearing interrupt
19             //  return ctl.get() >= stop
20             // 如果线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断
21             // 其实就是保证两点:
22             // 1. 线程池没有停止
23             // 2. 保证线程没有中断
24             if ((runStateAtLeast(ctl.get(), STOP) ||
25                  (Thread.interrupted() &&
26                   runStateAtLeast(ctl.get(), STOP))) &&
27                 !wt.isInterrupted())
28                 // 中断当前线程
29                 wt.interrupt();
30             try {
31                 // 空方法
32                 beforeExecute(wt, task);
33                 Throwable thrown = null;
34                 try {
35                     // 执行run方法(Runable对象)
36                     task.run();
37                 } catch (RuntimeException x) {
38                     thrown = x; throw x;
39                 } catch (Error x) {
40                     thrown = x; throw x;
41                 } catch (Throwable x) {
42                     thrown = x; throw new Error(x);
43                 } finally {
44                     afterExecute(task, thrown);
45                 }
46             } finally {
47                 // 执行完后, 将task置空, 完成任务++, 释放锁
48                 task = null;
49                 w.completedTasks++;
50                 w.unlock();
51             }
52         }
53         completedAbruptly = false;
54     } finally {
55         // 退出工作
56         processWorkerExit(w, completedAbruptly);
57     }

总结一下runWorker方法的执行过程:

  1. while循环中,不断地通过getTask()方法从workerQueue中获取任务
  2. 如果线程池正在停止,则中断线程。否则调用3.
  3. 调用task.run()执行任务;
  4. 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);

这个流程图非常经典:

除此之外,ThreadPoolExector还提供了tryAcquiretryReleaseshutdownshutdownNowtryTerminate、等涉及的一系列线程状态更改的方法有兴趣可以自己研究。大体思路是一样的,这里不做介绍。

Worker为什么不使用ReentrantLock来实现呢?

tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。

线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。

在runWorker方法中,为什么要在执行任务的时候对每个工作线程都加锁呢?

shutdown方法与getTask方法存在竞态条件.(这里不做深入,建议自己深入研究,对它比较熟悉的面试官一般会问)

高频考点

  1. 创建线程池的五个方法。
  2. 线程池的五个状态
  3. execute执行过程。
  4. runWorker执行过程。(把两个流程图记下,理解后说个大该就行。)
  5. 比较深入的问题就是我在文中插入的问题。
  6. …期望大家能在评论区补充。

声明:图片来源于网络,侵删。

原文地址:https://www.cnblogs.com/javazhiyin/p/10605511.html

时间: 2024-10-09 04:47:09

面试官:你分析过线程池源码吗?的相关文章

java线程池源码的简单分析

工作中用过线程池来进行多线程的操作,但是也仅仅是停留在使用方面,没有深入研究,现在通过源码来仔细研究下java的线程池.关于线程池的优缺点就不研究了,直接通过一个源码来看看java中线程池的原理. 使用ThreadPoolExecutor来创建一个线程池 public class MultipleThread { public static void main(String[] args) { /** * 通过ThreadPoolExecutor来创建一个线程池 * * 我们创建的线程池 核心线

Java并发编程中线程池源码分析及使用

当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率. 参考http://www.cnblogs.com/dolphin0520/p/3932921.html 由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误. API : jdk1.8.0_144 ThreadPoolExecutor类 Java中线

分析线程池源码测试线程池

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 线程池测试类 */ public class TestThreadPool { public static void main(String[] args) { // 实例化线程池对象 corePoolSize--线程池

java线程池源码解读

java线程池的顶级类是Executors 内置了几种线程池 1.newFixedThreadPool  并且重载了两个此方法  有固定线程数的线程池 当达到设置的线程数时 多余的任务会排队,当处理完一个马上就会去接着处理排队中的任务 源码如下 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit

手撕ThreadPoolExecutor线程池源码

这篇文章对ThreadPoolExecutor创建的线程池如何操作线程的生命周期通过源码的方式进行详细解析.通过对execute方法.addWorker方法.Worker类.runWorker方法.getTask方法.processWorkerExit从源码角度详细阐述,文末有彩蛋. exexcte方法 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); in

线程池源码解析

ThreadPoolExecutor的几个重要属性 BlockingQueue workQueue 阻塞队列.存放将要执行的任务 HashSet workers 当前线程池的线程集合.下文会重点介绍Worker这个内部类 corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 非核心线程保持空闲的最长时间 allowCoreThreadTimeOut 核心线程是否被回收.默认是不回收核心线程的 RejectedExecutionHandler

nginx线程池源码解析

周末看了nginx线程池部分的代码,顺手照抄了一遍,写成了自己的版本.实现上某些地方还是有差异的,不过基本结构全部摘抄. 在这里分享一下.如果你看懂了我的版本,也就证明你看懂了nginx的线程池. 本文只列出了关键数据结构和API,重在理解nginx线程池设计思路.完整代码在最后的链接里. 1.任务节点 typedef void (*CB_FUN)(void *); //任务结构体 typedef struct task { void *argv; //任务函数的参数(任务执行结束前,要保证参数

Java线程池源码阅读

简单介绍 线程池是池化技术的一种,对线程复用.资源回收.多任务执行有不错的实践.阅读源码,可以学习jdk的大师对于线程并发是怎么池化的,还有一些设计模式.同时,它也能给我们在使用它的时候多一种感知,出了什么问题可以马上意识到哪里的问题. 使用范例 我们使用一个线程池,直接通过jdk提供的工具类直接创建.使用如下api创建一个固定线程数的线程池. ExecutorService pool = Executors.newFixedThreadPool(5); 使用如下api创建一个会不断增长线程的线

java线程池源码的理解

线程池 所谓线程池,就是有一个池子,里面存放着已经创建好的线程,当有任务提交到线程池执行时,池子中的某个线程会主动执行该任务. 新建线程和切换线程的开销太大了,使用线程池可以节省系统资源. 线程池的关键类:ThreadPoolExecutor 主要流程 execute() –> addWorker() –>runWorker() -> getTask() 重要参数及变量 控制状态的变量 ctl: ctl是一个AtomicInteger原子操作类,能够保证线程安全. ctl变量定义如下: