ScheduledThreadPoolExecutor实现原理

ScheduledThreadPoolExecutor实现原理

博客分类:

自jdk1.5开始,Java开始提供ScheduledThreadPoolExecutor类来支持周期性任务的调度,在这之前,这些工作需要依靠Timer/TimerTask或者其它第三方工具来完成。但Timer有着不少缺陷,如Timer是单线程模式,调度多个周期性任务时,如果某个任务耗时较久就会影响其它任务的调度;如果某个任务出现异常而没有被catch则可能导致唯一的线程死掉而所有任务都不会再被调度。ScheduledThreadPoolExecutor解决了很多Timer存在的缺陷。

先来看看ScheduledThreadPoolExecutor的实现模型,它通过继承ThreadPoolExecutor来重用线程池的功能,里面做了几件事情:

  • 为线程池设置了一个DelayedWorkQueue,该queue同时具有PriorityQueue(优先级大的元素会放到队首)和DelayQueue(如果队列里第一个元素的getDelay返回值大于0,则take调用会阻塞)的功能
  • 将传入的任务封装成ScheduledFutureTask,这个类有两个特点,实现了java.lang.Comparable和java.util.concurrent.Delayed接口,也就是说里面有两个重要的方法:compareTo和getDelay。ScheduledFutureTask里面存储了该任务距离下次调度还需要的时间(使用的是基于System#nanoTime实现的相对时间,不会因为系统时间改变而改变,如距离下次执行还有10秒,不会因为将系统时间调前6秒而变成4秒后执行)。getDelay方法就是返回当前时间(运行getDelay的这个时刻)距离下次调用之间的时间差;compareTo用于比较两个任务的优先关系,距离下次调度间隔较短的优先级高。那么,当有任务丢进上面说到的DelayedWorkQueue时,因为它有DelayQueue(DelayQueue的内部使用PriorityQueue来实现的)的功能,所以新的任务会与队列中已经存在的任务进行排序,距离下次调度间隔短的任务排在前面,也就是说这个队列并不是先进先出的;另外,在调用DelayedWorkQueue的take方法的时候,如果没有元素,会阻塞,如果有元素而第一个元素的getDelay返回值大于0(前面说过已经排好序了,第一个元素的getDelay不会大于后面元素的getDelay返回值),也会一直阻塞。
  • ScheduledFutureTask提供了一个run的实现,线程池执行的就是这个run方法。看看run的源码(本文的代码取自hotspot1.5.0_22,jdk后续版本的代码可能已经不一样了,如jdk1.7中使用了自己实现的DelayedWorkQueue,而不再使用PriorityQueue作为存储,不过从外面看它们的行为还是一样的,所以并不影响对ScheduledThreadPoolExecutor调度机制的理解):

    public void run() {

        if (isPeriodic())

            runPeriodic();

        else

            ScheduledFutureTask.super.run();

    }

    如果不是周期性任务就直接执行任务(也就是else部分),这个主要是用于实现ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit),后面会讲到它们的实现,这里先关注周期任务的执行方式。周期性任务执行的是runPeriodic(),看下它的实现:

    private void runPeriodic() {

        boolean ok = ScheduledFutureTask.super.runAndReset();

        boolean down = isShutdown();

        // Reschedule if not cancelled and not shutdown or policy allows

        if (ok && (!down ||

                   (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&

                    !isTerminating()))) {

            long p = period;

            if (p > 0)

                time += p;

            else

                time = triggerTime(-p);

            ScheduledThreadPoolExecutor.super.getQueue().add(this);

        }

        // This might have been the final executed delayed

        // task.  Wake up threads to check.

        else if (down)

            interruptIdleWorkers();

    }

    这里可以看到,先执行了任务本身(ScheduledFutureTask.super.runAndReset),这个调用有一个返回值,来看下它的实现:

    protected boolean runAndReset() {

        return sync.innerRunAndReset();

    }

    跟进去看下innerRunAndReset():

    boolean innerRunAndReset() {

        if (!compareAndSetState(0, RUNNING))

            return false;

        try {

            runner = Thread.currentThread();

            callable.call(); // don‘t set result

            runner = null;

            return compareAndSetState(RUNNING, 0);

        } catch(Throwable ex) {

            innerSetException(ex);

            return false;

        }

    }

    可以发现,这里需要关注的是第三个return,也就是如果任务执行出现了异常,会被catch且返回false.

    继续看runPeriodic()方法,if里面,如果刚才任务执行的返回值是true且线程池还在运行就在if块中的操作,如果线程池被关闭了就做else if里的操作。也就是说,如果之前的任务执行出现的异常返回了false,那么if里以及else if里的代码都不会执行了,那有什么影响?接下来看看if里做了什么。

    if里的代码很简单,分为两部分,一是计算这个任务下次调度的间隔,二是将任务重新放回队列中。回到出现异常的情况,如果刚才的任务执行出现了异常,就不会将任务再放回队列中,换而言之,也就是这个任务再也得不到调度了!但是,这并不影响其它周期任务的调度。

综上,可以看到,ScheduledThreadPoolExecutor执行周期性任务的模型就是:调度一次任务,计算并设置该任务下次间隔,将任务放回队列中供线程池执行。这里的队列起了很大的作用,且有一些特点:距离下次调度间隔短的任务总是在队首,队首的任务若距离下次调度的间隔时间大于0就无法从该队列的take()方法中拿到任务。

接下来看看ScheduledThreadPoolExecutor#schedule(Callable callable, long delay, TimeUnit unit)和ScheduledThreadPoolExecutor#schedule(Runnable command, long delay, TimeUnit unit)这两个非周期性任务的实现方式,先看看它们的源码:

