如何实现分布式定时任务(xxl的实现)

1、前言

定时任务在任何系统中都非常重要,如:订单48小时自动完成,每日重新给会员送优惠券,游戏中每隔半小时给玩家添加体力等等。

对于小型系统我们可以用quartz和spring task实现定时任务,这样都任务存在如下几个任务:

1)单点问题,如果任务服务器挂了,定时任务就挂了;

2)如果任务服务和业务代码耦合在一起,业务服务部署多台主机,任务服务在每天机器上都会触发,引起任务重复执行;

3)任务不可预知执行情况,需要开发人员每天去检查日志,查看是否执行成功;

4)当任务失败了之后,没办法手动执行任务

这时候分布式任务就该出场了。那么分布式任务是如何解决上面当问题当昵?

2、名词说明

调度中心:负责任务调度当服务;

执行器:   执行任务当服务器;

管理中心:负责任务的创建更新删除,查看任务状态,执行过程的服务器。

3、架构图

说明

1)服务注册中心可以是zookeeper,eureka,也可以是自己实现的。

2)leader选择器可以替换为分布式锁(redission),在调度任务的时候控制只有一个调度中心在分配任务,当然也可以使用select * from for update。

目前xxl-job就是采用select * from for update 加时间轮的方式实现的。

package com.xxl.job.admin.core.thread;

import com.xxl.job.admin.core.conf.XxlJobAdminConfig;

import com.xxl.job.admin.core.cron.CronExpression;

import com.xxl.job.admin.core.model.XxlJobInfo;

import com.xxl.job.admin.core.trigger.TriggerTypeEnum;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.SQLException;

import java.util.*;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.TimeUnit;

/**

 * @author xuxueli 2019-05-21

 */

public class JobScheduleHelper {

    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

    private static JobScheduleHelper instance = new JobScheduleHelper();

    public static JobScheduleHelper getInstance(){

        return instance;

    }

    private Thread scheduleThread;

    private Thread ringThread;

