定时任务系统

定时任务是互联网行业里最常用的服务之一,本文给大家介绍定时任务在我司的发展历程。

linux系统中一般使用crontab命令来实现,在Java世界里,使用最广泛的就是quartz了。我司使用quartz就已经升级了三代,每一代在上一代系统之上有所优化,写这篇文章一方面介绍一下quartz的使用,另一方面可以根据此项目的变迁反应出我司平台架构升级的一个缩影。

定时任务的使用场景很多,以我们平台来讲:计息,派息、对账等等。

quartz 介绍

Quartz是个开源的作业调度框架,为在Java应用程序中进行作业调度提供了简单却强大的机制。Quartz允许开发人员根据时间间隔(或天)来调度作业。它实现了作业和触发器的多对多关系,还能把多个作业与不同的触发器关联。Quartz可以集成几乎任何的java应用程序—从小的单片机系统到大型的电子商务系统。Quartz可以执行上千上万的任务调度。

Quartz核心的概念:scheduler任务调度、Job任务、JobDetail任务细节、Trigger触发器

  • Scheduler:调度器,调度器接受一组JobDetail+Trigger即可安排一个任务,其中一个JobDetail可以关联多个Trigger
  • Job:Job是任务执行的流程,是一个类
  • JobDetail:JobDetail是Job是实例,是一个对象,包含了该实例的执行计划和所需要的数据
  • Trigger:Trigger是定时器,决定任务何时执行

使用Quartz调度系统的思路就是,首先写一个具体的任务(job),配置任务的触发时间(Trigger),Scheduler很根据JobDetail+Trigger安排去执行此任务。

Quartz 定时器的时间设置

时间的配置如下:0 30 16 * * ?

时间大小由小到大排列,从秒开始,顺序为 秒,分,时,天,月,年 *为任意 ?为无限制。由此上面所配置的内容就是,在每天的16点30分启动buildSendHtml() 方法

具体时间设定可参考 :

"0/10 * * * * ?" 每10秒触发
"0 0 12 * * ?" 每天中午12点触发
"0 * 14 * * ?" 在每天下午2点到下午2:59期间的每1分钟触发
"0 10,44 14 ? 3 WED" 每年三月的星期三的下午2:10和2:44触发
"0 15 10 ? * MON-FRI" 周一至周五的上午10:15触发
"0 0 06,18 * * ?" 在每天上午6点和下午6点触发

第一代定时任务系统

第一代定时任务系统使用的很简单,全部按照当时spring推荐的配置方式来进行,开发于2014年初。

首先在配置线程池

<bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="50" />
    <property name="maxPoolSize" value="100" />
    <property name="queueCapacity" value="500" />
</bean>

配置定时任务工厂和任务基类

<bean id="timerFactory" class="com.zx.timer.TimerFactory" />

<bean id="baseTask" class="com.zx.timer.core.BaseTask">
    <property name="machineId" value="${machine.id}"/>
    <property name="recordErrorDetail" value="${is.record.errordetail}"/>
</bean>
  • machineId:机器编码
  • recordErrorDetail:是否记录详细日志

通过timerFactory 来获取具体的任务和触发器

public class TimerFactory implements BeanFactoryAware {

    private BeanFactory beanFactory;

    public Object getTask(String taskCode) {
        return beanFactory.getBean(taskCode+"Task");
    }

    public Object getTrigger(String taskCode) {
        return beanFactory.getBean(taskCode+"Trigger");
    }

    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    public BeanFactory getBeanFactory() {
        return beanFactory;
    }
}

baseTask集成了task,在里面做了一些基础的业务,比如定时任务开始执行的时候记录定时任务的开始执行时间,定时任务结束的时候记录执行的结果等。

public interface Task {
    public void executeTask();
}

配置具体的定时任务。以重发短信邮件的定时任务为例

<bean id="resendSmsAndEmailTask" class="com.zx.timer.core.tasks.ResendSmsAndEmailTask"
        parent="baseTask">
</bean>

