12.ThreadPoolExecutor线程池原理及其execute方法

jdk1.7.0_79 

  对于线程池大部分人可能会用,也知道为什么用。无非就是任务需要异步执行,再者就是线程需要统一管理起来。对于从线程池中获取线程,大部分人可能只知道,我现在需要一个线程来执行一个任务,那我就把任务丢到线程池里,线程池里有空闲的线程就执行,没有空闲的线程就等待。实际上对于线程池的执行原理远远不止这么简单。

  在Java并发包中提供了线程池类——ThreadPoolExecutor,实际上更多的我们可能用到的是Executors工厂类为我们提供的线程池:newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,这三个线程池并不是ThreadPoolExecutor的子类,关于这几者之间的关系,我们先来查看ThreadPoolExecutor,查看源码发现其一共有4个构造方法。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

  首先就从这几个参数开始来了解线程池ThreadPoolExecutor的执行原理。

  corePoolSize:核心线程池的线程数量

  maximumPoolSize:最大的线程池线程数量

  keepAliveTime:线程活动保持时间,线程池的工作线程空闲后,保持存活的时间。

  unit:线程活动保持时间的单位。

  workQueue:指定任务队列所使用的阻塞队列

  corePoolSize和maximumPoolSize都在指定线程池中的线程数量,好像平时用到线程池的时候最多就只需要传递一个线程池大小的参数就能创建一个线程池啊,Java为我们提供了一些常用的线程池类就是上面提到的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,当然如果我们想要自己发挥创建自定义的线程池就得自己来“配置”有关线程池的一些参数。

  当把一个任务交给线程池来处理的时候,线程池的执行原理如下图所示参考自《Java并发编程的艺术》

  ①首先会判断核心线程池里是否有线程可执行,有空闲线程则创建一个线程来执行任务。

  ②当核心线程池里已经没有线程可执行的时候,此时将任务丢到任务队列中去。

  ③如果任务队列(有界)也已经满了的话,但运行的线程数小于最大线程池的数量的时候,此时将会新建一个线程用于执行任务,但如果运行的线程数已经达到最大线程池的数量的时候,此时将无法创建线程执行任务。

  所以实际上对于线程池不仅是单纯地将任务丢到线程池,线程池中有线程就执行任务,没线程就等待。

  为巩固一下线程池的原理,现在再来了解上面提到的常用的3个线程池:

  Executors.newFixedThreadPool:创建一个固定数量线程的线程池。

// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

  可以看到newFixedThreadPool中调用的是ThreadPoolExecutor类,传递的参数corePoolSize= maximumPoolSize=nThread。回顾线程池的执行原理,当一个任务提交到线程池中,首先判断核心线程池里有没有空闲线程,有则创建线程,没有则将任务放到任务队列(这里是有界阻塞队列LinkedBlockingQueue)中,如果任务队列已经满了的话,对于newFixedThreadPool来说,它的最大线程池数量=核心线程池数量,此时任务队列也满了,将不能扩展创建新的线程来执行任务。

  Executors.newSingleThreadExecutor:创建只包含一个线程的线程池。  

//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}

  只有一个线程的线程池好像有点奇怪,并且并没有直接将返回ThreadPoolExecutor,甚至也没有直接将线程池数量1传递给newFixedThreadPool返回。那就说明这个只含有一个线程的线程池,或许并没有只包含一个线程那么简单。在其源码注释中这么写到:创建只有一个工作线程的线程池用于操作一个无界队列(如果由于前驱节点的执行被终止结束了,一个新的线程将会继续执行后继节点线程)任务得以继续执行,不同于newFixedThreadPool(1)不会有额外的线程来重新继续执行后继节点。也就是说newSingleThreadExecutor自始至终都只有一个线程在执行,这和newFixedThreadPool一样,但如果线程终止结束过后newSingleThreadExecutor则会重新创建一个新的线程来继续执行任务队列中的线程,而newFixedThreaPool则不会。

  Executors.newCachedThreadPool:根据需要创建新线程的线程池。

//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
  return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

  可以看到newCachedThread返回的是ThreadPoolExecutor,其参数核心线程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,这也就是说当任务被提交到newCachedThread线程池时,将会直接把任务放到SynchronousQueue任务队列中,maximumPool从任务队列中获取任务。注意SynchronousQueue是一个没有容量的队列,也就是说每个入队操作必须等待另一个线程的对应出队操作,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,newCachedThreadPool会不断创建线程,线程多并不是一件好事,严重会耗尽CPU和内存资源。

  题外话:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,这三者都直接或间接调用了ThreadPoolExecutor,为什么它们三者没有直接是其子类,而是通过Executors来实例化呢?这是所采用的静态工厂方法java.util.Connections接口中同样也是采用的静态工厂方法来创建相关的类这样有很多好处,静态工厂方法是用来产生对象的,产生什么对象没关系,只要返回原返回类型或原返回类型的子类型都可以降低API数目和使用难度,《Effective Java》中的第1条就是静态工厂方法。



  回到ThreadPoolExecutor,首先来看它的继承关系:

  ThreadPoolExecutor它的顶级父类是Executor接口,只包含了一个方法——execute,这个方法也就是线程池的“执行”。

