线程池的使用及ThreadPoolExecutor的execute和addWorker源码分析

说明:本作者是文章的原创作者,转载请注明出处:本文地址:http://www.cnblogs.com/qm-article/p/7821602.html

一、线程池的介绍

在开发中,频繁的创建和销毁一个线程,是很耗资源的,为此找出了一个可以循环利用已经存在的线程来达到自己的目的,线程池顾名思义,也就是线程池的集合,通过线程池执行的线程任务,可以很有效的去规划线程的使用。
在java中大致有这几种线程池
      newScheduledThreadPool  创建一个定长线程池,支持定时及周期性任务执行。,可以作一个定时器使用。
      newCachedThreadPool       创建一个可缓存线程池,如果线程池长度超过需要的线程数量,可灵活回收空闲线程,若无可回收,则新建线程。
      newSingleThreadExecutor 创建一个单线程化的线程池, 它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,可以控制线程的执行顺序
      newFixedThreadPool          创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待,当创建的线程池数量为1的时候。也类似于单线程化的线程池,当为1的时候,也可控制线程的执行顺序

二、线程池的使用

1、newScheduledThreadPool

 1    /**
 2      * 测试newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
 3      * 一般可做定时器使用
 4      */
 5     public static void test_1(){
 6         //参数是线程的数量
 7         ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
 8         /**
 9          * 第二个参数,是首次执行该线程的延迟时间,之后失效
10          * 第三个参数是,首次执行完之后,再过该段时间再次执行该线程,具有周期性
11          */
12         scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
13
14             @Override
15             public void run() {
16                 System.out.println(new Date().getSeconds());
17
18             }
19         }, 10, 3, TimeUnit.SECONDS);
20
21     }

2、newCachedThreadPool      

 1     /**
 2      * newCachedThreadPool创建一个可缓存线程池,
 3      * 如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
 4      */
 5     public static void test_2(){
 6         ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
 7         for (int i = 0; i < 10; i++) {
 8                final int index = i;
 9                try {
10                 Thread.sleep(index * 1000);
11                } catch (InterruptedException e) {
12                 e.printStackTrace();
13                }
14             cachedThreadPool.execute(new Runnable() {
15
16                 @Override
17                 public void run() {
18                     // TODO Auto-generated method stub
19                     System.out.println(index+":"+new Date().getSeconds());
20                 }
21             });
22         }
23     }

3、newSingleThreadExecutor

 1     /**
 2      * newSingleThreadExecutor 创建一个单线程化的线程池,
 3      * 它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
 4      */
 5     public static void test_4(){
 6         ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
 7         for(int i = 1; i < 11; i++){
 8             final int index = i;
 9             singleThreadExecutor.execute(new Runnable() {
10                 @Override
11                 public void run() {
12                     // TODO Auto-generated method stub
13                     //会按顺序打印
14                     System.out.println(index);
15                 }
16             });
17         }
18     }

4、newFixedThreadPool

 1     /**
 2      * newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
 3      */
 4     public static void test_3(){
 5         //当参数为1的时候,可以控制线程的执行顺序,类似join的作用
 6         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
 7         for(int i = 0; i < 2; i++){
 8             final int index = i;
 9             fixedThreadPool.execute(new Runnable() {
10
11                 @Override
12                 public void run() {
13                     // TODO Auto-generated method stub
14                     try {
15                         System.out.println(index);
16                     } catch (Exception e) {
17                         // TODO Auto-generated catch block
18                         e.printStackTrace();
19                     }
20                 }
21             });
22         }
23     }

三、线程池源码分析

以上四种线程都是由一个线程工具类Executors来创造的

如上图,其中newFixedThreadPool 和newCachedThreadPool 都是由threadPoolExecutor来创建的,只是参数不一致而已,
关于threadPoolExector的构造器的参数

corePoolSize 代表该线程中允许的核心线程数,要和工作的线程数量区分开来,两者不
                      等价(工作的线程数量一定不大于corePoolSize,即当超过后,会将线程
                      放入队列中),可以理解为一个ArrayList集合中,默认空间是10,但存放的
                     元素的数量 不一定是10, 在这里这个10就寓指corePoolSize ,存放元
                     素的个数是工作线程数量
maximumPoolSize 这个参数的意思就是该线程池所允许的最大线程数量
keepAliveTime 这个参数的意思就是空余线程的存活时间,注意这个值并不会对所有线程起作用,如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因                           为空闲太长时间而被关闭,当然,也可以通过调用allowCoreThreadTimeOut方法使核心线程数内的线程也可以被回收。

unit 时间单位
workQueue 阻塞队列,在此作用就是用来存放线程。
threadFactory 线程工厂
defaultHandler 拒绝策略,即当加入线程失败,采用该handler来处理

3.1、线程池的拒绝策略

AbortPolicy
        为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常
DiscardPolicy
        直接抛弃,任务不执行,空方法
DiscardOldestPolicy
        从队列里面抛弃head的一个任务,并再次execute 此task。
CallerRunsPolicy
        在调用execute的线程里面执行此command,会阻塞入口