<bean id="resendSmsAndEmailJob" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
    <property name="targetObject" ref="resendSmsAndEmailTask" />
    <property name="targetMethod" value="executeTask" />
    <property name="concurrent" value="false" />
</bean>

<bean id="resendSmsAndEmailTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    <property name="jobDetail" ref="resendSmsAndEmailJob" />
    <property name="cronExpression">
        <value>0 0 0 * * ?</value>
    </property>
</bean>
  • resendSmsAndEmailTask:具体的定时任务类
  • resendSmsAndEmailJob:包装成具体的Job
  • resendSmsAndEmailTrigger:设置具体执行的时间,包装成Trigger

具体的task类,删掉了部分业务代码:

public class ResendSmsAndEmailTask extends BaseTask{
    private static final String TASK_CODE = "resendSmsAndEmail";
    AtomicInteger ai = new AtomicInteger(0);

    public void execute(){
        try {
            ai = new AtomicInteger(0);
            // todo
        }catch (Exception e) {
            String exception = ExceptionUtils.getStackTrace(e);
            logger.error("stat error with exception[{}].", exception);
            this.recordTaskErrorDetail(this.taskRecordId, "ResendSmsAndEmailTask-" + e.getMessage(), exception);
        }finally{
            this.modifyTaskRecord(ai.get(), taskRecordId);
        }
    }

    public String getTaskNo() {
        return TASK_CODE;
    }
}

最后配置scheduler任务调度

<bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="triggers">
        <list>
            <ref bean="resendSmsAndEmailTrigger" />
        </list>
    </property>
    <property name="taskExecutor" ref="executor" />
</bean>
<bean class="com.zx.timer.core.scheduler.DynamicJobAssembler" init-method="init" scope="singleton"/>

DynamicJobAssembler类代码:

public class DynamicJobAssembler {

    private static Logger logger = LoggerFactory.getLogger(DynamicJobAssembler.class);

    @Resource
    Scheduler scheduler;

    @Resource
    TimerFactory timerFactory;

    @Resource
    TaskDao taskDao;

    public void init() {
        logger.info("start to assemble task from db.");
        List<TaskEntity> tasks = this.taskDao.getAllTask();
        if (tasks == null || tasks.size() <= 0) {
            return;
        }

        Map<String, String> jobNameMap = this.getAllJobNames();
        for (TaskEntity task : tasks) {
            logger.debug(task.toString());
            CronTriggerBean taskTrigger = (CronTriggerBean) timerFactory.getTrigger(task.getTaskNo());
            if (taskTrigger != null) {
                if (!task.getSchedulerRule().equals(taskTrigger.getCronExpression())) {
                    try {
                        taskTrigger.setCronExpression(task.getSchedulerRule());
                    } catch (ParseException e) {
                        logger.error("db task‘s cronExpression parse error:{}", e.getMessage());
                    }
                    try {
                        logger.info("rescheduleJob jobName:{}",task.getTaskNo());
                        scheduler.rescheduleJob(task.getTaskNo() + "Trigger", Scheduler.DEFAULT_GROUP, taskTrigger);
                    } catch (SchedulerException e) {
                        logger.error("revieved task[{},{}] reschedule error:{}", task.getTaskNo(), task.getSchedulerRule(), e.getMessage());
                    }
                }
                jobNameMap.remove(task.getTaskNo() + "Job");
            }
        }

        if (jobNameMap != null) {
            logger.info("=====================================");
            logger.info("Jobs need to be removed:" + Arrays.toString(jobNameMap.keySet().toArray()));
            logger.info("=====================================");
            for (String jobName : jobNameMap.keySet()) {
                try {
                    scheduler.deleteJob(jobName, jobNameMap.get(jobName));
                } catch (SchedulerException e) {
                    logger.error("Error occured when deleting Job[{}] with Exception:{}", jobName, e.getMessage());
                }
            }
        }
        logger.info("end to assemble task from db.");
    }

