quatz集群化定时任务使用简介

概要

主要介绍quartz如何使用,原理请阅读下面的参考文献,整理的已经很详细。使用quartz集群的主要目的是避免单点job故障,重要job故障后自动重新执行。

使用指南
  • 加入quartz jar 包依赖
<dependency>
    <groupId>com.opensymphony.quartz</groupId>
    <artifactId>com.springsource.org.quartz</artifactId>
    <version>1.6.2</version>
</dependency>
  • 集成spring配置 ( spring 为 3.2.x)
<bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <property name="configLocation" value="classpath:quartz.properties"/>
    <!-- 设置自动启动 -->
    <property name="autoStartup" value="true"/>
    <!-- This name is persisted as SCHED_NAME in db. for local testing could change to unique name to avoid collision with dev server -->
    <property name="schedulerName" value="quartzScheduler"/>
    <!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
    <property name="overwriteExistingJobs" value="true"/>
    <!--必须的,QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动-->
    <property name="startupDelay" value="30"/>
    <property name="applicationContextSchedulerContextKey" value="applicationContext"/>
    <!-- 注册触发器 -->
    <property name="triggers">
        <list>
            <ref local="demoJobTrigger"/>
        </list>
    </property>
</bean>
<bean id="demoJobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
    <property name="name" value="demoJobDetailName"/>
    <property name="group" value="demoJobDetailGroup"/>
    <property name="jobClass" value="org.ACRusher.DemoJobClass"/>
    <!-- requestsRecovery属性为true,则当Quartz服务被中止后,再次启动任务时会尝试恢复执行之前未完成的所有任务-->
    <property name="requestsRecovery" value="false"/>
    <!-- 标识job是持久的,删除所有触发器的时候不被删除 -->
    <property name="durability" value="true"/>
    <!--是否是暂存的,若是 ,re-starting quartz 后不存在-->
    <property name="volatility" value="false"/>
</bean>
<bean id="demoJobTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    <property name="name" value="demoJobTriggerName"/>
    <property name="group" value="demoJobTriggerGroup"/>
    <property name="jobDetail" ref="demoJobDetail"/>
    <property name="cronExpression" value="0 0 1 * * ? *"/>
</bean>
  • 配置 quartz.properties
# Using Spring datasource in quartzJobsConfig.xml
# Spring uses LocalDataSourceJobStore extension of JobStoreCMT
# org.quartz.jobStore.useProperties=true
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
# 10 seconds
org.quartz.jobStore.clusterCheckinInterval=10000
org.quartz.scheduler.skipUpdateCheck=true

# Change this to match your DB vendor
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# Needed to manage cluster instances
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.instanceName=DEMO_JOB_SCHEDULER

org.quartz.scheduler.rmi.export=false
org.quartz.scheduler.rmi.proxy=false
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=10
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true

org.quartz.jobStore.dataSource=myDS
org.quartz.dataSource.myDS.driver=com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL=${org.quartz.dataSource.myDS.URL}
org.quartz.dataSource.myDS.user=${org.quartz.dataSource.myDS.user}
org.quartz.dataSource.myDS.password=${org.quartz.dataSource.myDS.password}
org.quartz.dataSource.myDS.maxConnections=5
org.quartz.jobStore.misfireThreshold=120000
  • 编写 DemoJobClass
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * cluster job 抽象类
 * <p/>
 * Created by ACRusher on 2016/4/7.
 */
public abstract class AbstractClusterJob extends QuartzJobBean implements Serializable {
    protected static final Logger logger = LoggerFactory.getLogger("quartzJob");

    private static final long serialVersionUID = 3158445449885064827L;

    protected ApplicationContext applicationContext;

	//由父类自动注入。 注意,QuartzJobBean 只会注入spring的 context。
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        long start = System.currentTimeMillis();
        logger.warn(">>> Cluster Job Frame | jobClass:{} started.",getClass().getSimpleName());
        try {
            beforeJob();
            run(context);
        } catch (Exception e) {
            logger.error("jobClass:{} | jobContext:{} ", new Object[]{getClass().getSimpleName(), context, e});
        } finally {
            logger.warn(">>> Cluster Job Frame | jobClass:{} finished. costTime:{}ms ", getClass().getSimpleName(), System.currentTimeMillis() - start);
        }
    }

    //job业务逻辑
    protected abstract void run(JobExecutionContext context);

    //job执行前注入必要的bean
    protected void beforeJob() throws InvocationTargetException, IllegalAccessException {
        //init injection , etc
        Method[] methods=getClass().getDeclaredMethods();
        for(Method m : methods){
            m.setAccessible(true);
            if(m.getName().startsWith("set")){
                String name=m.getName().substring(3);
                if(name.length()>1){
                    name=Character.toLowerCase(name.charAt(0))+name.substring(1);
                }else {
                    name=String.valueOf(Character.toLowerCase(name.charAt(0)));
                }
                try {
                    Object bean = applicationContext.getBean(name);
                    if (bean != null && !bean.equals("applicationContext")) {
                        m.invoke(this, bean);
                    }
                }catch (Exception e){
                    logger.error("",e);
                }
            }
        }
    }
}
  • 编写具体业务Job
