Quartz集群实战及原理解析

选Quartz的团队基本上是冲着Quartz本身实现的集群去的, 不然JDK自带Timer就可以实现相同的功能, 而Timer存在的单点故障是生产环境上所不能容忍的。 在自己造个有负载均衡和支持集群(高可用、伸缩性)的调度框架又影响项目的进度, 所以大多数团队都直接使用了Quartz来作为调度框架。

一、 Quartz集群的架构图:

二、 Quartz集群配置:

	<!-- 调度任务 -->
	<bean id="jobDetail"
			class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
			<property name="jobClass" value="全类名" />
			<property name="durability" value="true"/>
			<property name="targetMethod" value="execute" />
			<property name="concurrent" value="true" /> -->
			<!-- <property name="shouldRecover" value="true" /> -->
	</bean>

	<!-- 调度工厂 -->
	<bean id="scheduler" lazy-init="false" autowire="no"
		class="org.springframework.scheduling.quartz.SchedulerFactoryBean">

		<!-- 注册JobDetails -->
		<property name="jobDetails">
			<list>
				<ref bean="jobDetail"/>
			</list>
		</property>

		<!--可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 -->
		<property name="overwriteExistingJobs" value="true"/>

		<!-- 属性 -->
		<property name="quartzProperties">
			<props>
				<!-- 集群要求必须使用持久化存储 -->
				<prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreCMT</prop>

				<prop key="org.quartz.scheduler.instanceName">EventScheduler</prop>
				<!-- 每个集群节点要有独立的instanceId -->
				<prop key="org.quartz.scheduler.instanceId">AUTO</prop>

				<!-- Configure ThreadPool -->
				<prop key="org.quartz.threadPool.class">org.quartz.simpl.SimpleThreadPool</prop>
				<prop key="org.quartz.threadPool.threadCount">50</prop>
				<prop key="org.quartz.threadPool.threadPriority">5</prop>
				<prop key="org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread">true</prop>
				<!-- Configure JobStore -->
				<prop key="org.quartz.jobStore.misfireThreshold">60000</prop>
				<prop key="org.quartz.jobStore.driverDelegateClass">org.quartz.impl.jdbcjobstore.StdJDBCDelegate</prop>
				<prop key="org.quartz.jobStore.tablePrefix">SCHEDULER_</prop>
				<prop key="org.quartz.jobStore.maxMisfiresToHandleAtATime">10</prop>
				<!-- 开启集群 -->
				<prop key="org.quartz.jobStore.isClustered">true</prop>
				<prop key="org.quartz.jobStore.clusterCheckinInterval">20000</prop>
				<prop key="org.quartz.jobStore.dontSetAutoCommitFalse">true</prop>
				<prop key="org.quartz.jobStore.txIsolationLevelSerializable">false</prop>
				<prop key="org.quartz.jobStore.dataSource">myDS</prop>
				<prop key="org.quartz.jobStore.nonManagedTXDataSource">myDS</prop>
				<prop key="org.quartz.jobStore.useProperties">false</prop>
				<!-- Configure Datasources  -->
				<prop key="org.quartz.dataSource.myDS.driver">com.mysql.jdbc.Driver</prop>
				<prop key="org.quartz.dataSource.myDS.URL">${db.url}</prop>
				<prop key="org.quartz.dataSource.myDS.user">${db.username}</prop>
				<prop key="org.quartz.dataSource.myDS.password">${db.password}</prop>
				<prop key="org.quartz.dataSource.myDS.maxConnections">10</prop>
				<prop key="org.quartz.dataSource.myDS.validationQuery">select 0 from dual</prop>
			</props>
		</property>
		<property name="applicationContextSchedulerContextKey" value="applicationContext" />
	</bean>

三、 集群源码分析

Quartz如何保证多个节点的应用只进行一次调度(即某一时刻的调度任务只由其中一台服务器执行)?

正如上面架构图所示, Quartz的集群是在同一个数据库下, 由数据库的数据来确定调度任务是否正在执行, 正在执行则其他服务器就不能去执行该行调度数据。 这个跟很多项目是用Zookeeper做集群不一样, 这些项目是靠Zookeeper选举出来的的服务器去执行, 可以理解为Quartz靠数据库选举一个服务器来执行。

如果之前看过这篇Quartz按时启动原理就应该了解到Quartz最主要的一个类QuartzSchedulerThread职责是触发任务, 是一个不断运行的Quartz主线程, 还是从这里入手了解集群原理。

集群配置里面有一个配置项:

<prop key="org.quartz.jobStore.class">org.quartz.impl.jdbcjobstore.JobStoreCMT</prop>

