Java并发编程原理与实战三十七:线程池的原理与使用

一、简介

线程池在我们的高并发环境下,实际应用是非常多的!!适用频率非常高!

有过使用过Executors框架的朋友,可能不太知道底层的实现,这里就是讲Executors是由ThreadPoolExecutor实现的。好的,让我们来看看ThreadPollExcutor是怎样实现的呢?

如果你想了解ThreadPoolExecutor的话。可以先从它的构造方法看起。

ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,workQueue,threadFactory, handler)

  • corePoolSize 核心线程池大小
  • maximumPoolSize 线程池最大容量大小
  • keepAliveTime 线程池空闲时,线程存活的时间
  • TimeUnit 时间单位
  • ThreadFactory 线程工厂
  • BlockingQueue任务队列
  • RejectedExecutionHandler 线程拒绝策略

我相信大家看了注解之后,前面几个要填的要素,都没什么问题。ThreadFactory 的话,用默认的就可以(或者构造函数不用填写这项目)。RejectedExecutionHandler  这里的线程拒绝策略的话,可能要看看

RejectedExecutionHandler  有哪些拒绝策略呢?

  • AbortPolicy:如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
  • DiscardPolicy:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
  • DiscardOldestPolicy:如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。
  • CallerRunsPolicy:如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。
  • 自定义

注意:shutdown()方法和shutdownNow()的区别?

shutdown()会等待线程全部执行完才执行shutdown线程池

shutdownNow()不会等待线程全部执行完才执行shutdown线程池

二、源码解析

我们现在来看看ThreadPoolExecutor的源码是怎么样的,也许你刚开始看他的源码会很痛苦,因为你不知道作者为什么是这样设计的,所以本文就我看到的思想会给你做一个介绍,此时也许你通过知道了一些作者的思想,你也许就知道应该该如何去操作了。

这里来看下构造方法中对那些属性做了赋值:

源码段1:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public ThreadPoolExecutor(int corePoolSize,

                           int maximumPoolSize,

                           long keepAliveTime,

                           TimeUnit unit,

                           BlockingQueue<Runnable> workQueue,

                           ThreadFactory threadFactory,

                           RejectedExecutionHandler handler) {

     if (corePoolSize < 0 ||

         maximumPoolSize <= 0 ||

         maximumPoolSize < corePoolSize ||

         keepAliveTime < 0)

         throw new IllegalArgumentException();

     if (workQueue == null || threadFactory == null || handler == null)

         throw new NullPointerException();

     this.corePoolSize = corePoolSize;

     this.maximumPoolSize = maximumPoolSize;

     this.workQueue = workQueue;

     this.keepAliveTime = unit.toNanos(keepAliveTime);

     this.threadFactory = threadFactory;

     this.handler = handler;

 }

这里你可以看到最终赋值的过程,可以先大概知道下参数的意思:

corePoolSize:核心运行的poolSize,也就是当超过这个范围的时候,就需要将新的Thread放入到等待队列中了;

maximumPoolSize:一般你用不到,当大于了这个值就会将Thread由一个丢弃处理机制来处理,但是当你发生:newFixedThreadPool的时候,corePoolSize和maximumPoolSize是一样的,而corePoolSize是先执行的,所以他会先被放入等待队列,而不会执行到下面的丢弃处理中,看了后面的代码你就知道了。

workQueue:等待队列,当达到corePoolSize的时候,就向该等待队列放入线程信息(默认为一个LinkedBlockingQueue),运行中的队列属性为:workers,为一个HashSet;内部被包装了一层,后面会看到这部分代码。

keepAliveTime:默认都是0,当线程没有任务处理后,保持多长时间,cachedPoolSize是默认60s,不推荐使用。

threadFactory:是构造Thread的方法,你可以自己去包装和传递,主要实现newThread方法即可;

handler:也就是参数maximumPoolSize达到后丢弃处理的方法,java提供了5种丢弃处理的方法,当然你也可以自己弄,主要是要实现接口:RejectedExecutionHandler中的方法:

public void rejectedExecution(Runnabler, ThreadPoolExecutor e)

java默认的是使用:AbortPolicy,他的作用是当出现这中情况的时候会抛出一个异常;其余的还包含:

1、CallerRunsPolicy:如果发现线程池还在运行,就直接运行这个线程

2、DiscardOldestPolicy:在线程池的等待队列中,将头取出一个抛弃,然后将当前线程放进去。

3、DiscardPolicy:什么也不做

4、AbortPolicy:java默认,抛出一个异常:RejectedExecutionException。