在分析该类的execute方法前,先看这几个常量的值和一些方法的作用

 1    /*
 2     *  ctl的默认值为-536870912,
 3     *  作用是将该值传入workerCountOf(int c)的参数c中,
 4     *  则可以返回正在工作的线程数量
 5     *  每当有一个线程加入工作,该值会加1
 6     */
 7     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 8     private static final int COUNT_BITS = Integer.SIZE - 3;   //32-3=29
 9     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//536870911
10
11     // runState is stored in the high-order bits,其中running<shutdown<stop<tidying<terminated
12     private static final int RUNNING    = -1 << COUNT_BITS;// -536870912
13     private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
14     private static final int STOP       =  1 << COUNT_BITS;//536870912
15     private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
16     private static final int TERMINATED =  3 << COUNT_BITS;//1610612736
17
18     // Packing and unpacking ctl
19     private static int runStateOf(int c)     { return c & ~CAPACITY; }//当c<0时该方法返回的值为-536870912,否则为0
20     private static int workerCountOf(int c)  { return c & CAPACITY; }//获取工作线程数
21     private static int ctlOf(int rs, int wc) { return rs | wc; }//-536870912

3.2、execute

当线程为null时,直接抛出异常

第一步、看图,下图所指的将corePoolSize扩充至maxmumPoolSize是一个类比,
因为在addWorker代码中有这么一句wc >= (core ? corePoolSize : maximumPoolSize))成立则返回false,表明core为false时会以maximumPoolSize来当做corePoolSize比较

 1         int c = ctl.get();
 2         if (workerCountOf(c) < corePoolSize) {
 3             if (addWorker(command, true))
 4                 return;
 5             c = ctl.get();
 6         }
 7         if (isRunning(c) && workQueue.offer(command)) {
 8             int recheck = ctl.get();
 9             if (! isRunning(recheck) && remove(command))
10                 reject(command);
11             else if (workerCountOf(recheck) == 0)
12                 addWorker(null, false);
13         }
14         else if (!addWorker(command, false))
15             reject(command);

3.3、addWorker

 1         private boolean addWorker(Runnable firstTask, boolean core) {
 2         //外部循环
 3         retry:
 4         for (;;) {
 5             int c = ctl.get();//获取当前工作线程数量,数量为{c-(-536870912)}
 6
 7             int rs = runStateOf(c);//若c>=0时,该值才为0,否则该值一直为-536870912
 8
 9
10             /*
11              *由上面的一些线程池状态常量值可知,running<shutdown<stop<tidying<terminated
12              *若rs>=shutdown,则表明线程池处于stop、tidying、terminated三种状态的一种
13              *若rs>=shutdown成立,则进行后面判断,
14              *1、线程池处于shutdown状态
15              *  1.1、firstTask不为null,则返回false,也即是线程池已经处于shutdown状态,还要添加新的线程,被直接驳回(拒绝)
16              *  1.2、firstTask为null
17              *     1.2.1、此时意味着线程池状态为shutdown状态,且first为null,若阻塞队列为空,则返回false
18              *2、线程处于大于shutdown的状态,则直接返回false
19             */
20             if (rs >= SHUTDOWN &&
21                 ! (rs == SHUTDOWN &&
22                    firstTask == null &&
23                    ! workQueue.isEmpty()))
24                 return false;
25             /*
26              *进入内循环以下两种情况会跳出该内循环,否则一直会循环
27              *1、当工作线程数量超过一定阈值,会直接返回false
28              *2、添加工作线程成功,即ctl的值进行了加一
29             */
30             for (;;) {
31                 int wc = workerCountOf(c);//获取工作线程的数量
32                 //当线程数量>=536870911或者>=corePoolSize或maximumPoolSize的时候,则返回false
33                 if (wc >= CAPACITY ||
34                     wc >= (core ? corePoolSize : maximumPoolSize))
35                     return false;
36                 if (compareAndIncrementWorkerCount(c))//使用unsafe的cas操作对ctl.get()的值进行加一
37                     break retry;//跳出这个外循环
38                 c = ctl.get();  // Re-read ctl
39                 if (runStateOf(c) != rs)//当此时的线程池状态和之前的状态不等时
40                     continue retry;//继续内循环
41             }
42         }
43         //若进行到了此步操作,则表明工作线程数量加了1
44         boolean workerStarted = false;
45         boolean workerAdded = false;
46         Worker w = null;
47         try {
48             w = new Worker(firstTask);
49             final Thread t = w.thread;//该w.thread为worker内部新创建的thread
50             if (t != null) {
51                 final ReentrantLock mainLock = this.mainLock;
52                 mainLock.lock();//开启锁
53                 try {
54                     //获取锁后,再次获取线程池的状态
55                     int rs = runStateOf(ctl.get());
56                     /*
57                      *1、当线程池的状态处于shutdown以上状态,则直接释放锁,不启动线程,且执行addWorkerFailed方法
58                          执行该方法的作用是使工作线程数量-1
59                     */
60                     if (rs < SHUTDOWN ||
61                         (rs == SHUTDOWN && firstTask == null)) {
62                         if (t.isAlive()) // 创建的线程处于活跃状态,即被启动了,抛出异常
63                             throw new IllegalThreadStateException();
64                         workers.add(w);//workers是一个set集合
65                         int s = workers.size();
66                         if (s > largestPoolSize)//largestPoolSize默认为0,作用是记录set集合中的线程数量
67                             largestPoolSize = s;
68                         workerAdded = true;//改变该值,为了启动线程,且返回一个addWorker执行成功的状态
69                     }
70                 } finally {
71                     mainLock.unlock();//释放锁
72                 }
73                 if (workerAdded) {
74                     t.start();
75                     workerStarted = true;
76                 }
77             }
78         } finally {
79             if (! workerStarted)
80                 addWorkerFailed(w);
81         }
82         return workerStarted;
83     }

