22.线程池之ScheduledThreadPoolExecutor

1. ScheduledThreadPoolExecutor简介

ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务,相对于任务调度的Timer来说,其功能更加强大,Timer只能使用一个后台线程执行任务,而ScheduledThreadPoolExecutor则可以通过构造函数来指定后台线程的个数。ScheduledThreadPoolExecutor类的UML图如下:

  1. 从UML图可以看出,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,也就是说ScheduledThreadPoolExecutor拥有execute()和submit()提交异步任务的基础功能,关于ThreadPoolExecutor可以看这篇文章。但是,ScheduledThreadPoolExecutor类实现了ScheduledExecutorService,该接口定义了ScheduledThreadPoolExecutor能够延时执行任务和周期执行任务的功能;
  2. ScheduledThreadPoolExecutor也两个重要的内部类:DelayedWorkQueue(有界阻塞队列)和ScheduledFutureTask。可以看出DelayedWorkQueue实现了BlockingQueue接口,也就是一个阻塞队列,ScheduledFutureTask则是继承了FutureTask类,也表示该类用于返回异步任务的结果。这两个关键类,下面会具体详细来看。

1.1 构造方法

ThreadPoolExecutor有如下几个构造方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue());};?public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), threadFactory);};public ScheduledThreadPoolExecutor(int corePoolSize,                                   RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), handler);};?public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory,                                   RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), threadFactory, handler);}

可以看出由于ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,它的构造方法实际上是调用了ThreadPoolExecutor,对ThreadPoolExecutor的介绍可以可以看这篇文章,理解ThreadPoolExecutor构造方法的几个参数的意义后,理解这就很容易了。可以看出,ScheduledThreadPoolExecutor的核心线程池的线程个数为指定的corePoolSize,当核心线程池的线程个数达到corePoolSize后,就会将任务提交给有界阻塞队列DelayedWorkQueue,对DelayedWorkQueue在下面进行详细介绍,线程池允许最大的线程个数为Integer.MAX_VALUE,也就是说理论上这是一个大小无界的线程池。

1.2 特有方法

ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,该接口定义了可延时执行异步任务和可周期执行异步任务的特有功能,相应的方法分别为:

//达到给定的延时时间后,执行任务。这里传入的是实现Runnable接口的任务,//因此通过ScheduledFuture.get()获取结果为nullpublic ScheduledFuture<?> schedule(Runnable command,                                       long delay, TimeUnit unit);//达到给定的延时时间后,执行任务。这里传入的是实现Callable接口的任务,//因此,返回的是任务的最终计算结果 public  ScheduledFuture<V> schedule(Callable<V> callable,                                           long delay, TimeUnit unit);?//是以上一个任务开始的时间计时,period时间过去后,//检测上一个任务是否执行完毕,如果上一个任务执行完毕,则当前任务立即执行,//如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                  long initialDelay,                                                  long period,                                                  TimeUnit unit);//当达到延时时间initialDelay后,任务开始执行。上一个任务执行结束后到下一次//任务执行,中间延时时间间隔为delay。以这种方式,周期性执行任务。public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,                                                     long initialDelay,                                                     long delay,                                                     TimeUnit unit);

2. 可周期性执行的任务---ScheduledFutureTask

ScheduledThreadPoolExecutor最大的特色是能够周期性执行异步任务,当调用schedule,scheduleAtFixedRate和scheduleWithFixedDelay方法时,实际上是将提交的任务转换成的ScheduledFutureTask类,从源码就可以看出。以schedule方法为例:

public ScheduledFuture<?> schedule(Runnable command,                                   long delay,                                   TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    RunnableScheduledFuture<?> t = decorateTask(command,        new ScheduledFutureTask<Void>(command, null,                                      triggerTime(delay, unit)));    delayedExecute(t);    return t;}

可以看出,通过decorateTask会将传入的Runnable转换成ScheduledFutureTask类。线程池最大作用是将任务和线程进行解耦,线程主要是任务的执行者,而任务也就是现在所说的ScheduledFutureTask。紧接着,会想到任何线程执行任务,总会调用run()方法。为了保证ScheduledThreadPoolExecutor能够延时执行任务以及能够周期性执行任务,ScheduledFutureTask重写了run方法:

public void run() {    boolean periodic = isPeriodic();    if (!canRunInCurrentRunState(periodic))        cancel(false);    else if (!periodic)        //如果不是周期性执行任务,则直接调用run方法        ScheduledFutureTask.super.run();        //如果是周期性执行任务的话,需要重设下一次执行任务的时间    else if (ScheduledFutureTask.super.runAndReset()) {        setNextRunTime();        reExecutePeriodic(outerTask);    }}

从源码可以很明显的看出,在重写的run方法中会先if (!periodic)判断当前任务是否是周期性任务,如果不是的话就直接调用run()方法;否则的话执行setNextRunTime()方法重设下一次任务执行的时间,并通过reExecutePeriodic(outerTask)方法将下一次待执行的任务放置到DelayedWorkQueue中。

因此,可以得出结论:ScheduledFutureTask最主要的功能是根据当前任务是否具有周期性,对异步任务进行进一步封装。如果不是周期性任务(调用schedule方法)则直接通过run()执行,若是周期性任务,则需要在每一次执行完后,重设下一次执行的时间,然后将下一次任务继续放入到阻塞队列中。

3. DelayedWorkQueue

在ScheduledThreadPoolExecutor中还有另外的一个重要的类就是DelayedWorkQueue。为了实现其ScheduledThreadPoolExecutor能够延时执行异步任务以及能够周期执行任务,DelayedWorkQueue进行相应的封装。DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。

为什么要使用DelayedWorkQueue呢?

定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列

DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。

DelayedWorkQueue的数据结构

//初始大小private static final int INITIAL_CAPACITY = 16;//DelayedWorkQueue是由一个大小为16的数组组成,数组元素为实现RunnableScheduleFuture接口的类//实际上为ScheduledFutureTaskprivate RunnableScheduledFuture<?>[] queue =    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private final ReentrantLock lock = new ReentrantLock();private int size = 0;

可以看出DelayedWorkQueue底层是采用数组构成的,关于DelayedWorkQueue可以看这篇博主的文章,很详细。

关于DelayedWorkQueue我们可以得出这样的结论:DelayedWorkQueue是基于堆的数据结构,按照时间顺序将每个任务进行排序,将待执行时间越近的任务放在在队列的队头位置,以便于最先进行的任务执行

4.ScheduledThreadPoolExecutor执行过程

现在我们对ScheduledThreadPoolExecutor的两个内部类ScheduledFutueTask和DelayedWorkQueue进行了了解,实际上这也是线程池工作流程中最重要的两个关键因素:任务以及阻塞队列。现在我们来看下ScheduledThreadPoolExecutor提交一个任务后,整体的执行过程。以ScheduledThreadPoolExecutor的schedule方法为例,具体源码为:

public ScheduledFuture<?> schedule(Runnable command,                                   long delay,                                   TimeUnit unit) {    if (command == null || unit == null)        throw new NullPointerException();    //将提交的任务转换成ScheduledFutureTask    RunnableScheduledFuture<?> t = decorateTask(command,        new ScheduledFutureTask<Void>(command, null,                                      triggerTime(delay, unit)));    //延时执行任务ScheduledFutureTask    delayedExecute(t);    return t;}

方法很容易理解,为了满足ScheduledThreadPoolExecutor能够延时执行任务和能够周期执行任务的特性,会先将实现Runnable接口的类转换成ScheduledFutureTask。然后会调用delayedExecute方法进行执行任务,这个方法也是关键方法,来看下源码:

private void delayedExecute(RunnableScheduledFuture<?> task) {    if (isShutdown())        //如果当前线程池已经关闭,则拒绝任务        reject(task);    else {        //将任务放入阻塞队列中        super.getQueue().add(task);        if (isShutdown() &&            !canRunInCurrentRunState(task.isPeriodic()) &&            remove(task))            task.cancel(false);        else            //保证至少有一个线程启动,即使corePoolSize=0            ensurePrestart();    }}

delayedExecute方法的主要逻辑请看注释,可以看出该方法的重要逻辑会是在ensurePrestart()方法中,它的源码为:

void ensurePrestart() {    int wc = workerCountOf(ctl.get());    if (wc < corePoolSize)        addWorker(null, true);    else if (wc == 0)        addWorker(null, false);}

可以看出该方法逻辑很简单,关键在于它所调用的addWorker方法,该方法主要功能:新建Worker类,当执行任务时,就会调用被Worker所重写的run方法,进而会继续执行runWorker方法。在runWorker方法中会调用getTask方法从阻塞队列中不断的去获取任务进行执行,直到从阻塞队列中获取的任务为null的话,线程结束终止。addWorker方法是ThreadPoolExecutor类中的方法,对ThreadPoolExecutor的源码分析可以看这篇文章,很详细。

总结一下runWorker方法的执行过程:

  1. while循环不断地通过getTask()方法获取任务;
  2. getTask()方法从阻塞队列中取任务;
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
  4. 调用task.run()执行任务;
  5. 如果task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

5.总结

  1. ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,因此,整体上功能一致,线程池主要负责创建线程(Worker类),线程从阻塞队列中不断获取新的异步任务,直到阻塞队列中已经没有了异步任务为止。但是相较于ThreadPoolExecutor来说,ScheduledThreadPoolExecutor具有延时执行任务和可周期性执行任务的特性,ScheduledThreadPoolExecutor重新设计了任务类ScheduleFutureTask,ScheduleFutureTask重写了run方法使其具有可延时执行和可周期性执行任务的特性。另外,阻塞队列DelayedWorkQueue是可根据优先级排序的队列,采用了堆的底层数据结构,使得与当前时间相比,待执行时间越靠近的任务放置队头,以便线程能够获取到任务进行执行;
  2. 线程池无论是ThreadPoolExecutor还是ScheduledThreadPoolExecutor,在设计时的三个关键要素是:任务,执行者以及任务结果。**它们的设计思想也是完全将这三个关键要素进行了解耦。

执行者

任务的执行机制,完全交由Worker类,也就是进一步了封装了Thread。向线程池提交任务,无论为ThreadPoolExecutor的execute方法和submit方法,还是ScheduledThreadPoolExecutor的schedule方法,都是先将任务移入到阻塞队列中,然后通过addWork方法新建了Work类,并通过runWorker方法启动线程,并不断的调用getTask()从阻塞队列中获取异步任务执行交给Worker执行,直至阻塞队列中无法取到任务为止,返回null。

任务

在ThreadPoolExecutor和ScheduledThreadPoolExecutor中任务是指实现了Runnable接口和Callable接口的实现类。ThreadPoolExecutor中会将任务转换成FutureTask,而在ScheduledThreadPoolExecutor中为了实现可延时执行任务和周期性执行任务的特性,任务会被转换成ScheduledFutureTask类,该类继承了FutureTask,并重写了run方法。

任务结果

在ThreadPoolExecutor中提交任务后,获取任务结果可以通过Future接口的类,在ThreadPoolExecutor中实际上为FutureTask类,而在ScheduledThreadPoolExecutor中则是ScheduledFutureTask

原文地址:https://www.cnblogs.com/itxiaok/p/10356596.html

时间: 2024-08-04 08:20:31

22.线程池之ScheduledThreadPoolExecutor的相关文章

线程池 一 ScheduledThreadPoolExecutor

java.util.concurrent public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService 构造函数 ScheduledFutureTask(Runnable r, V result, long triggerTime, long sequenceNumber) ScheduledFutureTask(Runnable r, V resul

线程池原理

在面向对象编程中,对象创建和销毁是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是对一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池就是遵循这一思想而产生的,下面将介绍的线程池技术同样符合这一思想. 多线程技术主要解决处

线程池你真不来了解一下吗?

前言 只有光头才能变强 回顾前面: ThreadLocal就是这么简单 多线程三分钟就可以入个门了! 多线程基础必要知识点!看了学习多线程事半功倍 Java锁机制了解一下 AQS简简单单过一遍 Lock锁子类了解一下 本篇主要是讲解线程池,这是我在多线程的倒数第二篇了,后面还会有一篇死锁.主要将多线程的基础过一遍,以后有机会再继续深入! 那么接下来就开始吧,如果文章有错误的地方请大家多多包涵,不吝在评论区指正哦~ 声明:本文使用JDK1.8 一.线程池简介 线程池可以看做是线程的集合.在没有任务

43_2013年11月22日 线程池 Socket(Thread Lock Process 摇奖 线程池ThreadPool)

1>模拟线程池,生产者消费者问题 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Product { class Program { static void Main(string[] args) { //创建一个池子 MyConncetion[]

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

ScheduledThreadPoolExecutor 线程池调度 使用

package other; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util

ScheduledThreadPoolExecutor周期任务或延时任务线程池

ScheduledThreadPoolExecutor可以代替timer,timer的缺点是一个timer启动一个线程,如果任务在执行时候发生异常,该timer对应的线程会结束 ScheduledThreadPoolExecutor的效率更高,他比timer支持更丰富的功能 public class ScheduledThreadPoolExecutorTest { public static void main(String[] args) throws InterruptedExceptio

深入浅出 Java Concurrency (35): 线程池 part 8 线程池的实现及原理 (3)[转]

线程池任务执行结果 这一节来探讨下线程池中任务执行的结果以及如何阻塞线程.取消任务等等. 1 package info.imxylz.study.concurrency.future;2 3 public class SleepForResultDemo implements Runnable {4 5     static boolean result = false;6 7     static void sleepWhile(long ms) {8         try {9      

Java四种线程池

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 时间:2015-10-20 22:37:40      阅读:8762      评论:0      收藏:0      [点我收藏+] 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异