通常你得到线程池后,会调用其中的:submit方法或execute方法去操作;其实你会发现,submit方法最终会调用execute方法来进行操作,只是他提供了一个Future来托管返回值的处理而已,当你调用需要有返回值的信息时,你用它来处理是比较好的;这个Future会包装对Callable信息,并定义一个Sync对象(),当你发生读取返回值的操作的时候,会通过Sync对象进入锁,直到有返回值的数据通知,具体细节先不要看太多,继续向下:

来看看execute最为核心的方法吧:

源码段2:


1

2

3

4

5

6

7

8

9

10

11

12

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

        if (runState == RUNNING && workQueue.offer(command)) {

            if (runState != RUNNING || poolSize == 0)

                ensureQueuedTaskHandled(command);

        }

        else if (!addIfUnderMaximumPoolSize(command))

            reject(command); // is shutdown or saturated

    }

}

这段代码看似简单,其实有点难懂,很多人也是这里没看懂,没事,我一个if一个if说:

首先第一个判定空操作就不用说了,下面判定的poolSize >= corePoolSize成立时候会进入if的区域,当然它不成立也有可能会进入,他会判定addIfUnderCorePoolSize是否返回false,如果返回false就会进去;

我们先来看下addIfUnderCorePoolSize方法的源码是什么:

源码段3:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

private boolean addIfUnderCorePoolSize(Runnable firstTask) {

    Thread t = null;

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        if (poolSize < corePoolSize && runState == RUNNING)

            t = addThread(firstTask);

    } finally {

        mainLock.unlock();

    }

    if (t == null)

        return false;

    t.start();

    return true;

}

可以发现,这段源码是如果发现小雨corePoolSize就会创建一个新的线程,并且调用线程的start()方法将线程运行起来:这个addThread()方法,我们先不考虑细节,因为我们还要先看到前面是怎么进去的,这里可以发信啊,只有没有创建成功Thread才会返回false,也就是当当前的poolSize > corePoolSize的时候,或线程池已经不是在running状态的时候才会出现;

注意:这里在外部判定一次poolSize和corePoolSize只是初步判定,内部是加锁后判定的,以得到更为准确的结果,而外部初步判定如果是大于了,就没有必要进入这段有锁的代码了。

此时我们知道了,当前线程数量大于corePoolSize的时候,就会进入【代码段2】的第一个if语句中,回到【源码段2】,继续看if语句中的内容:

这里标记为

源码段4


1

2

3

4

5

6

if (runState == RUNNING && workQueue.offer(command)) {

   if (runState != RUNNING || poolSize == 0)

       ensureQueuedTaskHandled(command);

   }

   else if (!addIfUnderMaximumPoolSize(command))

       reject(command); // is shutdown or saturated

第一个if,也就是当当前状态为running的时候,就会去执行workQueue.offer(command),这个workQueue其实就是一个BlockingQueue,offer()操作就是在队列的尾部写入一个对象,此时写入的对象为线程的对象而已;所以你可以认为只有线程池在RUNNING状态,才会在队列尾部插入数据,否则就执行else if,其实else if可以看出是要做一个是否大于MaximumPoolSize的判定,如果大于这个值,就会做reject的操作,关于reject的说明,我们在【源码段1】的解释中已经非常明确的说明,这里可以简单看下源码,以应征结果:

源码段5:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

        Thread t = null;

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (poolSize < maximumPoolSize && runState == RUNNING)

                //在corePoolSize = maximumPoolSize下,该代码几乎不可能运行

                t = addThread(firstTask);

        } finally {

            mainLock.unlock();

        }

        if (t == null)

            return false;

        t.start();

        return true;

}

void reject(Runnable command) {

        handler.rejectedExecution(command, this);

    }

也就是如果线程池满了,而且线程池调用了shutdown后,还在调用execute方法时,就会抛出上面说明的异常:RejectedExecutionException

再回头来看下【代码段4】中进入到等待队列后的操作:

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

这段代码是要在线程池运行状态不是RUNNING或poolSize == 0才会调用,他是干啥呢?

他为什么会不等于RUNNING呢?外面那一层不是判定了他== RUNNING了么,其实有时间差就是了,如果是poolSize == 0也会执行这段代码,但是里面的判定条件是如果不是RUNNING,就做reject操作,在第一个线程进去的时候,会将第一个线程直接启动起来;很多人也是看这段代码很绕,因为不断的循环判定类似的判定条件,你主要记住他们之间有时间差,要取最新的就好了。

此时貌似代码看完了?咦,此时有问题了:

1、  等待中的线程在后来是如何跑起来的呢?线程池是不是有类似Timer一样的守护进程不断扫描线程队列和等待队列?还是利用某种锁机制,实现类似wait和notify实现的?

2、  线程池的运行队列和等待队列是如何管理的呢?这里还没看出影子呢!

NO,NO,NO!

Java在实现这部分的时候,使用了怪异的手段,神马手段呢,还要再看一部分代码才晓得。