//Executor#execute
public interface Executor {
    void execute(Runnable command);
}

  Executor#execute的实现则是在ThreadPoolExecutor中实现的:

//ThreadPoolExecutor#execute
public void execute(Runnable command) {
  if (command == null)
        throw new NullPointerException();
  int c = ctl.get();
    …
}

  一来就碰到个不知所云的ctl变量它的定义:

private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));

  这个变量使用来干嘛的呢?它的作用有点类似我们在《7.ReadWriteLock接口及其实现ReentrantReadWriteLock中提到的读写锁有读、写两个同步状态,而AQS则只提供了state一个int型变量,此时将state高16位表示为读状态,低16位表示为写状态。这里的clt同样也是,它表示了两个概念:

  1. workerCount:当前有效的线程数
  2. runState:当前线程池的五种状态,Running、Shutdown、Stop、Tidying、Terminate。

  int型变量一共有32位,线程池的五种状态runState至少需要3位来表示,故workCount只能有29位,所以代码中规定线程池的有效线程数最多为229-1。

//ThreadPoolExecutor
private static final int COUNT_BITS = Integer.SIZE – 3;     //32-3=29,线程数量所占位数
private static final int CAPACITY = (1 << COUNT_BITS) – 1;    //低29位表示最大线程数,229-1
//五种线程池状态
private static final int RUNNING = -1 << COUNT_BITS;    /int型变量高3位(含符号位)101表RUNING
private static final int SHUTDOWN = 0 << COUNT_BITS;    //高3位000
private static final int STOP = 1 << COUNT_BITS;    //高3位001
private static final int TIDYING = 2 << COUNT_BITS;    //高3位010
private static final int TERMINATED = 3 << COUNT_BITS;    //高3位011

  再次回到ThreadPoolExecutor#execute方法:

 1 //ThreadPoolExecutor#execute
 2 public void execute(Runnable command) {
 3     if (command == null)
 4         throw new NullPointerException();
 5    int c = ctl.get();    //由它可以获取到当前有效的线程数和线程池的状态
 6 /*1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中*/
 7     if (workerCountOf(c) < corePoolSize){
 8         if (addWorker(command, tre))     //在addWorker中创建工作线程执行任务
 9             return ;
10         c = ctl.get();
11     }
12 /*2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中*/
13     if (isRunning(c) && workQueue.offer(command))    {    //线程池是否处于运行状态,且是否任务插入任务队列成功
14         int recheck = ctl.get();
15      if (!isRunning(recheck) && remove(command))        //线程池是否处于运行状态,如果不是则使刚刚的任务出队
16        reject(command);    //抛出RejectedExceptionException异常
17      else if (workerCountOf(recheck) == 0)
18        addWorker(null, false);
19   }
20 /*3.插入队列不成功,且当前线程数数量小于最大线程池数量,此时则创建新线程执行任务,创建失败抛出异常*/
21   else if (!addWorker(command, false)){
22     reject(command);    //抛出RejectedExceptionException异常
23   }
24 }

  上面代码注释第7行的即判断当前核心线程池里是否有空闲线程,有则通过addWorker方法创建工作线程执行任务。addWorker方法较长,筛选出重要的代码来解析。

 1 //ThreadPoolExecutor#addWorker
 2 private boolean addWorker(Runnable firstTask, boolean core) {
 3 /*首先会再次检查线程池是否处于运行状态,核心线程池中是否还有空闲线程,都满足条件过后则会调用compareAndIncrementWorkerCount先将正在运行的线程数+1,数量自增成功则跳出循环,自增失败则继续从头继续循环*/
 4   ...
 5   if (compareAndIncrementWorkerCount(c))
 6     break retry;
 7   ...
 8 /*正在运行的线程数自增成功后则将线程封装成工作线程Worker*/
 9   boolean workerStarted = false;
10   boolean workerAdded = false;
11   Worker w = null;
12   try {
13     final ReentrantLock mainLock = this.mainLock;        //全局锁
14     w = new Woker(firstTask);        //将线程封装为Worker工作线程
15     final Thread t = w.thread;
16     if (t != null) {
17       mainLock.lock();    //获取全局锁
18 /*当持有了全局锁的时候,还需要再次检查线程池的运行状态等*/
19       try {
20         int c = clt.get();
21         int rs = runStateOf(c);        //线程池运行状态
22         if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){        //线程池处于运行状态,或者线程池关闭且任务线程为空
23           if (t.isAlive())    //线程处于活跃状态,即线程已经开始执行或者还未死亡,正确的应线程在这里应该是还未开始执行的
24             throw new IllegalThreadStateException();
25           workers.add(w);    //private final HashSet<Worker> wokers = new HashSet<Worker>();包含线程池中所有的工作线程,只有在获取了全局的时候才能访问它。将新构造的工作线程加入到工作线程集合中
26           int s = worker.size();    //工作线程数量
27           if (s > largestPoolSize)
28             largestPoolSize = s;
29           workerAdded = true;    //新构造的工作线程加入成功
30         }
31       } finally {
32         mainLock.unlock();
33       }
34       if (workerAdded) {
35         t.start();    //在被构造为Worker工作线程,且被加入到工作线程集合中后,执行线程任务,注意这里的start实际上执行Worker中run方法,所以接下来分析Worker的run方法
36         workerStarted = true;
37       }
38     }
39   } finally {
40     if (!workerStarted)    //未能成功创建执行工作线程
41       addWorkerFailed(w);    //在启动工作线程失败后,将工作线程从集合中移除
42   }
43   return workerStarted;
44 }

  在上面第35代码中,工作线程被成功添加到工作线程集合中后,则开始start执行,这里start执行的是Worker工作线程中的run方法。

