周期性线程池与主要源码解析

之前学习ThreadPool的使用以及源码剖析,并且从面试的角度去介绍知识点的解答。今天小强带来周期性线程池的使用和重点源码剖析。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor:用来处理延时任务或定时任务
定时线程池类的类结构图

ScheduledThreadPoolExecutor接收ScheduleFutureTask类型的任务,是线程池调度任务的最小单位。
它采用DelayQueue存储等待的任务:
1、DelayQueue内部封装成一个PriorityQueue,它会根据time的先后时间顺序,如果time相同则根绝sequenceNumber排序;
2、DelayQueue是无界队列;

ScheduleFutureTask

接收的参数:

private final long sequenceNumber;//任务的序号
private long time;//任务开始的时间
private final long period;//任务执行的时间间隔

工作线程的的执行过程:
工作线程会从DelayQueue取出已经到期的任务去执行;
执行结束后重新设置任务的到期时间,再次放回DelayQueue;

ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序算法实现如下:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        //首先按照time排序,time小的排到前面,time大的排到后面
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        //time相同,按照sequenceNumber排序;
        //sequenceNumber小的排在前面,sequenceNumber大的排在后面
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

接下来看看ScheduledFutureTask的run方法实现, run方法是调度task的核心,task的执行实际上是run方法的执行。

public void run() {
    //是否是周期性的
    boolean periodic = isPeriodic();
    //线程池是shundown状态不支持处理新任务,直接取消任务
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    //如果不需要执行执行周期性任务,直接执行run方法结束
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //如果需要周期性执行,则在执行任务完成后,设置下一次执行时间
    else if (ScheduledFutureTask.super.runAndReset()) {
        //设置下一次执行该任务的时间
        setNextRunTime();
        //重复执行该任务
        reExecutePeriodic(outerTask);
    }
}

run方法的执行步骤:

  • 1、如果线程池是shundown状态不支持处理新任务,直接取消任务,否则步骤2;
  • 2、如果不是周期性任务,直接调用ScheduledFutureTask的run方法执行,会设置执行结果,然后直接返回,否则步骤3;
  • 3、如果是周期性任务,调用ScheduledFutureTask的runAndset方法执行,不会设置执行结果,然后直接返回,否则执行步骤4和步骤5;
  • 4、计算下一次执行该任务的时间;
  • 5、重复执行该任务;

接下来看下reExecutePeriodic方法的执行步骤:

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

由于已经执行过一次周期性任务,所以不会reject当前任务,同时传入的任务一定是周期性任务。

周期性线程池任务的提交方式

周期性有三种提交的方式:schedule、sceduleAtFixedRate、schedlueWithFixedDelay。下面从使用和源码两个方面进行说明,首先是如果提交任务:

pool.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("延迟执行");
    }
},1, TimeUnit.SECONDS);

/**
 * 这个执行周期是固定,不管任务执行多长时间,每过3秒中就会产生一个新的任务
 */
pool.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //这个业务逻辑需要很长的时间,超过了3秒
        System.out.println("重复执行");
    }
},1,3,TimeUnit.SECONDS);

pool.shutdown();

/**
 * 假如run方法30min后执行完成,然后间隔3秒,再周期性执行下一个任务
 */
pool.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        //30min
        System.out.println("重复执行");
    }
},1,3,TimeUnit.SECONDS);

知道了如何提交周期性任务,接下来源码是如何执行的,首先是schedule方法,该方法是指任务在指定延迟时间到达后触发,只会执行一次。

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    //把任务封装成ScheduledFutureTask,之后调用decorateTask进行包装;
    //decorateTask方法是空方法,留给用户去实现的;
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //包装好任务之后,进行任务的提交
    delayedExecute(t);
    return t;
}

任务提交方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉
    if (isShutdown())
        reject(task);
    else {
        //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
        super.getQueue().add(task);
        //如果当前状态无法执行任务,则取消
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
        //和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker;
        //增加Worker,确保提交的任务能够被执行
            ensurePrestart();
    }
}

还没关注我的公众号?

  • 扫文末二维码关注公众号【小强的进阶之路】可领取如下:
  • 学习资料: 1T视频教程:涵盖Javaweb前后端教学视频、机器学习/人工智能教学视频、Linux系统教程视频、雅思考试视频教程;
  • 100多本书:包含C/C++、Java、Python三门编程语言的经典必看图书、LeetCode题解大全;
  • 软件工具:几乎包括你在编程道路上的可能会用到的大部分软件;
  • 项目源码:20个JavaWeb项目源码。