    private Map<String, String> getAllJobNames() {
        Map<String, String> jobNameMap = new HashMap<String, String>();
        try {
            String[] groups = scheduler.getJobGroupNames();
            for (String group : groups) {
                String[] jobs = scheduler.getJobNames(group);
                if (jobs != null) {
                    for (String job : jobs) {
                        jobNameMap.put(job, group);
                    }
                }
            }
        } catch (SchedulerException e1) {
            logger.error("Failed in geting all job names with exception:{}", e1.getMessage());
        }
        return jobNameMap;
    }

}

定时任务表,执行的时候以表里面的数据为准,方便编辑。

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for `zx_task_informations`
-- ----------------------------
DROP TABLE IF EXISTS `zx_task_informations`;
CREATE TABLE `zx_task_informations` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `version` int(11) NOT NULL COMMENT ‘版本号:需要乐观锁控制‘,
  `taskNo` varchar(64) NOT NULL COMMENT ‘任务编号‘,
  `taskName` varchar(64) NOT NULL COMMENT ‘任务名称‘,
  `schedulerRule` varchar(64) NOT NULL COMMENT ‘定时规则表达式‘,
  `frozenStatus` varchar(16) NOT NULL COMMENT ‘冻结状态‘,
  `executorNo` varchar(128) NOT NULL COMMENT ‘执行方‘,
  `timeKey` varchar(32) NOT NULL COMMENT ‘执行时间格式‘,
  `frozenTime` bigint(13) DEFAULT NULL COMMENT ‘冻结时间‘,
  `unfrozenTime` bigint(13) DEFAULT NULL COMMENT ‘解冻时间‘,
  `createTime` bigint(13) NOT NULL COMMENT ‘创建时间‘,
  `lastModifyTime` bigint(13) DEFAULT NULL COMMENT ‘最近修改时间‘,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8 COMMENT=‘定时任务信息表‘;

-- ----------------------------
-- Records of zx_task_informations
-- ----------------------------
INSERT INTO `zx_task_informations` VALUES (‘1‘, ‘0‘, ‘resendSmsAndEmail‘, ‘重发短信和邮件‘, ‘10 */10 * * * ?‘, ‘FROZEN‘, ‘0‘, ‘yyyy-MM-dd HH:mm‘, ‘0‘, ‘0‘, ‘0‘, ‘1486807296009‘);

这就是我们第一代定时任务系统,达到了定期执行定时任务的效果,但是同样有两个缺点:

  • 1、定时调度和业务代码耦合在一起
  • 2、每次调整定时任务的时间需要重启服务

第一代定时任务系统上线用了大概半年之后,就被我们厌倦了。于是就规划了第二代定时任务系统。

第二代定时任务系统

第二代调度系统主要解决的是,避免每次修改定时任务的执行时间都需要重新启动整个项目。另外也可支持单独重新调度单个定时任务。

我们做了一个请求入口,当更新了库表里面的数据之后,重新请求一下特定的url就会自动重新加载定时任务。

使用scheduler删除定时任务

public void reScheduler() throws Exception {
    // 取消现有的任务
    String[] jobNames = quartzUtil.getJobNames();
    if (null != jobNames && jobNames.length > 0) {
        for (String jobName : jobNames) {
            logger.info("----开始移除任务:" + jobName);
            quartzUtil.cancelJob(jobName);
            logger.info("----成功移除任务:" + jobName);
        }
    }
    logger.info("现有任务已全部取消");
    this.initScheduler();
}
public void cancelJob(String jobName) throws Exception {
    scheduler.pauseTrigger(jobName, Scheduler.DEFAULT_GROUP);
    scheduler.unscheduleJob(jobName, Scheduler.DEFAULT_GROUP);
    scheduler.deleteJob(jobName, Scheduler.DEFAULT_GROUP);
}

使用scheduler重新加载所有的定时任务。

