Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

相关文章目录:

Java线程池ThreadPoolExecutor使用和分析(一)

Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理

Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

以下是本文的目录大纲:

一、shutdown()  --  温柔的终止线程池

interruptIdleWorkers()  --  中断空闲worker

tryTerminate()  --  尝试终止线程池

二、shutdownNow()  --  强硬的终止线程池

interruptWorkers()  --  中断所有worker

三、awaitTermination()  --  等待线程池终止

终止线程池主要有两个方法:shutdown() 和 shutdownNow()。

shutdown()后线程池将变成shutdown状态,此时不接收新任务,但会处理完正在运行的 和 在阻塞队列中等待处理的任务。

shutdownNow()后线程池将变成stop状态,此时不接收新任务,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程。

下面是对线程池的几种终止方式的分析,基于JDK 1.7

一、shutdown()  --  温柔的终止线程池

/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 * 开始一个有序的关闭,在关闭中,之前提交的任务会被执行(包含正在执行的,在阻塞队列中的),但新任务会被拒绝
 * 如果线程池已经shutdown,调用此方法不会有附加效应
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution.  Use {@link #awaitTermination awaitTermination}
 * to do that.
 * 当前方法不会等待之前提交的任务执行结束,可以使用awaitTermination()
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上锁

    try {
    	//判断调用者是否有权限shutdown线程池
        checkShutdownAccess();

        //CAS+循环设置线程池状态为shutdown
        advanceRunState(SHUTDOWN);

        //中断所有空闲线程
        interruptIdleWorkers();

        onShutdown(); // hook for ScheduledThreadPoolExecutor
    }
    finally {
        mainLock.unlock(); //解锁
    }

    //尝试终止线程池
    tryTerminate();
}

shutdown()执行流程:

1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

2、判断调用者是否有权限shutdown线程池

3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务

4、中断所有空闲线程  interruptIdleWorkers()

5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理

6、解锁

7、尝试终止线程池  tryTerminate()

可以看到shutdown()方法最重要的几个步骤是:更新线程池状态为shutdown中断所有空闲线程tryTerminated()尝试终止线程池

那么,什么是空闲线程?interruptIdleWorkers() 是怎么中断空闲线程的?

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 * 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case(以免) all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 *
 * onlyOne如果为true,最多interrupt一个worker
 * 只有当终止流程已经开始,但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用
 * (终止流程已经开始指的是:shutdown状态 且 workQueue为空,或者 stop状态)
 * 在这种情况下,最多有一个worker被中断,为了传播shutdown信号,以免所有的线程都在等待
 * 为保证线程池最终能终止,这个操作总是中断一个空闲worker
 * 而shutdown()中断所有空闲worker,来保证空闲线程及时退出
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上锁
    try {
        for (Worker w : workers) {
            Thread t = w.thread;

            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock(); //解锁
    }
}

interruptIdleWorkers() 首先会获取mainLock锁,因为要迭代workers set,在中断每个worker前,需要做两个判断:

1、线程是否已经被中断,是就什么都不做

2、worker.tryLock() 是否成功

第二个判断比较重要,因为Worker类除了实现了可执行的Runnable,也继承了AQS,本身也是一把锁,具体可见 ThreadPoolExecutor内部类Worker解析

tryLock()调用了Worker自身实现的tryAcquire()方法,这也是AQS规定子类需要实现的尝试获取锁的方法

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

tryAcquire()先尝试将AQS的state从0-->1,返回true代表上锁成功,并设置当前线程为锁的拥有者

可以看到compareAndSetState(0, 1)只尝试了一次获取锁,且不是每次state+1,而是0-->1,说明锁不是可重入的

但是为什么要worker.tryLock()获取worker的锁呢?

这就是Woker类存在的价值之一,控制线程中断

在runWorker()方法中每次获取到task,task.run()之前都需要worker.lock()上锁,运行结束后解锁,即正在运行任务的工作线程都是上了worker锁的

在interruptIdleWorkers()中断之前需要先tryLock()获取worker锁,意味着正在运行的worker不能中断,因为worker.tryLock()失败,且锁是不可重入的

故shutdown()只有对能获取到worker锁的空闲线程(正在从workQueue中getTask(),此时worker没有加锁)发送中断信号


由此可以将worker划分为:
1、空闲worker:正在从workQueue阻塞队列中获取任务的worker
2、运行中worker:正在task.run()执行任务的worker

正阻塞在getTask()获取任务的worker在被中断后,会抛出InterruptedException,不再阻塞获取任务

捕获中断异常后,将继续循环到getTask()最开始的判断线程池状态的逻辑,当线程池是shutdown状态,且workQueue.isEmpty时,return null,进行worker线程退出逻辑

某些情况下,interruptIdleWorkers()时多个worker正在运行,不会对其发出中断信号,假设此时workQueue也不为空

那么当多个worker运行结束后,会到workQueue阻塞获取任务,获取到的执行任务,没获取到的,如果还是核心线程,会一直workQueue.take()阻塞住,线程无法终止,因为workQueue已经空了,且shutdown后不会接收新任务了