    private volatile boolean toStop = false;

    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    public void start(){

        // schedule thread

        scheduleThread = new Thread(new Runnable() {

            @Override

            public void run() {

                try {

                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );

                catch (InterruptedException e) {

                    if (!toStop) {

                        logger.error(e.getMessage(), e);

                    }

                }

                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                while (!toStop) {

                    // 扫描任务

                    long start = System.currentTimeMillis();

                    Connection conn = null;

                    PreparedStatement preparedStatement = null;

                    try {

                        if (conn==null || conn.isClosed()) {

                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();

                        }

                        conn.setAutoCommit(false);

                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = ‘schedule_lock‘ for update" );

                        preparedStatement.execute();

                        // tx start

                        // 1、预读10s内调度任务

                        long maxNextTime = System.currentTimeMillis() + 10000;

                        long nowTime = System.currentTimeMillis();

                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(maxNextTime);

                        if (scheduleList!=null && scheduleList.size()>0) {

                            // 2、推送时间轮

                            for (XxlJobInfo jobInfo: scheduleList) {

                                // 时间轮刻度计算

                                int ringSecond = -1;

                                if (jobInfo.getTriggerNextTime() < nowTime - 10000) {   // 过期超10s:本地忽略,当前时间开始计算下次触发时间

                                    ringSecond = -1;

                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());

                                    jobInfo.setTriggerNextTime(

                                            new CronExpression(jobInfo.getJobCron())

                                                    .getNextValidTimeAfter(new Date())

                                                    .getTime()

                                    );

                                else if (jobInfo.getTriggerNextTime() < nowTime) {    // 过期10s内:立即触发一次,当前时间开始计算下次触发时间

                                    ringSecond = (int)((nowTime/1000)%60);

                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());

                                    jobInfo.setTriggerNextTime(

                                            new CronExpression(jobInfo.getJobCron())

                                                    .getNextValidTimeAfter(new Date())

                                                    .getTime()

                                    );

                                else {    // 未过期:正常触发,递增计算下次触发时间

                                    ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());

                                    jobInfo.setTriggerNextTime(

                                            new CronExpression(jobInfo.getJobCron())

                                                    .getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()))

                                                    .getTime()

                                    );

                                }

                                if (ringSecond == -1) {

                                    continue;

                                }

                                // push async ring

                                List<Integer> ringItemData = ringData.get(ringSecond);

                                if (ringItemData == null) {

                                    ringItemData = new ArrayList<Integer>();

                                    ringData.put(ringSecond, ringItemData);

                                }

                                ringItemData.add(jobInfo.getId());

                                logger.debug(">>>>>>>>>>> xxl-job, push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );

                            }

                            // 3、更新trigger信息

                            for (XxlJobInfo jobInfo: scheduleList) {

                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);

                            }

                        }

                        // tx stop

                        conn.commit();

                    catch (Exception e) {

                        if (!toStop) {

                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);

                        }

                    finally {

                        if (conn != null) {

                            try {

                                conn.close();

                            catch (SQLException e) {

                            }

                        }

                        if (null != preparedStatement) {

                            try {

                                preparedStatement.close();

                            catch (SQLException ignore) {

                            }

                        }

                    }

                    long cost = System.currentTimeMillis()-start;

                    // next second, align second

                    try {

                        if (cost < 1000) {

                            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);

                        }

                    catch (InterruptedException e) {

                        if (!toStop) {

                            logger.error(e.getMessage(), e);

                        }

                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");

            }

        });

        scheduleThread.setDaemon(true);

        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");

        scheduleThread.start();

        // ring thread

        ringThread = new Thread(new Runnable() {

            @Override

            public void run() {

                // align second

                try {

                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );

                catch (InterruptedException e) {

                    if (!toStop) {

                        logger.error(e.getMessage(), e);

                    }

                }

                int lastSecond = -1;

                while (!toStop) {

                    try {

                        // second data

                        List<Integer> ringItemData = new ArrayList<>();

                        int nowSecond = (int)((System.currentTimeMillis()/1000)%60);   // 避免处理耗时太长,跨过刻度;

                        if (lastSecond == -1) {

                            lastSecond = (nowSecond+59)%60;

                        }

                        for (int i = 1; i <=60; i++) {

                            int secondItem = (lastSecond+i)%60;

                            List<Integer> tmpData = ringData.remove(secondItem);

                            if (tmpData != null) {

                                ringItemData.addAll(tmpData);

                            }

                            if (secondItem == nowSecond) {

                                break;

                            }

                        }

                        lastSecond = nowSecond;

                        // ring trigger

                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );

                        if (ringItemData!=null && ringItemData.size()>0) {

                            // do trigger

                            for (int jobId: ringItemData) {

                                // do trigger

                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1nullnull);

                            }

                            // clear

                            ringItemData.clear();

                        }

                    catch (Exception e) {

                        if (!toStop) {

                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);

                        }

                    }

                    // next second, align second

                    try {

                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);

                    catch (InterruptedException e) {

                        if (!toStop) {

                            logger.error(e.getMessage(), e);

                        }

                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");

            }

        });

        ringThread.setDaemon(true);

        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");

        ringThread.start();

    }

    public void toStop(){

        toStop = true;

        // interrupt and wait

        scheduleThread.interrupt();

        try {

            scheduleThread.join();

        catch (InterruptedException e) {

            logger.error(e.getMessage(), e);

        }

        // interrupt and wait

        ringThread.interrupt();

        try {

            ringThread.join();

        catch (InterruptedException e) {

            logger.error(e.getMessage(), e);

        }

    }

}

通过代码,我们可以发现调度中心由两个线程完成,第一个线程不停的取最近10s钟待开始的任务,把任务放入时间轮中,第二个线程从时间轮中获取需要开始的任务,开始执行任务。

当然任务调度还可以使用DelayQueue(https://soulmachine.gitbooks.io/system-design/content/cn/task-scheduler.html

定时任务一直有一个头疼的问题,就是高频调度的执行时间比较长的任务,一般建议指定到单独一台主机上并保证在单机上任务不会并发执行来解决。

4、分布式定时任务中依赖任务的解决方案

1)任务依赖不支持环,只支持DAG;

如:A->B->(C,D)->E    其中CD并行,其余串行

2)下游任务只支持上游所有任务都成功并调度时间到了,才执行任务;

如:

JobA只有在Job1,Job2,Job3都执行完,并且3点时间到了才能执行。

3)不支持有不同调度周期的任务存在依赖关系

如:A->B      B的前置任务为A, A的调度周期为每15分钟调度一次, B为每天早上1点调度,该任务不建议分布式调度中心执行。

不支持原因:

1)改种情况在具体业务中比较少;

2)支持改种流程会提升分布式定时任务对负责度同时很难判断前置任务是成功还是失败;