job.setCronExpression(taskInfo.getSchedulerRule());
String jobName = taskInfo.getTaskNo() + "Job";
job.getJobDataMap().put(QuartzJob.OBJECT_ID,objectMethod);
job.setJobName(jobName);
logger.info("----开始部署任务:" + jobName);
quartzUtil.scheduleCronJob(job);
logger.info("----成功部署任务:" + jobName);
public void scheduleCronJob(QuartzJobEntity jobEntity) throws Exception {
    JobDetailBean jobDetail = createJobDetail(jobEntity);
    scheduler.addJob(jobDetail, true);
    CronTriggerBean trigger = new CronTriggerBean();
    trigger.setCronExpression(jobEntity.getCronExpression());
    trigger.setJobDetail(jobDetail);
    trigger.setName(jobEntity.getJobName());
    trigger.setJobName(jobDetail.getName());
    scheduler.scheduleJob(trigger);
}

如果只是重新调度某一个定时任务可以触发单独的调用

// 初始化某个加载定时任务
public void initScheduler(TaskEntity taskInfo) throws Exception {
    // 设置任务信息到quartz,并调度任务
    QuartzJobEntity job = new QuartzJobEntity();
    String objectName = taskInfo.getTaskNo()+"Task";
    String objectMethod = "executeTask";
    job.getJobDataMap().put(QuartzJob.OBJECT_NAME,objectName);
    job.getJobDataMap().put(QuartzJob.OBJECT_METHOD,objectMethod);
    // 单线程方式执行任务
    job.setJobClass(QuartzJob.class);
    job.setCronExpression(taskInfo.getSchedulerRule());
    String jobName = taskInfo.getTaskNo() + "Job";
    job.getJobDataMap().put(QuartzJob.OBJECT_ID,objectMethod);
    job.setJobName(jobName);
    logger.info("----开始部署任务:" + jobName);
    quartzUtil.scheduleCronJob(job);
    logger.info("----成功部署任务:" + jobName);
}

这样我们的第二代定时任务系统就完成了,第二代定时任务是在第一代定时任务的基础上改造的,增加了重新调度所有定时任务和单个定时任务。

第二代定时任务系统的缺点是:定时调度和业务代码耦合

第三代定时任务系统

第二代定时任务上线没有多久,我们就意识到有很多的子系统也需要定时任务,比如订单系统需要45分钟不支付的订单失效,监控系统需要定时扫描是否有业务报警,统计系统需要定时去统计一些数据,但是如果我们给每一个子系统都做一个定时任务的话,就不太合理,很分散。

于是计划开发一个统一的定时任务调度中心,负责整个平台中所有的定时任务的调度,另外规划了监控系统,来监控和分析每次定时任务的执行结果和执行时间等信息。为了更好的管理定时任务开发了简单的管理界面。如下:

根据上图可以看出,通过这个管理界面我们可以非常方便的去修改、启动、暂停定时任务。别的系统如果需要定时任务,可以随时在页面去添加,全部界面化操作,不需要重新启动项目等。

点击详情可以清晰的查看定时任务的上次执行情况

定时任务的支持的调度方式分有两种:http和mq,我们一般建议使用mq。

  • http :使用http一般适用于用时特别少的定时任务。或者接收请求之后立刻返回结果,重新启动另外一个线程去执行具体的业务,业务执行完成之后在通过http回调返回执行结果。
  • mq :使用mq的话,调度系统和业务系统的交互就完全异步来执行,调度系统定时触发后,发送MQ消息给业务系统,业务系统接收到消息开始执行业务,执行完毕之后,再发送MQ系统通知调度系统的执行结果。

主要核心代码

初始化加载

public void initScheduler(){
    List<TaskInformationsEntity> taskList = taskInformationsDao.getTaskList();
    Scheduler scheduler = schedulerBean.getScheduler();
    for(TaskInformationsEntity task : taskList){
        try {
            this.scheduler(task, scheduler);
        } catch (Exception e) {
            logger.error("定时:" + task.getTaskNo() + "启动失败");
        }
    }
}

遍历调度