import org.quartz.JobExecutionContext;

/**
 * Created by ACRusher on 2016/4/7.
 */
public class DemoJobClass extends AbstractClusterJob {
    private static final long serialVersionUID = 6945414743611829056L;

    private UserManager userManager;

    @Override
    protected void run(JobExecutionContext context) {
        // some biz code...
    }

    //由父类自动注入
    public void setUserManager(UserManager userManager) {
        this.userManager = userManager;
    }
}
  • 在mysql中生成quartz需要使用的表
CREATE TABLE `QRTZ_JOB_DETAILS` (`JOB_NAME` VARCHAR(200) NOT NULL,`JOB_GROUP` VARCHAR(200) NOT NULL,`DESCRIPTION` VARCHAR(250) DEFAULT NULL,`JOB_CLASS_NAME` VARCHAR(250) NOT NULL,`IS_DURABLE` VARCHAR(1) NOT NULL,`IS_VOLATILE` VARCHAR(1) NOT NULL,`IS_STATEFUL` VARCHAR(1) NOT NULL,`REQUESTS_RECOVERY` VARCHAR(1) NOT NULL,`JOB_DATA` BLOB, PRIMARY KEY (`JOB_NAME`,`JOB_GROUP`) ) ENGINE=INNODB DEFAULT CHARSET=utf8;
 