源码可以看到JobStoreCMT extends JobStoreSupport, 在QuartzSchedulerThread的run方法里面调用的acquireNextTriggers、 triggersFired、 releaseAcquiredTrigger方法都进行了加锁处理。

以acquireNextTriggers为例:

而LOCK_TRIGGER_ACCESS其实就是一个JAVA常量

protected static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";

这个常量传入加锁的核心方法executeInNonManagedTXLock: 处理逻辑前获取锁, 处理完成后在finally里面释放锁(一种典型的同步处理方法)

 protected <T> T executeInNonManagedTXLock(
            String lockName,
            TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
        boolean transOwner = false;
        Connection conn = null;
        try {
            if (lockName != null) {
                // If we aren't using db locks, then delay getting DB connection
                // until after acquiring the lock since it isn't needed.
                if (getLockHandler().requiresConnection()) {
                    conn = getNonManagedTXConnection();
                }
                // 获取锁
                transOwner = getLockHandler().obtainLock(conn, lockName);
            }

            if (conn == null) {
                conn = getNonManagedTXConnection();
            }

            final T result = txCallback.execute(conn);
            try {
                commitConnection(conn);
            } catch (JobPersistenceException e) {
                rollbackConnection(conn);
                if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                    @Override
                    public Boolean execute(Connection conn) throws JobPersistenceException {
                        return txValidator.validate(conn, result);
                    }
                })) {
                    throw e;
                }
            }

            Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
            if(sigTime != null && sigTime >= 0) {
                signalSchedulingChangeImmediately(sigTime);
            }

            return result;
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            throw e;
        } catch (RuntimeException e) {
            rollbackConnection(conn);
            throw new JobPersistenceException("Unexpected runtime exception: "
                    + e.getMessage(), e);
        } finally {
            try {
			    // 释放锁
                releaseLock(lockName, transOwner);
            } finally {
                cleanupConnection(conn);
            }
        }
    }

getLockHandler那么可以思考下这个LockHandler怎么来的?

最后发现在JobStoreSupport的initail方法赋值了:

public void initialize(ClassLoadHelper loadHelper,
            SchedulerSignaler signaler) throws SchedulerConfigException {

        ...

        // If the user hasn't specified an explicit lock handler, then
        // choose one based on CMT/Clustered/UseDBLocks.
        if (getLockHandler() == null) {

            // If the user hasn't specified an explicit lock handler,
            // then we *must* use DB locks with clustering
            if (isClustered()) {
                setUseDBLocks(true);
            }

            if (getUseDBLocks()) {
                ...
				// 在初始化方法里面赋值了
                setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
            } else {
                getLog().info(
                    "Using thread monitor-based data access locking (synchronization).");
                setLockHandler(new SimpleSemaphore());
            }
        }

    }

可以在StdRowLockSemaphore里面看到:

public static final String SELECT_FOR_LOCK = "SELECT * FROM "
            + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
            + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

    public static final String INSERT_LOCK = "INSERT INTO "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES ("
        + SCHED_NAME_SUBST + ", ?)";

可以看出采用了悲观锁的方式对triggers表进行行加锁, 以保证任务同步的正确性。

当线程使用上述的SQL对表中的数据执行操作时,数据库对该行进行行加锁; 于此同时, 另一个线程对该行数据执行操作前需要获取锁, 而此时已被占用, 那么这个线程就只能等待, 直到该行锁被释放。

Quartz的锁存放在:

CREATE TABLE `scheduler_locks` (
  `SCHED_NAME` varchar(120) NOT NULL COMMENT '调度名',
  `LOCK_NAME` varchar(40) NOT NULL COMMENT '锁名',
  PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

锁名和上述常量一一对应:

有可能你的任务不能支持并发执行(因为有可能任务还没执行完, 下一轮就trigger了, 如果没做同步处理可能造成严重的数据问题), 那么在任务类加上注解:

@DisallowConcurrentExecution

设置@DisallowConcurrentExecution以后程序会等任务执行完毕以后再去执行

四、 参考资料

Quartz官网: http://quartz-scheduler.org/documentation/quartz-2.x/tutorials/tutorial-lesson-11

时间: 2024-08-14 18:00:20

Quartz集群实战及原理解析的相关文章

HA 高可用集群概述及其原理解析

HA 高可用集群概述及其原理解析 1. 概述 1)所谓HA(High Available),即高可用(7*24小时不中断服务). 2)实现高可用最关键的策略是消除单点故障.HA严格来说应该分成各个组件的HA机制:HDFS 的HA和YARN的HA. 3)Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF). 4)NameNode主要在以下两个方面影响HDFS集群: ? NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启 ? NameNode机器需要升级