在前面【源码段3】中,我们看到了一个方法叫:addThread(),也许很少有人会想到关键在这里,其实关键就是在这里:

我们看看addThread()方法到底做了什么。

源码段6:


1

2

3

4

5

6

7

8

9

10

11

12

private Thread addThread(Runnable firstTask) {

    Worker w = new Worker(firstTask);

    Thread t = threadFactory.newThread(w);

    if (t != null) {

        w.thread = t;

        workers.add(w);

        int nt = ++poolSize;

        if (nt > largestPoolSize)

            largestPoolSize = nt;

    }

    return t;

}

这里创建了一个Work,其余的操作,就是讲poolSize叠加,然后将将其放入workers的运行队列等操作;

我们主要关心Worker是干什么的,因为这个threadFactory对我们用途不大,只是做了Thread的命名处理;而Worker你会发现它的定义也是一个Runnable,外部开始在代码段中发现了调用哪个这个Worker的start()方法,也就是线程的启动方法,其实也就是调用了Worker的run()方法,那么我们重点要关心run方法是如何处理的

源码段7:


1

2

3

4

5

6

7

8

9

10

11

12

public void run() {

     try {

         Runnable task = firstTask;

         firstTask = null;

         while (task != null || (task = getTask()) != null) {

             runTask(task);

             task = null;

         }

     } finally {

         workerDone(this);

     }

 }

FirstTask其实就是开始在创建work的时候,由外部传入的Runnable对象,也就是你自己的Thread,你会发现它如果发现task为空,就会调用getTask()方法再判定,直到两者为空,并且是一个while循环体。

那么看看getTask()方法的实现为:

源码段8:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

Runnable getTask() {

    for (;;) {

        try {

            int state = runState;

            if (state > SHUTDOWN)

                return null;

            Runnable r;

            if (state == SHUTDOWN)  // Help drain queue

                r = workQueue.poll();

            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)

                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

            else

                r = workQueue.take();

            if (r != null)

                return r;

            if (workerCanExit()) {

                if (runState >= SHUTDOWN) // Wake up others

                    interruptIdleWorkers();

                return null;

            }

            // Else retry

        } catch (InterruptedException ie) {

            // On interruption, re-check runState

        }

    }

}

你会发现它是从workQueue队列中,也就是等待队列中获取一个元素出来并返回!

回过头来根据代码段6理解下:

当前线程运行完后,在到workQueue中去获取一个task出来,继续运行,这样就保证了线程池中有一定的线程一直在运行;此时若跳出了while循环,只有workQueue队列为空才会出现或出现了类似于shutdown的操作,自然运行队列会减少1,当再有新的线程进来的时候,就又开始向worker里面放数据了,这样以此类推,实现了线程池的功能。

这里可以看下run方法的finally中调用的workerDone方法为:

源码段9:


1

2

3

4

5

6

7

8

9

10

11

12

void workerDone(Worker w) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        completedTaskCount += w.completedTasks;

        workers.remove(w);

        if (--poolSize == 0)

            tryTerminate();

    } finally {

        mainLock.unlock();

    }

}

注意这里将workers.remove(w)掉,并且调用了—poolSize来做操作。

至于tryTerminate是做了更多关于回收方面的操作。

最后我们还要看一段代码就是在【源码段6】中出现的代码调用为:runTask(task);这个方法也是运行的关键。

源码段10:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

private void runTask(Runnable task) {

       final ReentrantLock runLock = this.runLock;

       runLock.lock();

       try {

           if (runState < STOP &&

               Thread.interrupted() &&

               runState >= STOP)

               thread.interrupt();

           boolean ran = false;

           beforeExecute(thread, task);

           try {

               task.run();

               ran = true;

               afterExecute(task, null);

               ++completedTasks;

           } catch (RuntimeException ex) {

               if (!ran)

                   afterExecute(task, ex);

               throw ex;

           }

       } finally {

           runLock.unlock();

       }

   }

你可以看到,这里面的task为传入的task信息,调用的不是start方法,而是run方法,因为run方法直接调用不会启动新的线程,也是因为这样,导致了你无法获取到你自己的线程的状态,因为线程池是直接调用的run方法,而不是start方法来运行。

这里有个beforeExecuteafterExecute方法,分别代表在执行前和执行后,你可以做一段操作,在这个类中,这两个方法都是【空body】的,因为普通线程池无需做更多的操作。

如果你要实现类似暂停等待通知的或其他的操作,可以自己extends后进行重写构造;

本文没有介绍关于ScheduledThreadPoolExecutor调用的细节,下一篇文章会详细说明,因为大部分代码和本文一致,区别在于一些细节,在介绍:ScheduledThreadPoolExecutor的时候,会明确的介绍它与TimerTimerTask的巨大区别,区别不在于使用,而是在于本身内在的处理细节。