总结:2017-11-12

时间: 2024-11-08 06:45:09

线程池的使用及ThreadPoolExecutor的execute和addWorker源码分析的相关文章

Python:线程、进程与协程(3)——Queue模块及源码分析

Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式.该模块提供了三种队列: Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列 Queue.LifoQueue(maxsize):后进先出,相当于栈 Queue.PriorityQueue(maxsize):优先级队列. 其中LifoQueue,PriorityQueue是Queue的子类.三者拥有以下共同的方法: qsize():返回近似的队列大小.为什么要加"近似&q

Java线程池ThreadPoolExector的源码分析

前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率.压榨CPU处理能力.多条线路同时运行等都是强有力的杀手锏工具.线程是如此的重要,那么我们来思考这样一个问题.假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行

《Java源码分析》:线程池 ThreadPoolExecutor

<Java源码分析>:线程池 ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一张实现,但是是间接实现. ThreadPoolExecutor是继承AbstractExecutorService.而AbstractExecutorService实现了ExecutorService接口. 在介绍细节的之前,先介绍下ThreadPoolExecutor的结构 1.线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行

JDK源码分析之concurrent包(二) -- 线程池ThreadPoolExecutor

上一篇我们简单描述了Executor框架的结构,本篇正式开始并发包中部分源码的解读. 我们知道,目前主流的商用虚拟机在线程的实现上可能会有所差别.但不管如何实现,在开启和关闭线程时一定会耗费很多CPU资源,甚至在线程的挂起和恢复JDK1.6都做了自旋锁的优化.所以,使用线程池来管理和执行多线程任务会大大提高程序执行效率.关于使用线程池的优点这里不做过多说明,我们直接进入Java5并发包中ThreadPoolExecutor的实现的源码. 在解读源码前,我们先来看看创建线程池的一般做法和线程池的几

线程池的使用(ThreadPoolExecutor详解)

为什么要使用线程池? 线程是一个操作系统概念.操作系统负责这个线程的创建.挂起.运行.阻塞和终结操作.而操作系统创建线程.切换线程状态.终结线程都要进行CPU调度--这是一个耗费时间和系统资源的事情. 另一方面,大多数实际场景中是这样的:处理某一次请求的时间是非常短暂的,但是请求数量是巨大的.这种技术背景下,如果我们为每一个请求都单独创建一个线程,那么物理机的所有资源基本上都被操作系统创建线程.切换线程状态.销毁线程这些操作所占用,用于业务请求处理的资源反而减少了.所以最理想的处理方式是,将处理

源码分析—ThreadPoolExecutor线程池三大问题及改进方案

前言 在一次聚会中,我和一个腾讯大佬聊起了池化技术,提及到java的线程池实现问题,我说这个我懂啊,然后巴拉巴拉说了一大堆,然后腾讯大佬问我说,那你知道线程池有什么缺陷吗?我顿时哑口无言,甘拜下风,所以这次我再回来思考一下线程池的实现原理 源码分析 ThreadPoolExecutor构造器 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, Blo

通过ThreadPoolExecutor源码分析线程池实现原理

为什么要用线程池 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性.使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度(当一个任务到达时,不需要重新创建线程来为之服务,重用已有线程),还可以通过线程池控制线程资源统一分配和监控等. 线程池工厂Executors JDK 提供了创建线程池的工厂类 Executors,该类提供了创建线程池的静态方法: public static ExecutorService newFixedThreadP

java线程池ThreadPoolExector源码分析

java线程池ThreadPoolExector源码分析 今天研究了下ThreadPoolExector源码,大致上总结了以下几点跟大家分享下: 一.ThreadPoolExector几个主要变量 先了解下ThreadPoolExector中比较重要的几个变量.  corePoolSize:核心线程数量     maximumPoolSize:最大线程数量 allowCoreThreadTimeOut:是否允许线程超时(设置为true时与keepAliveTime,TimeUnit一起起作用)

Java多线程 -- JUC包源码分析11 -- ThreadPoolExecutor源码分析

在JUC包中,线程池部分本身有很多组件,可以说是前面所分析的各种技术的一个综合应用.从本文开始,将综合前面的知识,逐个分析线程池的各个组件. -Executor/Executors -ThreadPoolExecutor使用介绍 -ThreadPoolExecutor实现原理 –ThreadPoolExecutor的中断与优雅关闭 shutdown + awaitTermination –shutdown的一个误区 Executor/Executors Executor是线程池框架最基本的几个接