public void scheduler(TaskInformationsEntity task,Scheduler scheduler){
    TriggerKey triggerKey = TriggerKey.triggerKey(task.getTaskNo(), Scheduler.DEFAULT_GROUP);
    JobDetail jobDetail = JobBuilder.newJob(QuartzJobFactory.class).withDescription(task.getTaskName()).withIdentity(task.getTaskNo(), Scheduler.DEFAULT_GROUP).build();
    jobDetail.getJobDataMap().put("targetObjectId", task.getTaskNo());
    jobDetail.getJobDataMap().put("executorNo", task.getExecutorNo());
    jobDetail.getJobDataMap().put("sendType", task.getSendType());
    jobDetail.getJobDataMap().put("url", task.getUrl());
    jobDetail.getJobDataMap().put("executeParamter", task.getExecuteParamter());
    CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getSchedulerRule());
    CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
    try {
        scheduler.scheduleJob(jobDetail, trigger);
        logger.info("task "+task.getTaskNo()+" schedulerRule :"+task.getSchedulerRule()+" reload succeed");
    } catch (Exception e) {
        logger.error("scheduler--异常:",e);
        throw new RuntimeException();
    }
}

添加定时任务

public String addScheduler(String key){
    TaskInformationsEntity entity = taskInformationsDao.getTaskByTaskNo(key);
    if(null != entity){
        Scheduler scheduler = schedulerBean.getScheduler();
        try {
            scheduler.deleteJob(new JobKey(key));
            this.scheduler(entity, scheduler);
            entity.setFrozenStatus(TaskStatusEnum.UNFROZEN);
            entity.setUnfrozenTime(DateUtil.getLastModifyTime());
            entity.setLastModifyTime(DateUtil.getLastModifyTime());
            taskInformationsDao.updateById(entity);
            return "任务启动成功";
        } catch (Exception e) {
            logger.info("异常:",e);
            return "任务启动失败";
        }
    }else{
        return "该任务编号不存在";
    }
}

删除定时任务

public String delScheduler(String key){
    TaskInformationsEntity entity = taskInformationsDao.getTaskByTaskNo(key);
    if(null != entity && TaskStatusEnum.UNFROZEN == entity.getFrozenStatus()){
        Scheduler scheduler = schedulerBean.getScheduler();
        try {
            scheduler.deleteJob(new JobKey(key));
            entity.setFrozenStatus(TaskStatusEnum.FROZEN);
            entity.setFrozenTime(DateUtil.getLastModifyTime());
            entity.setLastModifyTime(DateUtil.getLastModifyTime());
            taskInformationsDao.updateById(entity);
            return "暂停任务成功";
        } catch (Exception e) {
            logger.error("异常:",e);
            return "暂停任务异常";
        }
    }else{
        return "该任务编号不存在";
    }
}

重新加载定时任务

public String resumeScheduler(String key){
    TaskInformationsEntity entity = taskInformationsDao.getTaskByTaskNo(key);
    if(null != entity){
        Scheduler scheduler = schedulerBean.getScheduler();
        try {
            scheduler.deleteJob(new JobKey(key));
            this.scheduler(entity, scheduler);
            return "重启成功";
        } catch (SchedulerException e) {
            logger.info("异常:",e);
            return "重启异常";
        }
    }else{
        return "该任务编号不存在";
    }
}

项目已经开源,详细的代码请在github上面查看。

zx-quartz

其实最后这版定时调度系统,还是有很多的缺陷,http模式没有进行完善,开源的代码中有部分内部依赖的jar还没有去掉。开放出来仅仅做为交流使用,后期有时间的话再去慢慢完善。也欢迎各网友多提提建议,一起加入完善。

原文:http://www.cnblogs.com/ityouknow/p/7131520.html

时间: 2024-10-12 11:18:38

定时任务系统的相关文章

Linux操作系统定时任务系统

Linux操作系统定时任务系统 Cron 入门 cron是一个linux下的定时执行工具,可以在无需人工干预的情况下运行作业.由于Cron 是Linux的内置服务,但它不自动起来,可以用以下的方法启动.关闭这个服务: /sbin/service crond start //启动服务 /sbin/service crond stop //关闭服务 /sbin/service crond restart //重启服务 /sbin/service crond reload //重新载入配置 你也可以将

Linux操作系统定时任务系统 Cron 入门