原文地址:https://www.cnblogs.com/xiaoqiang-code/p/11432936.html

时间: 2024-10-05 06:16:55

周期性线程池与主要源码解析的相关文章

线程池之ThreadPoolExecutor源码解析

1.变量 ThreadPoolExecutor先定义了这几个常量,初看时一脸懵逼,其实它就是用int的二进制高三位来表示线程池的状态, 先回顾一下位运算: <<’左移:右边空出的位置补0,其值相当于乘以2. ‘>>’右移:左边空出的位,如果是正数则补0,若为负数则补0或1,取决于所用的计算机系统OS X中补1.其值相当于除以2. 负数二进制由它的绝对值取反后加1 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RU

JAVA线程池原理与源码分析

1.线程池常用接口介绍 1.1.Executor public interface Executor { void execute(Runnable command); } 执行提交的Runnable任务.其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程.池化线程或调用线程中执行,具体由Executor的实现者决定. 1.2.ExecutorService ExecutorService继承自Executor,下面挑几个方法介绍: 1.2.1.shutdown() vo

java并发之线程池Executor 核心源码解析

1.什么是线程池 定义:线程池是指管理一组同构工作线程的资源池 组成部分: 线程管理器(ThreadPool):用于创建并管理线程池.包括创建线程池,销毁线程池,添加新任务 工作线程(PoolWorker):线程池中的线程 任务接口(Task):每个任务必须实现的接口,一共工作线程调度任务的执行 任务队列:用于存放没有处理的任务,提供一种缓冲机制 2.为什么要使用线程池 通过重用现有的线程而不是创建新线程,从而减少了线程创建 和 销毁过程中的巨大开销 当请求到达时,工作线程已经存在,不用再等待线

Java线程池ThreadPoolExector的源码分析

前言:线程是我们在学习java过程中非常重要的也是绕不开的一个知识点,它的重要程度可以说是java的核心之一,线程具有不可轻视的作用,对于我们提高程序的运行效率.压榨CPU处理能力.多条线路同时运行等都是强有力的杀手锏工具.线程是如此的重要,那么我们来思考这样一个问题.假设我们有一个高并发,多线程的项目,多条线程在运行的时候,来一个任务我们new一个线程,任务结束了,再把它销毁结束,这样看似没有问题,适合于低并发的场景,可是当我们的项目投入到生产环境,一下涌入千条任务的时候,线程不断的new执行

Java线程池及其底层源码实现分析

1.相关类 Executors  ExecutorService   Callable   ThreadPool     Future 2.相关接口 Executor Executor接口的使用: public class TestExecutor implements Executor{ @Override public void execute(Runnable command){ //调用execute方法常常传入runnable接口对象,开启线程 } } ExecutorService接

Linux简单线程池实现(带源码)

这里给个线程池的实现代码,里面带有个应用小例子,方便学习使用,代码 GCC 编译可用.参照代码看下面介绍的线程池原理跟容易接受,百度云下载链接: http://pan.baidu.com/s/1i3zMHDV 一.线程池简介 为什么使用线程池? 目前的大多数网络服务器,包括Web服务器.Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短. 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任

JDK线程池框架Executor源码阅读

Executor框架 Executor ExecutorService AbstractExecutorService ThreadPoolExecutor ThreadPoolExecutor继承AbstractExecutorService,是一个线程池的具体的实现 主要成员 1. ctl private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT

C#线程池操作演示源码

把开发过程中经常用到的一些代码段做个备份,下面代码内容是关于C#线程池操作演示的代码. static void Main(string[] args){ThreadPool.SetMaxThreads(1000, 1000);for (int i = 0; i < 10;i ){ThreadPool.QueueUserWorkItem(new WaitCallback(ShowMessage), string.Format("当前编号{0}",i));}Console.ReadL

Java源码解析 - ThreadPoolExecutor 线程池

1 线程池的好处 线程使应用能够更加充分合理地协调利用CPU.内存.网络.I/O等系统资源.线程的创建需要开辟虚拟机栈.本地方法栈.程序计数器等线程私有的内存空间;在线程销毁时需要回收这些系统资源.频繁地创建和销毁线程会浪费大量的系统资源,增加并发编程风险. 在服务器负载过大的时候,如何让新的线程等待或者友好地拒绝服务? 这些都是线程自身无法解决的;所以需要通过线程池协调多个线程,并实现类似主次线程隔离.定时执行.周期执行等任务. 线程池的作用包括:●利用线程池管理并复用线程.控制最大并发数等●