并发编程之定时任务

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor 实现了ScheduledExecutorService。主要用来处理延时任务和定时任务。

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

定时线程池的执行原理与一般的线程池执行过程有点差别,具体的定时线程的执行原理如下图所示:

定时线程池主要是接收ScheduledFutureTask任务,是线程池调度任务的最小单位,有3种提交方式:

1. schedule:schedule方法是指任务在指定延迟时间后触发,只执行一次。

2. scheduledAtFixedRate:

3. scheduledWithFixedDelay:

具体的案例如下:

package com.test.executor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchelduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledService=Executors.newScheduledThreadPool(5);
        scheduledService.schedule(new Runnable() {

            @Override
            public void run() {
                System.out.println("延时任务执行");

            }
        }, 1, TimeUnit.SECONDS);

        scheduledService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                System.out.println("不管任务是否执行完成,每过3秒产生一个新线程");
            }
        }, 1, 3, TimeUnit.SECONDS);

        scheduledService.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                System.out.println("前一个任务执行完成以后,隔3秒执行下一个任务");

            }
        }, 1, 3, TimeUnit.SECONDS);

    }
}

定时线程池采用的是DelayQueue,是个无界队列,内部封装了priorityQueue,根据time时间先后进行排序,若time相同则用sequenceNumber排序。

ScheduledFutureTask 有以下三种属性:

1、private final long sequenceNumber;任务序号
2、private final long period;任务执行时间间隔

3、private long time;任务开始时间

工作线程的执行过程:

1、工作线程从DelayQueue从获取到期的任务去执行;

2、执行结束后重新设置任务的到期时间,再次放回到DelayQueue中去;

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

 private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())//如果线程池已经关闭
            reject(task);//拒绝任务
        else {
            super.getQueue().add(task);//否则直接加入队列
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);//若当前线程无法执行,则取消
            else
                ensurePrestart();//增加一个worker,避免提交的任务没有线程去执行,原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
        }
    }
void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

ScheduledThreadPoolExecutor会把执行的任务放到DelayQueue中去,DelayQueue中封装了一个PriorityQueue队列,该队列会对任务ScheduledFutureTask按照时间顺序进行排序,排序算法如下:

public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }

由以上代码可以看出,

1、先按照time进行排序,时间小的排在前面,时间大的排在后面;

2、time相同的话,就按照sequenceNumber来进行排序,sequenceNumber小的排在前面,大的排在后面。即时间相同,先提交的优先执行。

定时线程池任务运行的核心就是ScheduledFutureTask的run方法,下面来看一下:

 public void run() {
            boolean periodic = isPeriodic();
        //如果当前线程已经不支持执行任务,则取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
    //不需要周期性执行任务,则直接执行run然后结束任务
            else if (!periodic)
                ScheduledFutureTask.super.run();
    //如果需要周期性执行任务,则执行完任务以后,设置下一次执行时间,然后重复执行
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();//设置下一次执行时间
                reExecutePeriodic(outerTask);//重复执行任务
            }
        }

总结一下,具体的执行步骤如下:

1. 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行

步骤2;

2. 如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后

直接返回,否则执行步骤3;

3. 如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行

结果,然后直接返回,否则执行步骤4和步骤5;

4. 计算下次执行该任务的具体时间;

5. 重复执行任务。

 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

reExecutePeriodic方法与delayedExecute方法类似,但是不同的是:

1、由于调用reExecutePeriodic方法的时候,任务已经执行过一次了,所以不会拒绝当前任务;

2、传入的任务一定是周期性执行的任务。

DelayedWorkQueue

DelayedWorkQueue是一个基于堆数据结构的无界队列。在执行任务的时候每个任务的执行时间都不一样,所以它的工作就是按照时间升序排列,执行时间距离当前时间越近则越排在队列前。但是这里的顺序并不是绝对的,因为堆中排序只是保证了子节点的任务执行时间要比父节点的下次执行时间要大,各个子节点之间并不是顺序排列的。

堆的数据结构如下:

堆结构可以转换成数组,如下:

{1,3,4,8,10,15,20}