这就需要在shutdown()后,还可以发出中断信号

Doug Lea大神巧妙的在所有可能导致线程池产终止的地方安插了tryTerminated()尝试线程池终止的逻辑,并在其中判断如果线程池已经进入终止流程,没有任务等待执行了,但线程池还有线程,中断唤醒一个空闲线程

shutdown()的最后也调用了tryTerminated()方法,下面看看这个方法的逻辑:

/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 *
 * 在以下情况将线程池变为TERMINATED终止状态
 * shutdown 且 正在运行的worker 和 workQueue队列 都empty
 * stop 且  没有正在运行的worker
 *
 * 这个方法必须在任何可能导致线程池终止的情况下被调用,如:
 * 减少worker数量
 * shutdown时从queue中移除任务
 *
 * 这个方法不是私有的,所以允许子类ScheduledThreadPoolExecutor调用
 */
final void tryTerminate() {
	//这个for循环主要是和进入关闭线程池操作的CAS判断结合使用的
    for (;;) {
        int c = ctl.get();

        /**
         * 线程池是否需要终止
         * 如果以下3中情况任一为true,return,不进行终止
         * 1、还在运行状态
         * 2、状态是TIDYING、或 TERMINATED,已经终止过了
         * 3、SHUTDOWN 且 workQueue不为空
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;

        /**
         * 只有shutdown状态 且 workQueue为空,或者 stop状态能执行到这一步
         * 如果此时线程池还有线程(正在运行任务,正在等待任务)
         * 中断唤醒一个正在等任务的空闲worker
         * 唤醒后再次判断线程池状态,会return null,进入processWorkerExit()流程
         */
        if (workerCountOf(c) != 0) { // Eligible to terminate 资格终止
            interruptIdleWorkers(ONLY_ONE); //中断workers集合中的空闲任务,参数为true,只中断一个
            return;
        }

        /**
         * 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
         */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
        	//CAS:将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法),期间ctl有变化就会失败,会再次for循环
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); //需子类实现
                }
                finally {
                    ctl.set(ctlOf(TERMINATED, 0)); //将线程池的ctl变成TERMINATED
                    termination.signalAll(); //唤醒调用了 等待线程池终止的线程 awaitTermination()
                }
                return;
            }
        }
        finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
        // 如果上面的CAS判断false,再次循环
    }
}

tryTerminate() 执行流程:

1、判断线程池是否需要进入终止流程(只有当shutdown状态+workQueue.isEmpty 或 stop状态,才需要)

2、判断线程池中是否还有线程,有则 interruptIdleWorkers(ONLY_ONE) 尝试中断一个空闲线程(正是这个逻辑可以再次发出中断信号,中断阻塞在获取任务的线程)

3、如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated

会先上锁,将线程池置为tidying状态,之后调用需子类实现的 terminated(),最后线程池置为terminated状态,并唤醒所有等待线程池终止这个Condition的线程

二、shutdownNow()  --  强硬的终止线程池

/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution. These tasks are drained (removed)
 * from the task queue upon return from this method.
 * 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
 * 这个任务列表是从任务队列中排出(删除)的
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate.  Use {@link #awaitTermination awaitTermination} to
 * do that.
 * 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 * 除了尽力尝试停止运行中的任务,没有任何保证
 * 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束
 *
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上锁

    try {
    	//判断调用者是否有权限shutdown线程池
        checkShutdownAccess();

        //CAS+循环设置线程池状态为stop
        advanceRunState(STOP);

        //中断所有线程,包括正在运行任务的
        interruptWorkers();

        tasks = drainQueue(); //将workQueue中的元素放入一个List并返回
    }
    finally {
        mainLock.unlock(); //解锁
    }

    //尝试终止线程池
    tryTerminate();

    return tasks; //返回workQueue中未执行的任务
}

shutdownNow() 和 shutdown()的大体流程相似,差别是:

1、将线程池更新为stop状态

2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程

3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

interruptWorkers()

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

interruptWorkers() 很简单,循环对所有worker调用 interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()

需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束

三、awaitTermination()  --  等待线程池终止

参数:
    timeout:超时时间
    unit:     timeout超时时间的单位
返回:
    true:线程池终止
    false:超过timeout指定时间
在发出一个shutdown请求后,在以下3种情况发生之前,awaitTermination()都会被阻塞
1、所有任务完成执行
2、到达超时时间
3、当前线程被中断

/**
 * Wait condition to support awaitTermination
 */
private final Condition termination = mainLock.newCondition();

awaitTermination() 循环的判断线程池是否terminated终止 或 是否已经超过超时时间,然后通过termination这个Condition阻塞等待一段时间

termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待

阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):

1、如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出

2、如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败

3、如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞

故终止线程池并需要知道其是否终止可以用如下方式:

executorService.shutdown();
try{
    while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
        LOGGER.debug("Waiting for terminate");
    }
}
catch (InterruptedException e) {
    //中断处理
}

参考资料:

深入理解java线程池—ThreadPoolExecutor

时间: 2024-12-26 11:26:21

Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理的相关文章

Java实现多线程经典问题:使用三个线程实现输出ABCABC循环