//ThreadPoolExecutor$Worker,它继承了AQS,同时实现了Runnable,所以它具备了这两者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
  final Thread thread;
  Runnable firstTask;
  public Worker(Runnable firstTask) {
    setState(-1);    //设置AQS的同步状态为-1,禁止中断,直到调用runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);    //通过线程工厂来创建一个线程,将自身作为Runnable传递传递
  }
  public void run() {
    runWorker(this);    //运行工作线程
  }
}

  ThreadPoolExecutor#runWorker,在此方法中,Worker在执行完任务后,还会循环获取任务队列里的任务执行(其中的getTask方法),也就是说Worker不仅仅是在执行完给它的任务就释放或者结束,它不会闲着,而是继续从任务队列中获取任务,直到任务队列中没有任务可执行时,它才退出循环完成任务。理解了以上的源码过后,往后线程池执行原理的第二步、第三步的理解实则水到渠成。

时间: 2024-10-26 17:07:34

12.ThreadPoolExecutor线程池原理及其execute方法的相关文章

ThreadPoolExecutor线程池原理

参考: https://www.cnblogs.com/liuzhihu/p/8177371.html 原文地址:https://www.cnblogs.com/kancy/p/10432131.html

13.ThreadPoolExecutor线程池之submit方法

jdk1.7.0_79  在上一篇<ThreadPoolExecutor线程池原理及其execute方法>中提到了线程池ThreadPoolExecutor的原理以及它的execute方法.本文解析ThreadPoolExecutor#submit. 对于一个任务的执行有时我们不需要它返回结果,但是有我们需要它的返回执行结果.对于线程来讲,如果不需要它返回结果则实现Runnable,而如果需要执行结果的话则可以实现Callable.在线程池同样execute提供一个不需要返回结果的任务执行,而

Java多线程系列--“JUC线程池”02之 线程池原理(一)

ThreadPoolExecutor简介 ThreadPoolExecutor是线程池类.对于线程池,可以通俗的将它理解为"存放一定数量线程的一个线程集合.线程池允许同时运行的线程数量就是线程池的容量:当添加到线程池中的线程超过它的容量时,会有一部分线程阻塞等待.线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理." ThreadPoolExecutor数据结构 ThreadPoolExecutor的数据结构如下图所示: 各个数据在ThreadPoolExecutor

Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理 以下是本文的目录大纲: 一.shutdown()  --  温柔的终止线程池 interruptIdleWorkers()  --  中断空闲worker tryTerminate()  --  尝试终止线程池 二.shutdown

ThreadPoolExecutor线程池的分析和使用

1. 引言 合理利用线程池能够带来三个好处. 第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行. 第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控. 但是要做到合理的利用线程池,必须对其原理了如指掌. 2. 线程池的使用 线程池的创建 我们可以通过java.util.concurrent.ThreadPo

Executor线程池原理详解

线程池 线程池的目的就是减少多线程创建的开销,减少资源的消耗,让系统更加的稳定.在web开发中,服务器会为了一个请求分配一个线程来处理,如果每次请求都创建一个线程,请求结束就销毁这个线程.那么在高并发的情况下,就会有大量线程创建和销毁,这就会降低系统的效率.线程池的诞生就是为了让线程得到重复使用,减少了线程创建和销毁的开销,减少了线程的创建和销毁自然的就提高了系统的响应速度,与此同时还提高了线程的管理性,使线程可以得到统一的分配,监控和调优. 线程创建和销毁为什么会有开销呢,因为我们java运行

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

线程池原理

在面向对象编程中,对象创建和销毁是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是对一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池就是遵循这一思想而产生的,下面将介绍的线程池技术同样符合这一思想. 多线程技术主要解决处

Java源码解析 - ThreadPoolExecutor 线程池

1 线程池的好处 线程使应用能够更加充分合理地协调利用CPU.内存.网络.I/O等系统资源.线程的创建需要开辟虚拟机栈.本地方法栈.程序计数器等线程私有的内存空间;在线程销毁时需要回收这些系统资源.频繁地创建和销毁线程会浪费大量的系统资源,增加并发编程风险. 在服务器负载过大的时候,如何让新的线程等待或者友好地拒绝服务? 这些都是线程自身无法解决的;所以需要通过线程池协调多个线程,并实现类似主次线程隔离.定时执行.周期执行等任务. 线程池的作用包括:●利用线程池管理并复用线程.控制最大并发数等●