Quartz集群原理及配置应用

1.Quartz任务调度的基本实现原理 Quartz是OpenSymphony开源组织在任务调度领域的一个开源项目,完全基于Java实现.作为一个优秀的开源调度框架,Quartz具有以下特点: (1)强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求: (2)灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式: (3)分布式和集群能力,Terracotta收购后在原来功能基础上作了进一步提升.本文将对该部分相加阐述. 1.1 Quartz 核心元素

spring boot + quartz 集群

spring boot bean配置: @Configuration public class QuartzConfig { @Value("${quartz.scheduler.instanceName}") private String quartzInstanceName; @Value("${org.quartz.dataSource.myDS.driver}") private String myDSDriver; @Value("${org.q

原!总结 quartz集群 定时任务 测试运行ok

由于项目优化重构,想将定时任务从quartz单机模式变成集群或分布式的方式.于是,百度了一圈....修修改改...用集群的方式部署定时任务,测试可以... 集群?分布式?什么区别? 集群:同一个业务,部署在多个服务器上 分布式:一个业务分拆多个子业务,部署在不同的服务器上 或者说 集群:是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务. 分布式:是指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务. 平时常用的quartz单机模

MySQL DBA及Linux企业集群实战工程师

MySQL DBA及Linux企业集群实战工程师 2015,来一场随时随地的学习之旅 开启我赢职场MySQL学习之旅 不能错过的我赢之旅 任性就是想问就问 谁是你的群聊小伙伴 学习点滴我主宰 名师在线答与问 职业入门--数据库基础知识及安装MySQL MySQL课程介绍 讲师访谈 深入了解什么是数据库 MySQL从万千数据库中脱颖而出 选择学习哪个版本的MySQL 搭建学习MySQL的实验环境 提前熟悉一下MySQL环境 Linux下基于官方YUM源安装MySQL Linux下基于官方源码包包安

quartz集群调度机制调研及源码分析---转载

quartz2.2.1集群调度机制调研及源码分析引言quartz集群架构调度器实例化调度过程触发器的获取触发trigger:Job执行过程:总结:附: 引言 quratz是目前最为成熟,使用最广泛的java任务调度框架,功能强大配置灵活.在企业应用中占重要地位.quratz在集群环境中的使用方式是每个企业级系统都要考虑的问题.早在2006年,在ITeye上就有一篇关于quratz集群方案的讨论:http://www.iteye.com/topic/40970 ITeye创始人@Robbin在8楼

(4) Spring中定时任务Quartz集群配置学习

原 来配置的Quartz是通过spring配置文件生效的,发现在非集群式的服务器上运行良好,但是将工程部署到水平集群服务器上去后改定时功能不能正常运 行,没有任何错误日志,于是从jar包.JDK版本.cronExpression到服务器类型,甚至查到了服务器操作系统的类型,都没能找到解决的办 法,后来才知道是集群惹的祸! 详细步骤如下: 1. 按照Quartz集群工作原理 图:表示了每个节点直接与数据库通信,若离开数据库将对其他节点一无所知 在数据库中建表.建表模版在Quartz包下docs/d

Oracle 集群】ORACLE DATABASE 11G RAC 知识图文详细教程之ORACLE集群概念和原理(二)

ORACLE集群概念和原理(二) 概述:写下本文档的初衷和动力,来源于上篇的<oracle基本操作手册>.oracle基本操作手册是作者研一假期对oracle基础知识学习的汇总.然后形成体系的总结,一则进行回顾复习,另则便于查询使用.本图文文档亦源于此.阅读Oracle RAC安装与使用教程前,笔者先对这篇文章整体构思和形成进行梳理.由于阅读者知识储备层次不同,我将从Oracle RAC安装前的准备与规划开始进行整体介绍安装部署Oracle RAC.始于唐博士指导,对数据库集群进行配置安装,前

(1)quartz集群调度机制调研及源码分析---转载

quartz2.2.1集群调度机制调研及源码分析 原文地址:http://demo.netfoucs.com/gklifg/article/details/27090179 引言quartz集群架构调度器实例化调度过程触发器的获取触发trigger:Job执行过程:总结:附: 引言 quratz是目前最为成熟,使用最广泛的java任务调度框架,功能强大配置灵活.在企业应用中占重要地位.quratz在集群环境中的使用方式是每个企业级系统都要考虑的问题.早在2006年,在ITeye上就有一篇关于qu