使用三个线程实现ABCABC--循环. 代码如下: //标记类,用来让三个线程共享,同时也是三个线程中同步代码快的标记对象. //之前这个标记我设置成Integer,但是发现Integer进行加法运算时会改变对 //象引用(原因是自动装箱),因此出现异常抛出.所以索性自己定义Flag类. class Flag{ int i=0; public synchronized void setI() { i++; if(i==3) i=0; } } //输出A的线程 class SafeTestA im

线程池ThreadPoolExecutor源码分析(一)

1.线程池简介 1.1 线程池的概念: 线程池就是首先创建一些线程,它们的集合称为线程池.使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,程序将一个任务传给线程池,线程池就会启动一条线程来执行这个任务,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务. 1.2 线程池的工作机制 在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程. 一

java进程占用cpu过高分析是哪些线程

拿hbase基准测试列子来分析哪些线程使用比较高的cpu,环境是linux,基准测试命令: hbase org.apache.hadoop.hbase.PerformanceEvaluation  --rows=500000 --nomapred --presplit=5 --writeToWAL=true randomWrite 5 首先查看占用cpu最高的进程和线程id,执行命令: [[email protected] logs]$ ps Hh -eo pid,tid,pcpu | sort

java面试总躲不过的并发(一): 线程池ThreadPoolExecutor基础梳理

本文核心:线程池ThreadPoolExecutor基础梳理 一.实现多线程的方式 1.继承Thread类,重写其run方法 2.实现Runnable接口,实现run方法 3.实现Callable接口,实现call方法 由于Java的设计,只支持单继承,但是支持多实现形式,所以一般面向接口开发,Runnable接口与Callable接口的区别在于Callable接口中的call方法是带返回值的,其返回一个Future的异步类,我们可以通过Future的get方法获取结果,如果线程还没有执行完,g

线程池ThreadPoolExecutor源码解读研究(JDK1.8)

一.什么是线程池 为什么要使用线程池?在多线程并发开发中,线程的数量较多,且每个线程执行一定的时间后就结束了,下一个线程任务到来还需要重新创建线程,这样线程数量特别庞大的时候,频繁的创建线程和销毁线程需要一定时间而且增加系统的额外开销.基于这样的场景,线程池就出现了,线程池可以做到一个线程的任务处理完可以接受下一个任务,并不需要频繁的创建销毁,这样大大节省了时间和系统的开销. 线程池,顾名思义,就是一个池子,任务提交的到线程池后,线程池会在池子里边找有没有空闲的线程,如果没有,就会进入等待状态,

8天玩转并行开发——第七天 简要分析任务与线程池

原文:8天玩转并行开发--第七天 简要分析任务与线程池 其实说到上一篇,我们要说的task的知识也说的差不多了,这一篇我们开始站在理论上了解下“线程池”和“任务”之间的关系,不管是 说线程还是任务,我们都不可避免的要讨论下线程池,然而在.net 4.0以后,线程池引擎考虑了未来的扩展性,已经充分利用多核微处理器 架构,只要在可能的情况下,我们应该尽量使用task,而不是线程池. 首先看一下task的结构 从图中我们可以看出Task.Factory.StartNew()貌似等同于用ThreadPo

【Thread】java线程之对象锁、类锁、线程安全

说明: 1.个人技术也不咋滴.也没在项目中写过线程,以下全是根据自己的理解写的.所以,仅供参考及希望指出不同的观点. 2.其实想把代码的github贴出来,但还是推荐在初学的您多亲自写一下,就没贴出来了. 一.基本说明 类.对象:...(不知道怎么说,只可意会不可言传>.<!):要明白哪些方法.变量是对象的,哪些是类的. 类锁.对象锁:对应类和对象.每个类有且仅有一个类锁,每个对象有且仅有一个对象锁. ex: Person p1 = new Person(); Person p2 = new

Java知多少(65)线程的挂起、恢复和终止

有时,线程的挂起是很有用的.例如,一个独立的线程可以用来显示当日的时间.如果用户不希望用时钟,线程被挂起.在任何情形下,挂起线程是很简单的,一旦挂起,重新启动线程也是一件简单的事. 挂起,终止和恢复线程机制在Java 2和早期版本中有所不同.尽管你运用Java 2的途径编写代码,你仍需了解这些操作在早期Java环境下是如何完成的.例如,你也许需要更新或维护老的代码.你也需要了解为什么Java 2会有这样的变化.因为这些原因,下面内容描述了执行线程控制的原始方法,接着是Java 2的方法. Jav

Replication基础(六) 复制中的三个线程(IO/SQL/Dump)

Reference:  https://blog.csdn.net/sun_ashe/article/details/82181811?utm_source=blogxgwz1 简介在MySQL复制技术中,涉及到三个线程,分别为binlog Dump线程,IO线程,SQL回放线程.本文针对于这三个线程,简要说明. I/O线程上图所示,IO线程位于从实例,其作用就是作为一个客户端,建立到master实例上的TCP链接,并且发送认证信息,bingbinlog同步协议信息,然后实时的去同步阻塞的读取m