public ScheduledFuture<?> schedule(Runnable command,

                                   long delay,

                                   TimeUnit unit) {

    if (command == null || unit == null)

        throw new NullPointerException();

    ScheduledFutureTask<?> t =

        new ScheduledFutureTask<Boolean>(command, null, triggerTime(delay, unit));

    delayedExecute(t);

    return t;

}

public <V> ScheduledFuture<V> schedule(Callable<V> callable,

                                       long delay,

                                       TimeUnit unit) {

    if (callable == null || unit == null)

        throw new NullPointerException();

    ScheduledFutureTask<V> t =

        new ScheduledFutureTask<V>(callable, triggerTime(delay, unit));

    delayedExecute(t);

    return t;

}

private void delayedExecute(Runnable command) {

    if (isShutdown()) {

        reject(command);

        return;

    }

    // Prestart a thread if necessary. We cannot prestart it

    // running the task because the task (probably) shouldn‘t be

    // run yet, so thread will just idle until delay elapses.

    if (getPoolSize() < getCorePoolSize())

        prestartCoreThread();

    super.getQueue().add(command);

}

实现方式也很简单,在创建ScheduledThreadPoolExecutor内部任务(即ScheduledFutureTask)的时候就将调度间隔计算并设置好,如果当前线程数小于设置的核心线程数,就启动一个线程(可能是线程池刚启动里面还没有线程,也可能是里面的线程执行任务时挂掉了。如果线程池中的线程挂掉了而又没有调用这些schedule方法谁去补充挂掉的线程?不用担心,线程池自己会处理的)去监听队列里的任务,然后将任务放到队列里,在任务执行间隔不大于0的时候,线程就可以拿到这个任务并执行。

周期性任务的入口(ScheduledThreadPoolExecutor#scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)和ScheduledThreadPoolExecutor#scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit))与非周期性任务是类似的,它们处理方式不同的地方在于前文说到的ScheduledFutureTask#run()中。

原文地址:https://www.cnblogs.com/w-wfy/p/8951778.html

时间: 2024-08-19 01:50:24

ScheduledThreadPoolExecutor实现原理的相关文章

并发编程(十四)—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue

我们知道线程池运行时,会不断从任务队列中获取任务,然后执行任务.如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行. 队列是先进先出的数据结构,就是先进入队列的数据,先被获取.但是有一种特殊的队列叫做优先级队列,它会对插入的数据进行优先级排序,保证优先级越高的数据首先被获取,与数据的插入顺序无关. 实现优先级队列高效常用的一种方式就是使用堆. 什么是堆? 堆通常是一个可以被看做一棵树的数组对象. 堆(heap)又

JUC回顾之-ScheduledThreadPoolExecutor底层实现原理和应用

项目中经常使用定时器,比如每隔一段时间清理下线过期的F码,或者应用timer定期查询MQ在数据库的配置,根据不同version实现配置的实时更新等等.但是timer是存在一些缺陷的,因为Timer在执行定时任务时只创建一个线程,所以如果存在多个任务,比如两个,第一个任务执行的时间很长,超过两个任务执行的间隔时间,会发生一些问题:可以看出内部只要一个线程执行任务: /** * The timer task queue. This data structure is shared with the

深入浅出 Java Concurrency (35): 线程池 part 8 线程池的实现及原理 (3)[转]

线程池任务执行结果 这一节来探讨下线程池中任务执行的结果以及如何阻塞线程.取消任务等等. 1 package info.imxylz.study.concurrency.future;2 3 public class SleepForResultDemo implements Runnable {4 5     static boolean result = false;6 7     static void sleepWhile(long ms) {8         try {9      

java线程池原理及实现方式

线程池的定义 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程 为什么要使用线程池 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或"切换过度".为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目. 线程池组成部分 1.线程池管理器(ThreadPoolManager):用于创建并管理线程池,包括 创建线程池,销

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

多线程原理分析

转(http://www.cnblogs.com/guguli/p/5198894.html) Java对象实例的锁一共有四种状态:无锁,偏向锁,轻量锁和重量锁.原始脱离框架的并发应用大部分都需要手动完成加锁释放,最直接的就是使用synchronized和volatile关键字对某个对象或者代码块加锁从而限制每次访问的次数,从对象之间的竞争也可以实现到对象之间的协作.但是这样手动实现出来的应用不仅耗费时间而且性能表现往往又有待提升.顺带一提,之前写过一篇文章介绍我基于Qt和Linux实现的一个多

java 多线程原理分析

一.为什么使用线程池 1.降低资源消耗,减少线程创建和销毁次数,每个工作线程可以重复利用,执行多个任务 2.可根据系统承受能力,调整工作线程的数目,防止消耗过多的内存 二.java 线程池使用 ExecutorService 真正的线程池接口. ScheduledExecutorService 能和Timer/TimerTask类似,解决那些需要任务重复执行的问题. ThreadPoolExecutor ExecutorService的默认实现. ScheduledThreadPoolExecu

12.深入线程池_流程和原理

参考博文:http://blog.csdn.net/mark_lq/article/details/50346999 一.线程池的基本类结构 合理利用线程池能够带来三个好处. 1.降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗 2.提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行 3.提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控 Executor线程池框架最大优点是把

J.U.C并发框架源码阅读(十四)ScheduledThreadPoolExecutor

基于版本jdk1.7.0_80 java.util.concurrent.ScheduledThreadPoolExecutor 代码如下 /* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. * * * * * * * * * * * * * * * * * * * * */ /* * * * * * * Written by Doug Lea with assistance from members of