3)建议把A任务拆分为两个任务,一个为B对前置任务A1,一个为每15分钟执行一次(调度时间过滤掉A1)的任务

实现:

在任务回调成功之后,查询任务到依赖任务,开始执行。

这里面有几个问题需要解决:

1、任务重复执行:

如上面任务,JobA依赖Job1,Job2,Job3执行,同时JobA3点也会调度执行,在3点左右时,Job3执行完后会执行JobA,同时cron调度也会执行JobA,在这种情况怎么保证JobA只被执行一次。

解决办法:在JobA执行前需要把JobA的状态修改为正在执行中,此时,通过update  where jobId = #{jobId} and status=#{未开始执行} 方法执行更新,如果更新记录为1的,任务可以进行执行,如果更新记录为0,抛弃该任务的执行。

2、怎么判断任务该不该执行

条件一:1点钟Job1执行完了,开始找后置任务JobA,JobA是否该执行?怎么判断?

JobA不该执行,前置任务Job2,Job3 都没开始执行,Job1不能执行;

条件二:3点钟Job3执行完了,开始找后置任务JobA,JobA是否该执行?怎么判断?

JobA不该执行,前置任务Job1,Job2,Job3 都执行完了,但是Cron时间还没到,Job1不能执行;

条件三:3点15分调度器开始调度,JobA是否该执行,怎么判断?

JobA该执行,前置任务Job1,Job2,Job3 都执行完了,Cron时间也到了;

判断任务是否执行的逻辑: 如果JobA执行时,需要判断Job1,Job2,Job3是否执行,下面拿Job1为例

假设Job1的历史任务都是正常执行成功的。

情况1:  2019-06-26 00:30:00(today)时,Job1的上一次执行成功时间为2019-06-25:01:00:00 (lastDay),下一次执行时间为:2019-06-26 01:00:00(nextDay).

情况2:  2019-06-26 01:30:00时,Job1的上一次执行成功时间为2019-06-26:01:00:00,下一次执行时间为:2019-06-27 01:00:00.

3、任务失败了,怎么办?

任务失败应该同时执行带依赖执行和不带依赖执行,由页面配置控制。

4、任务失败了,页面配置执行任务时,是否可传参数,参数怎么在任务间传递?

页面配置传参数时,参数需要传递给依赖任务。

5、查看任务执行状态时,是否可以查看依赖到表执行情况?

原文地址:https://www.cnblogs.com/smileIce/p/11156412.html

时间: 2024-10-04 21:58:14

如何实现分布式定时任务(xxl的实现)的相关文章

分布式定时任务

由于项目原因,需要使用分布式定时任务.目前可以使用的定时任务框架包括: A)Quartz:Java事实上的定时任务标准.但Quartz关注点在于定时任务而非数据,并无一套根据数据处理而定制化的流程.虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能. B)TBSchedule:阿里早期开源的分布式任务调度系统.代码略陈旧,使用timer而非线程池执行任务调度.众所周知,timer在处理异常状况时是有缺陷的.而且TBSchedule作业类型较为单一,只能是获取/处理数据一种

基于spring+quartz的分布式定时任务框架