cron是一个linux下的定时执行工具,可以在无需人工干预的情况下运行作业.由于Cron 是Linux的内置服务,但它不自动起来,可以用以下的方法启动.关闭这个服务: /sbin/service crond start //启动服务 /sbin/service crond stop //关闭服务 /sbin/service crond restart //重启服务 /sbin/service crond reload //重新载入配置 你也可以将这个服务在系统启动的时候自动启动: 在/etc/

Linux操作系统定时任务系统Cron入门、PHP计划任务以及rpc示例

一.简单介绍 1.cron是一个linux下的定时执行工具,可以在无需人工干预的情况下运行作业.由于Cron 是Linux的内置服务,但它不自动起来,可以用以下的方法启动.关闭这个服务: service cron start //启动服务 service cron stop //关闭服务 service cron restart //重启服务 service cron reload //重新载入配置 2.直接用crontab命令编辑  cron服务提供crontab命令来设定cron服务的,以下

Quartz.net 定时任务系统

平台基于quartz.net进行任务调度功能开发 采用C#代码编写 支持corn表达式和第三方自定义的corn表达式扩展! 任务调用方式:采取 webpai 调用,便于第三方人员添加任务api 信息

结合Django+celery二次开发定时周期任务

需求: 前端时间由于开发新上线一大批系统,上完之后没有配套的报表系统.监控,于是乎开发.测试.产品.运营.业务部.财务等等各个部门就跟那饥渴的饿狼一样需要 各种各样的系统数据满足他们.刚开始一天一个还能满足他们,优化脚本之后只要开发提供查询数据的SQL.收件人.执行时间等等参数就可以几分钟写完一个定时任务脚本 ,到后面不知道是不是吃药了一天三四个定时任务,不到半个月手里一下就20多个定时任务了,渐渐感到力不从心了,而且天天还要给他们修改定时任务的SQL.收件人.执 行时间等等,天天写定时任务脚本

Linux 的计划任务(运维基础|可用于提权)

Linux操作系统定时任务系统 Cron 入门 先写笔记: crontab -u //设定某个用户的cron服务,一般root用户在执行这个命令的时候需要此参数 crontab -l //列出某个用户cron服务的详细内容 crontab -r //删除没个用户的cron服务 crontab -e //编辑某个用户的cron服务 看存在的的计划启动, 可以直接编辑相应的启动项, 添加自己想要的命令 ,因为cron默认是ROOT权限 可以用这个提权 eg: rontab -l59 1 * * *

APScheduler 3.0.1浅析

简介 APScheduler是一个小巧而强大的Python类库,通过它你可以实现类似Unix系统cronjob类似的定时任务系统.使用之余,阅读一下源码,一方面有助于更好的使用它,另一方面,个人认为aps的架构设计质量很高,阅读它对于提升软件开发的sense很有帮助. 组成 APScheduler整个系统可以说由这五个概念组成: scheduler:控制器,可以看做整个系统的driver,外部世界通过它来实现任务(Job)的增删改查管理.根据IO模式的不同,aps提供了多种scheduler实现

[转]Python定时任务框架APScheduler

APScheduler是基于Quartz的 一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便.提供了基于日期.固定时间间隔以及crontab类型的任务,并且可以 持久化任务.基于这些功能,我们可以很方便的实现一个python定时任务系统,写python还是要比java舒服多了. 1. 安装 安装过程很简单,可以基于easy_install和源码. easy_install apscheduler 或者下载源码,运行命令: python setup.py install

10.app后端选择什么开发语言

在qq上,经常看到有创业团队的创始人一直都招不到技术人员,除了项目的因素外,很大的原因就是所需要掌握的开发语言偏门.通过阅读本文,详细了解选择开发语言的核心原则,使各位心里对开发语言的选择更加有数. 选择开发语言就一个核心原则:在合适的业务场景的情况下选择最热门的语言. (1)每种语言都有自己擅长的业务场景,根据业务场景来选择 例如,如果需要开发一个聊天服务器,选择了php来开发,那真的醉了.php这种脚本语言怎么适合聊天服务? 例如,如果是开发web网站,php就很合适,比起java,效率提升