原文地址:https://www.cnblogs.com/pony1223/p/9527710.html

时间: 2024-10-07 19:13:31

Java并发编程原理与实战三十七:线程池的原理与使用的相关文章

Java并发编程(十二):线程池的使用(转载)

本文转载自:http://www.cnblogs.com/dolphin0520/p/3932921.html 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.

Java并发编程学习笔记(一)线程安全性 1

什么是线程安全性: 要编写线程安全的代码,其核心在于要对状态访问操作进行管理,特别是对共享的和可变的状态的访问."共享"意味着变量可以由多个线程同时访问,而"可变"则意味着变量的值在其生命周期内可以发生变化. 一个对象是否需要线程安全的,取决于他是否被多个线程访问.这指的是在程序中访问对象的方式,而不是对象要实现的功能.要使得对象时线程安全的,需要采用同步机制来协同对对象可变状态的访问.如果无法实现协同,那么可能导致数据破坏以及其他不该出现的结果. 如果当多个线程访

【Java并发专题之十】juc-locks之线程池框架概述

环境 jdk version:jdk1.8.0_171 一.Executor接口执行器接口,也是最顶层的抽象核心接口, 分离了任务和任务的执行. 二.ExecutorService接口在Executor的基础上提供了执行器生命周期管理,任务异步执行等功能.在Executor的基础上增强了对任务的控制,同时包括对自身生命周期的管理,主要有四类:(1)关闭执行器,禁止任务的提交:(2)监视执行器的状态:(3)提供对异步任务的支持:(4)提供对批处理任务的支持. AbstractExecutorSer

java并发编程的艺术(三)---lock源码

jdk1.5以后,并发包中新增了lock接口, 它相对于synchronized,多了以下三个主要特性:尝试非阻塞地获取锁(尝试获取锁成功则持有).能被中断地获取锁(锁的进程能响应中断).超时获取锁(指定时间截止之前获取锁). 我们看看它接口中定义的api: 获取锁 可中断地获取锁 尝试非阻塞地获取锁,能够获取则返回true,否则false 超时获取锁,三种返回情况:1.当前线程在超时时间内获得了锁.2.当前线程在超时时间内被中断.3.超时时间内没获得锁 释放锁 获取等待通知组件,该组件和当前的

Java 并发编程之测试(三)

产生更多的交替操作 由于并发代码中发生的错误一般都是低概率事件,所以在测试并发错误时需要反复地执行许多次,但有些方法可以提高发现这些错误的概率 ,在前面提到过,在多处理器系统上,如果 处理器的数量少于活动线程的数量,那么 与单处理器的系统 或者 包含多个处理器的系统相比,将能产生更多的交替行为. 有一种有用的方法能提高交替操作的数量.以便能更有效的搜索程序的状态空间:就是在访问状态的操作中加上Thread.yield作为一个空操作.当代码在访问状态的时候没有使用足够的同步,将存在一些对执行时序敏

Java 并发编程(一)浅谈线程安全

首先我们要弄清楚什么叫线程安全. "线程安全"是指:当多个线程访问某个类时,不管运行环境采用何种调度方式或者这些线程如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的. 这里有三个关键点, 第一."线程安全"问题是来源于多个线程访问的情况下,当个线程没有竞争便涉及到出现线程安全的问题. 第二.类的"线程安全"性不依赖于多线程的执行顺序. 第三.主调代码不需要同步或协同.某个类"

Java并发编程学习笔记(一)——线程安全性

1.当多个线程访问某个状态变量并且其中有一个献策灰姑娘执行写入操作时,必须采用同步机制来协同这些线程对变量的访问.Java中的主要同步机制是关键字synchronized,他提供了一种独占的加锁方式. 2.在任何情况下,只有当类中仅包含自己的状态时,线程安全类才是有意义的. 3.当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些献策灰姑娘讲如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的. 4.无状态对象一定是线程安全的

Java 并发编程中的 CyclicBarrier 用于一组线程互相等待

Java 5 引入的 Concurrent 并发库软件包中的 CyclicBarrier 是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier.CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在

【Java并发编程】之十二:线程间通信中notifyAll造成的早期通知问题(含代码)

如果线程在等待时接到通知,但线程等待的条件还不满足,此时,线程接到的就是早期通知,如果条件满足的时间很短,但很快又改变了,而变得不再满足,这时也将发生早期通知.这种现象听起来很奇怪,下面通过一个示例程序来说明问题. 很简单,两个线程等待删除List中的元素,同时另外一个线程正要向其中添加项目.代码如下: [java] view plaincopy import java.util.*; public class EarlyNotify extends Object { private List