假设,索引值从0开始,子节点的索引值为k,父节点的索引值为p,则:

1. 一个节点的左子节点的索引为:k = p * 2 + 1;

2. 一个节点的右子节点的索引为:k = (p + 1) * 2;

3. 一个节点的父节点的索引为:p = (k - 1) / 2。

为什么使用DelayedWorkQueue?

因为定时任务需要取出最近需要执行的任务,所以任务队列当中每次出队的一定是队列当中最靠前的任务,所以要用优先级队列。而DelayedWorkQueue就是一个优先级队列,可以保证每次出队的任务都是当前队列当中最靠前的任务,而且该队列还是堆结构的,在执行插入和删除的时候复杂度为O(logN).

DelayedWorkQueue属性

private static final int INITIAL_CAPACITY = 16;队列的初始容量
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];根据初始容量创建的RunnableScheduledFuture数组
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

private Thread leader = null;leader线程

private final Condition available = lock.newCondition();当较新的任务在队列的头部可用时,或者说有新的线程可能需要成为leader的时候,通过这个条件发出信号。

对于多线程网络模型来说,线程有三种身份,leader,follower,proccesser。基准就是永远最多只有一个leader,所有的follower都在等待成为leader。线程池启动会自动产生一个leader,负责等待网络IO时间,当有一个事件产生的时候leader首先通知一个follower成为leader,然后自己就去处理这个网络事件,完毕以后自己加入follower线程等待队列当中去,等待下一次成为leader。这样的话可以增强CPU的高速缓存性,消除动态内存分配以及线程间的数据交换。

Offer方法

public boolean offer(Runnable x) {
//参数校验
    if (x == null)
        throw new NullPointerException()?
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x?
    final ReentrantLock lock = this.lock?
    lock.lock()?
    try {
//查看当前元素数量,如果大于队列长度则进行扩容
        int i = size?
        if (i >= queue.length)
            grow()?
//元素数量加1
        size = i + 1?
//如果当前队列还没有元素,则直接加入头部
        if (i == 0) {
            queue[0] = e?
//记录索引
            setIndex(e, 0)?
        } else {
  //把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列
             //把需要最早执行的任务放在前面
            siftUp(i, e)?
        }
//如果新加入的元素就是队列头,这里有两种情况
        //1.这是用户提交的第一个任务
        //2.新任务进行堆调整以后,排在队列头
        if (queue[0] == e) {
// leader设置为null为了使在take方法中的线程在通过available.signal()?后会执行
available.awaitNanos(delay)?
            leader = null?
//加入元素以后,唤醒worker线程
            available.signal()?
        }
    } finally {
        lock.unlock()?
    }
    return true?
}

来看一下siftUp()方法:

private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
   // 获取父节点
        int parent = (k ­ 1) >>> 1?
        RunnableScheduledFuture<?> e = queue[parent]?
   // 如果key节点的执行时间大于父节点的执行时间,不需要再排序了
        if (key.compareTo(e) >= 0)
            break?
  // 如果key.compareTo(e) < 0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面
        queue[k] = e?
        setIndex(e, k)?
// 设置索引为k
        k = parent?
    }
// key设置为排序后的位置中
    queue[k] = key?
    setIndex(key, k)?
}

简言之就是循环的根据key节点与他的父节点进行比较,key节点的时间小于父节点,则交换位置,将时间点靠前的排在前面。

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don‘t retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

在getTask的时候使用take方法,该方法是保证当前对列取出的就是最新需要执行的任务。保证了任务只有在指定的执行时间的时候才可以被取走。

原文地址:https://www.cnblogs.com/yatou-blog/p/11775004.html

时间: 2024-10-12 20:49:57

并发编程之定时任务的相关文章

漫谈并发编程(一) - 并发简介

并发编程是每个程序员进阶的必修之课,想写一个安全稳定,性能强劲的并发程序可没那么容易.我将在未来的日子里,与大家分享一个并发小白成长路上的所思所想.并发编程的思想是通的,但是例子得要是具现的,在该系列中将使用java语言用以演示. 此文作为为漫谈并发编程系列的第一篇,由于本人喜欢先论理再论事,而非先论事再论理,所以就以一篇对并发的文字描述开头了. 并发编程由来 早年的计算机中没有操作系统,在某个时间段内只支持运行一个程序,并且这个程序能访问计算机的所有资源.在这个程序完全执行完后,再执行下一个程