CREATE TABLE `QRTZ_JOB_LISTENERS` ( `JOB_NAME` varchar(200) NOT NULL,`JOB_GROUP` varchar(200) NOT NULL,`JOB_LISTENER` varchar(200) NOT NULL,PRIMARY KEY (`JOB_NAME`,`JOB_GROUP`,`JOB_LISTENER`),KEY `JOB_NAME` (`JOB_NAME`,`JOB_GROUP`),CONSTRAINT `QRTZ_JOB_LISTENERS_ibfk_1` FOREIGN KEY (`JOB_NAME`, `JOB_GROUP`) REFERENCES `QRTZ_JOB_DETAILS` (`JOB_NAME`, `JOB_GROUP`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_LOCKS` (`LOCK_NAME` varchar(40) NOT NULL,PRIMARY KEY (`LOCK_NAME`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_PAUSED_TRIGGER_GRPS` (`TRIGGER_GROUP` varchar(200) NOT NULL, PRIMARY KEY (`TRIGGER_GROUP`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_SCHEDULER_STATE` (`INSTANCE_NAME` varchar(200) NOT NULL,`LAST_CHECKIN_TIME` bigint(13) NOT NULL,`CHECKIN_INTERVAL` bigint(13) NOT NULL,PRIMARY KEY (`INSTANCE_NAME`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_TRIGGERS` (`TRIGGER_NAME` varchar(200) NOT NULL,`TRIGGER_GROUP` varchar(200) NOT NULL,`JOB_NAME` varchar(200) NOT NULL,`JOB_GROUP` varchar(200) NOT NULL,`IS_VOLATILE` varchar(1) NOT NULL,`DESCRIPTION` varchar(250) DEFAULT NULL,`NEXT_FIRE_TIME` bigint(13) DEFAULT NULL,`PREV_FIRE_TIME` bigint(13) DEFAULT NULL,`PRIORITY` int(11) DEFAULT NULL,`TRIGGER_STATE` varchar(16) NOT NULL,`TRIGGER_TYPE` varchar(8) NOT NULL,`START_TIME` bigint(13) NOT NULL,`END_TIME` bigint(13) DEFAULT NULL,`CALENDAR_NAME` varchar(200) DEFAULT NULL,`MISFIRE_INSTR` smallint(2) DEFAULT NULL,`JOB_DATA` blob,PRIMARY KEY (`TRIGGER_NAME`,`TRIGGER_GROUP`),KEY `JOB_NAME` (`JOB_NAME`,`JOB_GROUP`),CONSTRAINT `QRTZ_TRIGGERS_ibfk_1` FOREIGN KEY (`JOB_NAME`, `JOB_GROUP`) REFERENCES `QRTZ_JOB_DETAILS` (`JOB_NAME`, `JOB_GROUP`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_TRIGGER_LISTENERS` (`TRIGGER_NAME` varchar(200) NOT NULL,`TRIGGER_GROUP` varchar(200) NOT NULL,`TRIGGER_LISTENER` varchar(200) NOT NULL,PRIMARY KEY (`TRIGGER_NAME`,`TRIGGER_GROUP`,`TRIGGER_LISTENER`),KEY `TRIGGER_NAME` (`TRIGGER_NAME`,`TRIGGER_GROUP`), CONSTRAINT `QRTZ_TRIGGER_LISTENERS_ibfk_1` FOREIGN KEY (`TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`TRIGGER_NAME`, `TRIGGER_GROUP`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_SIMPLE_TRIGGERS` (`TRIGGER_NAME` varchar(200) NOT NULL,`TRIGGER_GROUP` varchar(200) NOT NULL,`REPEAT_COUNT` bigint(7) NOT NULL,`REPEAT_INTERVAL` bigint(12) NOT NULL,`TIMES_TRIGGERED` bigint(10) NOT NULL,PRIMARY KEY (`TRIGGER_NAME`,`TRIGGER_GROUP`),KEY `TRIGGER_NAME` (`TRIGGER_NAME`,`TRIGGER_GROUP`),CONSTRAINT `QRTZ_SIMPLE_TRIGGERS_ibfk_1` FOREIGN KEY (`TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`TRIGGER_NAME`, `TRIGGER_GROUP`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_BLOB_TRIGGERS` (`TRIGGER_NAME` varchar(200) NOT NULL,`TRIGGER_GROUP` varchar(200) NOT NULL,`BLOB_DATA` blob,PRIMARY KEY (`TRIGGER_NAME`,`TRIGGER_GROUP`),KEY `TRIGGER_NAME` (`TRIGGER_NAME`,`TRIGGER_GROUP`), CONSTRAINT `QRTZ_BLOB_TRIGGERS_ibfk_1` FOREIGN KEY (`TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`TRIGGER_NAME`, `TRIGGER_GROUP`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_CALENDARS` (`CALENDAR_NAME` varchar(200) NOT NULL,`CALENDAR` blob NOT NULL, PRIMARY KEY (`CALENDAR_NAME`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `QRTZ_CRON_TRIGGERS` (`TRIGGER_NAME` varchar(200) NOT NULL,`TRIGGER_GROUP` varchar(200) NOT NULL,`CRON_EXPRESSION` varchar(120) NOT NULL,`TIME_ZONE_ID` varchar(80) DEFAULT NULL,PRIMARY KEY (`TRIGGER_NAME`,`TRIGGER_GROUP`),KEY `TRIGGER_NAME` (`TRIGGER_NAME`,`TRIGGER_GROUP`),CONSTRAINT `QRTZ_CRON_TRIGGERS_ibfk_1` FOREIGN KEY (`TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`TRIGGER_NAME`, `TRIGGER_GROUP`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
CREATE TABLE `QRTZ_FIRED_TRIGGERS` (`ENTRY_ID` varchar(95) NOT NULL,`TRIGGER_NAME` varchar(200) NOT NULL,`TRIGGER_GROUP` varchar(200) NOT NULL,`IS_VOLATILE` varchar(1) NOT NULL,`INSTANCE_NAME` varchar(200) NOT NULL,`FIRED_TIME` bigint(13) NOT NULL,`PRIORITY` int(11) NOT NULL,`STATE` varchar(16) NOT NULL,`JOB_NAME` varchar(200) DEFAULT NULL,`JOB_GROUP` varchar(200) DEFAULT NULL,`IS_STATEFUL` varchar(1) DEFAULT NULL,`REQUESTS_RECOVERY` varchar(1) DEFAULT NULL, PRIMARY KEY (`ENTRY_ID`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

 
INSERT INTO QRTZ_LOCKS VALUES ( ‘CALENDAR_ACCESS‘ )
INSERT INTO QRTZ_LOCKS VALUES ( ‘JOB_ACCESS‘ )
INSERT INTO QRTZ_LOCKS VALUES ( ‘MISFIRE_ACCESS‘ )
INSERT INTO QRTZ_LOCKS VALUES ( ‘STATE_ACCESS‘ )
INSERT INTO QRTZ_LOCKS VALUES ( ‘TRIGGER_ACCESS‘ )
 
总结

到此一切配置结束,剩下的就是具体的业务JobBean的开发了。当部署在多台机器上时,每个Job会动态分配在某一台机器执行。如果Job比较关键,在失败后需要重启,也可以配置

requestsRecovery 为 true来实现 任务重跑。 轻轻松松避免了单点故障和任务异常不可恢复问题。

参考文献

【1】 quartz集群原理  http://www.cnblogs.com/zhenyuyaodidiao/p/4755649.html

【2】 美团quartz集群实践 http://tech.meituan.com/mt-crm-quartz.html

【3】 quartz官网 http://www.quartz-scheduler.org/documentation/

时间: 2024-11-10 15:37:45

quatz集群化定时任务使用简介的相关文章

Rancher集群化docker管理平台部署、特性及破坏性测试。

rancher是一个docker集群化管理平台,相对于mesos和k8s架构,rancher的部署管理非常简单方便.并且功能丰富.如下为本人绘制的逻辑架构图. 1:部署Rancher管理平台 规划: server:10.64.5.184 agent1:10.64.5.185 agent2:10.64.5.186 agent3:10.64.5.187 agent4:10.64.5.188 部署方式: docker容器启动 server端部署   依赖镜像:rancher/server:latest

RabbitMQ集群化部署

压测环境上RabbitMQ主库采用三台集群化部署,部署在172.16.103.127, 172.16.103.138, 172.16.103.129三台机器上. 安装目录:/opt/rabbitmq/rabbitmq_3.6.2 集群化部署 1.设置hosts解析,所有节点配置相同 vi /etc/hosts 172.16.103.129 mq-n129172.16.103.128 mq-n128172.16.103.127 mq-n127 2.设置节点间认证的cookiescp /root/.

kairosdb+cassandra集群化安装

kairosdb (1)到/conf目录下,找到kairosdb.properties,修改datastore为cassandra (2)设置cassandra的连接方式 (3) 设置用户名密码 4. 启动:到/bin目录下,直接跑./kairosdb.sh start,最后会看到 KairosDB service started   这样一句话,就OK了 172.16.101.25:8080 kairosdb客户端 cassandra 修改cassandra配置文件 conf/cassandr

SpringCloud注册中心集群化及如何抗住大型系统的高并发访问

一.场景引入 本人所在的项目由于直接面向消费者,迭代周期迅速,所以服务端框架一直采用Springboot+dubbo的组合模式,每个服务由service模块+web模块构成,service模块通过公司API网关向安卓端暴 露restful接口,web模块通过dubbo服务向service模块获取数据渲染页面.测试环境dubbo的注册中心采用的单实例的zookeeper,随着时间的发现注册在zookeeper上的生产者和消费者越来越多,测试 人员经常在大规模的压测后发现zookeeper挂掉的现象

2020-04-05-SpringBoot+WebSocket基于Redis订阅发布实现集群化

SpringBoot+WebSocket基于Redis订阅发布实现集群化 前面讲了单机版的websocket如何使用发送群聊(2020-03-24-springboot快速集成websocket实现群聊),那么要是部署多个服务实现集群话怎么实现呢? 由于websocket是长连接,session保持在一个server中,所以在不同server在使用websocket推送消息时就需要获取对应的session进行推送,在分布式系统中就无法获取到所有session,这里就需要使用一个中间件将消息推送到

DB层面上的设计 分库分表 读写分离 集群化 负载均衡

第1章  引言 随着互联网应用的广泛普及,海量数据的存储和访问成为了系统设计的瓶颈问题.对于一个大型的 互联网应用,每天几十亿的PV无疑对数据库造成了相当高的负载.对于系统的稳定性和扩展性造成了极大的问题.通过数据切分来提高网站性能,横向扩展数据层 已经成为架构研发人员首选的方式.水平切分数据库,可以降低单台机器的负载,同时最大限度的降低了了宕机造成的损失.通过负载均衡策略,有效的降低了单台 机器的访问负载,降低了宕机的可能性:通过集群方案,解决了数据库宕机带来的单点数据库不能访问的问题:通过读

联想企业网盘:SaaS服务集群化持续交付实践

1      前言 当代信息技术飞速发展,软件和系统的代码规模都变得越来越大,而且组件众多,依赖繁复,每次新版本的发布都仿佛是乘坐一次无座的绿皮车长途夜行,疲惫不堪.软件交付是一个复杂的工程,涉及到软件开发的各个细节,其中任何一环出现问题,都会导致软件不能及时交付,或者交付的质量堪忧. 从企业的角度来讲,如何利用更科学的工具.更科学的流程来提高产品质量,提升客户满意度,是刚需.从员工角度来讲,生命里值得追求的事情很多,不能把宝贵的时间浪费在一些机械的.重复的事情上面. 联想企业网盘从2007开始

redis的集群化方案

关于 目前有三种 (1)Twitter开发的twemproxy (2)豌豆荚开发的codis (3)redis官方的redis-cluster Twemproxy 架构简单 就是用proxy对后端redis server进行代理 但是由于代理层的消耗性能很低 而且通常涉及多个key的操作都是不支持的 而且本身不支持动态扩容和透明的数据迁移 而且也失去维护 Twitter内部已经不使用了 redis-cluster是三个里性能最强大的 因为他使用去中心化的思想 使用hash slot方式 将163

ipvsadm集群化管理_学习笔记

ipvsadm:主要功能 管理集群服务:     添加 -A -t|u|f  service-address [-s(调度算法:默认wlc) scheduler]         -t tcp协议集群         -u udp协议集群             service-address:    VIP:Port         -f FirewallMark(防火墙标记:FWM)             service-address:    Mark-Number     修改  -E