http://www.cnblogs.com/aaronfeng/p/5537177.html 问题背景 我公司是一个快速发展的创业公司,目前有200人,主要业务是旅游和酒店相关的,应用迭代更新周期比较快,因此,开发人员花费了更多的时间去更=跟上迭代的步伐,而缺乏了对整个系统的把控 没有集群之前,公司定时任务的实现方式 在初期应用的访问量并不是那么大,一台服务器完全满足使用,应用中有很多定时任务需要执行 有了集群之后,公司定时任务实现的方式 随着用户的增加,访问量也就随之增加,一台服务器满足不了

lesson9:分布式定时任务

在实际的开发过程中,我们一定会遇到服务自有的定时任务,又因为现在的服务都是分布式的,但是对于定时任务,很多的业务场景下,还是只希望单台服务来执行,网上有很多分布式定时任务的框架,各位如感兴趣,可以自行去研究.本文采用非常简单的方式实现了分布式的定时任务,利用了zookeeper的节点的EPHEMERAL_SEQUENTIAL特性,适用范围: 1.定时任务跟随服务本身一起管理,不想引入过于复杂的分布式定时任务服务框架 2.已有分布式定时任务服务框架,但对于一些定时任务,服务本身对它进行管理更加方便

分布式定时任务框架比较,spring batch, tbschedule jobserver

分布式定时任务框架比较,spring batch, tbschedule jobserver | 移动开发参考书 分布式定时任务框架比较,spring batch, tbschedule jobserver

分布式锁实现,与分布式定时任务

写在前面 redis辣么多数据结构,这么多命令,具体一点,都可以应用在什么场景呢?用来解决什么具体的问题? 分布式锁 redis是网络单线程的,它只有一个线程负责接受请求,这个特性即降低了redis本身的开发成本,也提高了redis的可用性. 分布式环境下,数据一致性问题一直是一个比较重要的话题,分布式与单机情况下最大的不同在于其不是多线程而是多进程. 多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置,例如cas,java的synchronize.而进程之间可能不在同一台物理机上

Java分布式定时任务

分布式定时任务 elastic-job 可以实现任务分片 quartz 可以把任务存入数据库,实时生成任务(添加数据库添加定时任务) 文档 中文翻译 翻译2 原文地址:https://www.cnblogs.com/shengulong/p/11774431.html

Elastic-Job - 分布式定时任务框架

Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架.去掉了和dd-job中的监控和ddframe接入规范部分. ddframe其他模块也有可独立开源的部分,之前当当曾开源过dd-soa的基石模块DubboX. 项目开源地址:https://github.com/dangdangdotcom/elastic-job Elastic-Job主要功能 定时任务: 基于成熟的定时任务作业框架Quartz cron表达式执行定时任务. 作业注册中心: 基于Zook

分布式定时任务的redis锁实现

一个web项目如果部署为分布式时,平时常见的定时服务在一定的间隔时间内,可能出现多次重复调用的问题.而此时由于是不同容器之间的竞争,因此需要容器级别的锁 Redis为单进程单线程模式,采用队列模式将并发访问变为串行访问.Redis本身没有锁的概念,Redis对于多个客户端连接并不存在竞争.但是可以通过setnx来实现锁 SETNX命令(SET if Not exists) 语法: SETNX key value 功能:将 key 的值设为 value ,当且仅当 key 不存在:若给定的 key

spring与quartz整合实现分布式动态创建,删除,改变执行时间定时任务(mysql数据库)

背景:因为在项目中用到了定时任务,当时想到了spring的quartz,写完发现费了很大功夫,光是整合就花了一上午,其中最大的问题就是版本问题,项目中用的是spring3.2.8的版本,查阅发现,3.0以上的版本需要使用quartz2.X以上版本,我就去官网下载了2.1.7的quartz,结果发现jar包与spring冲突,最后使用了quartz1.6.0版本. spring与quartz整合第一步需要导jar包,这个在百度搜下quartz的jar,下载一个 第二步:分布式定时任务,是基于数据库