《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口。

public class ScheduledThreadPoolExecutor
        extends ThreadPoolExecutor
        implements ScheduledExecutorService

ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行。

和分析ThreadPoolExecutor时一样,首先来看核心方法execute:

    public void execute(Runnable command) {
        schedule(command, 0, TimeUnit.NANOSECONDS);
    }

execute方法调用了另外一个方法schedule,同时我们发现三个submit方法也是同样调用了schedule方法,因为有两种类型的任务:Callable和Runnable,因此schedule也有两个重载方法。

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

两个方法逻辑基本一致,都是把任务包装成RunnableScheduledFuture对象,然后调用delayedExecute来实现延迟执行。任务包装类继承自ThreadPoolExecutor的包装类RunnableFuture,同时实现ScheduledFuture接口使包装类具有了延迟执行和重复执行这些功能以匹配ScheduledThreadPoolExecutor。

因此首先来看ScheduledFutureTask,以下是ScheduledFutureTask专有的几个变量:

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** 针对线程池所有任务的序列号 */
        private final long sequenceNumber;

        /** 距离任务开始执行的时间,纳秒为单位 */
        private long time;

        /**
         * 重复执行任务的间隔,即每隔多少时间执行一次任务
         */
        private final long period;

        /** 重复执行任务和排队时用这个类型的对象, */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * 在延迟队列的索引,这样取消任务时使用索引会加快查找速度
         */
        int heapIndex;

来看核心方法run:

        public void run() {
            boolean periodic = isPeriodic();
            // 检测是否可以运行任务,这里涉及到另外两个变量:continueExistingPeriodicTasksAfterShutdown
            // 和executeExistingDelayedTasksAfterShutdown
            // 前者允许在shutdown之后继续执行重复执行的任务
            // 后者允许在shutdown之后继续执行延时执行的任务,
            // 因此这里根据任务是否为periodic来决定采用哪个选项,然后
            // 如果线程池正在运行,那么肯定可以执行
            // 如果正在shutdown,那么要看选项的值是否为true来决定是否允许执行任务
            // 如果不被允许的话,就会取消任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            // 如果可以执行任务,对于不用重复执行的任务,直接执行即可
            else if (!periodic)
                ScheduledFutureTask.super.run();
            // 对于需要重复执行的任务,则执行一次,然后reset
            // 更新一下下次执行的时间,调用reExecutePeriodic更新任务在执行队列的
            // 位置(其实就是添加到队列的末尾)
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

因此这里可以得出关于重复执行的实现:任务执行一次,Reset状态,重新加入到任务队列。

回到delayedExecute,它可以保证任务在准确时间点执行,来看delayedExecute是如果实现延迟执行的:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

乍看之下,发现也就是把任务加入到任务队列中,那么这个延时执行的功能是如何实现的,秘密就在任务队列的实现。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

ScheduledThreadPoolExecutor的任务队列不是普通的BlockingQueue,而是一个特殊的实现DelayedWorkQueue。下一篇文章就来说说这个DelayedWorkQueue。

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

时间: 2024-07-28 14:25:11

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分的相关文章

《java.util.concurrent 包源码阅读》 结束语

<java.util.concurrent 包源码阅读>系列文章已经全部写完了.开始的几篇文章是根据自己的读书笔记整理出来的(当时只阅读了部分的源代码),后面的大部分都是一边读源代码代码,一边写文章. 由于水平有限,在阅读源代码的时候,分析得也比较浅显,也有很多地方自己也没有研究明白,文章有的地方显得语焉不详,只能请各位多多见谅了. 后面会继续写一些关于Java并发编程的文章,希望各位多多指教. 这里整理了一个简单的目录,包含了本系列所有文章的链接: <java.util.concurr

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现.所以分析ArrayBlockingQueue也是基于这两点. 对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的. 首先来看offer方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lo

《java.util.concurrent 包源码阅读》05 BlockingQueue

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素.相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列.BlockingQueue就是这么一个生产者-消费者队列. BlockingQueue是Queue的子接口 public interface BlockingQueue<E> extends Queue<E> BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition

《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包

Aomic数据类型有四种类型:AomicBoolean, AomicInteger, AomicLong, 和AomicReferrence(针对Object的)以及它们的数组类型, 还有一个特殊的AomicStampedReferrence,它不是AomicReferrence的子类,而是利用AomicReferrence实现的一个储存引用和Integer组的扩展类 首先,所有原子操作都是依赖于sun.misc.Unsafe这个类,这个类底层是由C++实现的,利用指针来实现数据操作 关于CAS

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker

《java.util.concurrent 包源码阅读》09 线程池系列之介绍篇

concurrent包中Executor接口的主要类的关系图如下: Executor接口非常单一,就是执行一个Runnable的命令. public interface Executor { void execute(Runnable command); } ExecutorService接口扩展了Executor接口,增加状态控制,执行多个任务返回Future. 关于状态控制的方法: // 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行, // 可以调用awaitTe