漫谈并发编程(一) - 并发简单介绍

并发编程是每一个程序猿进阶的必修之课,想写一个安全稳定,性能强劲的并发程序可没那么easy.我将在未来的日子里,与大家分享一个并发小白成长路上的所思所想.并发编程的思想是通的,可是样例得要是具现的,在该系列中将使用java语言用以演示. 此文作为为漫谈并发编程系列的第一篇,探本溯源,以一篇对并发的文字描写叙述开头. 并发编程由来 早年的计算机中没有操作系统,在某个时间段内仅仅支持运行一个程序,而且这个程序能訪问计算机的全部资源.在这个程序全然运行完后,再运行下一个程序. 在此时,引入并发编程的优

Java并发编程75个问答

1.在java中守护线程和本地线程区别? java中的线程分为两种:守护线程(Daemon)和用户线程(User). 任何线程都可以设置为守护线程和用户线程,通过方法Thread.setDaemon(bool on):true则把该线程设置为守护线程,反之则为用户线程.Thread.setDaemon()必须在Thread.start()之前调用,否则运行时会抛出异常. 两者的区别: 唯一的区别是判断虚拟机(JVM)何时离开,Daemon是为其他线程提供服务,如果全部的User Thread已经

Java——并发编程

1.在java中守护线程和本地线程区别? java中的线程分为两种:守护线程(Daemon)和用户线程(User). 任何线程都可以设置为守护线程和用户线程,通过方法Thread.setDaemon(bool on):true则把该线程设置为守护线程,反之则为用户线程.Thread.setDaemon()必须在Thread.start()之前调用,否则运行时会抛出异常. 两者的区别:  唯一的区别是判断虚拟机(JVM)何时离开,Daemon是为其他线程提供服务,如果全部的User Thread已

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Acto

4.并发编程多线程

并发编程之多线程(理论) 一 threading模块介绍 multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍 官网链接:https://docs.python.org/3/library/threading.html?highlight=threading# 二 开启线程的两种方式 #方式一 from threading import Thread import time def sayhi(name): time.sleep(

Java并发编程核心概念一览

并行相关概念 同步和异步 同步和异步通常来形容一次方法的调用.同步方法一旦开始,调用者必须等到方法结束才能执行后续动作:异步方法则是在调用该方法后不必等到该方法执行完就能执行后面的代码,该方法会在另一个线程异步执行,异步方法总是伴随着回调,通过回调来获得异步方法的执行结果. 并发和并行 很多人都将并发与并行混淆在一起,它们虽然都可以表示两个或者多个任务一起执行,但执行过程上是有区别的.并发是多个任务交替执行,多任务之间还是串行的:而并行是多个任务同时执行,和并发有本质区别. 对计算机而言,如果系

Java 面试宝典!并发编程 71 道题及答案全送上!

金九银十跳槽季已经开始,作为 Java 开发者你开始刷面试题了吗?别急,我整理了71道并发相关的面试题,看这一文就够了! 1.在java中守护线程和本地线程区别? java中的线程分为两种:守护线程(Daemon)和用户线程(User). 任何线程都可以设置为守护线程和用户线程,通过方法Thread.setDaemon(bool on):true则把该线程设置为守护线程,反之则为用户线程.Thread.setDaemon()必须在Thread.start()之前调用,否则运行时会抛出异常. 两者

并发编程71道

转 https://www.cnblogs.com/lfs2640666960/p/11488629.html 金九银十跳槽季已经开始,作为 Java 开发者你开始刷面试题了吗?别急,我整理了71道并发相关的面试题,看这一文就够了! 1.在java中守护线程和本地线程区别? java中的线程分为两种:守护线程(Daemon)和用户线程(User). 任何线程都可以设置为守护线程和用户线程,通过方法Thread.setDaemon(bool on):true则把该线程设置为守护线程,反之则为用户线