Quartz任务调度源码分析

从源码分析中可以看出,任务的整个调度过程为,初始化线程池,及调度器QuartzScheduler,然后由线程池去执行QuartzSchedulerThread,将触发器任务(job与触发器)添加到存储器(TreeSet,timeTrriger)中,然后启动调度器,QuartzSchedulerThread从timeTrriger去除待触发的任务,并包装成TriggerFiredBundle,然后由JobRunShellFactory 
创建TriggerFiredBundle的执行线程JobRunShell, 调度执行通过线程池SimpleThreadPool去执行JobRunShell,而JobRunShell执行的就是job.execute(JobExecutionContext context)。Quartz主要中的集合类有ArrayList,LinkedList,HashMap,TreeSet(TreeMap);之所以用到上面四个集合类,主要用到集合的如下特点:ArrayList访问速度快,LinkedList添加删除元素快;HashMap添加删除快,TreeSet访问速度快。

触发任务创建工厂类

Java代码  下载

  1. public class JTAJobRunShellFactory
  2. implements JobRunShellFactory
  3. {
  4. public void initialize(Scheduler sched)
  5. throws SchedulerConfigException
  6. {
  7. scheduler = sched;
  8. }
  9. public JobRunShell createJobRunShell(TriggerFiredBundle bundle)
  10. throws SchedulerException
  11. {
  12. return new JTAJobRunShell(scheduler, bundle);
  13. }
  14. private Scheduler scheduler;
  15. }

//触发任务运行类

Java代码  下载

  1. public class JTAJobRunShell extends JobRunShell
  2. {
  3. public JTAJobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)
  4. {
  5. super(scheduler, bndle);
  6. transactionTimeout = null;
  7. }
  8. }
  9. public class JobRunShell extends SchedulerListenerSupport
  10. implements Runnable
  11. {
  12. public JobRunShell(Scheduler scheduler, TriggerFiredBundle bndle)
  13. {
  14. jec = null;
  15. qs = null;
  16. firedTriggerBundle = null;
  17. this.scheduler = null;
  18. shutdownRequested = false;
  19. this.scheduler = scheduler;
  20. firedTriggerBundle = bndle;
  21. }
  22. public void run()
  23. {
  24. //添加到内部监听器
  25. qs.addInternalSchedulerListener(this);
  26. label0:
  27. {
  28. //protected JobExecutionContextImpl jec,job执行上下文
  29. OperableTrigger trigger = (OperableTrigger)jec.getTrigger();
  30. JobDetail jobDetail = jec.getJobDetail();
  31. org.quartz.Trigger.CompletedExecutionInstruction instCode;
  32. do
  33. {
  34. JobExecutionException jobExEx = null;
  35. Job job = jec.getJobInstance();
  36. try
  37. {
  38. begin();
  39. }
  40. catch(SchedulerException se)
  41. {
  42. qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t begin execution.").toString(), se);
  43. break label0;
  44. }
  45. try
  46. {
  47. if(!notifyListenersBeginning(jec))
  48. break label0;
  49. }
  50. catch(VetoedException ve)
  51. {
  52. try
  53. {
  54. org.quartz.Trigger.CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
  55. qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);
  56. if(jec.getTrigger().getNextFireTime() == null)
  57. qs.notifySchedulerListenersFinalized(jec.getTrigger());
  58. complete(true);
  59. }
  60. catch(SchedulerException se)
  61. {
  62. qs.notifySchedulerListenersError((new StringBuilder()).append("Error during veto of Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t finalize execution.").toString(), se);
  63. }
  64. break label0;
  65. }
  66. long startTime = System.currentTimeMillis();
  67. long endTime = startTime;
  68. try
  69. {
  70. log.debug((new StringBuilder()).append("Calling execute on job ").append(jobDetail.getKey()).toString());
  71. //执行Job,关键
  72. job.execute(jec);
  73. endTime = System.currentTimeMillis();
  74. }
  75. catch(JobExecutionException jee)
  76. {
  77. endTime = System.currentTimeMillis();
  78. jobExEx = jee;
  79. getLog().info((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw a JobExecutionException: ").toString(), jobExEx);
  80. }
  81. catch(Throwable e)
  82. {
  83. endTime = System.currentTimeMillis();
  84. getLog().error((new StringBuilder()).append("Job ").append(jobDetail.getKey()).append(" threw an unhandled Exception: ").toString(), e);
  85. SchedulerException se = new SchedulerException("Job threw an unhandled exception.", e);
  86. qs.notifySchedulerListenersError((new StringBuilder()).append("Job (").append(jec.getJobDetail().getKey()).append(" threw an exception.").toString(), se);
  87. jobExEx = new JobExecutionException(se, false);
  88. }
  89. //设置jJobExecutionContext运行时间
  90. jec.setJobRunTime(endTime - startTime);
  91. if(!notifyJobListenersComplete(jec, jobExEx))
  92. break label0;
  93. instCode = org.quartz.Trigger.CompletedExecutionInstruction.NOOP;
  94. try
  95. {
  96. instCode = trigger.executionComplete(jec, jobExEx);
  97. }
  98. catch(Exception e)
  99. {
  100. SchedulerException se = new SchedulerException("Trigger threw an unhandled exception.", e);
  101. qs.notifySchedulerListenersError("Please report this error to the Quartz developers.", se);
  102. }
  103. if(!notifyTriggerListenersComplete(jec, instCode))
  104. break label0;
  105. if(instCode == org.quartz.Trigger.CompletedExecutionInstruction.RE_EXECUTE_JOB)
  106. {
  107. jec.incrementRefireCount();
  108. try
  109. {
  110. complete(false);
  111. }
  112. catch(SchedulerException se)
  113. {
  114. qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t finalize execution.").toString(), se);
  115. }
  116. continue;
  117. }
  118. try
  119. {
  120. complete(true);
  121. break;
  122. }
  123. catch(SchedulerException se)
  124. {
  125. qs.notifySchedulerListenersError((new StringBuilder()).append("Error executing Job (").append(jec.getJobDetail().getKey()).append(": couldn‘t finalize execution.").toString(), se);
  126. }
  127. } while(true);
  128. //通知job执行完成
  129. qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
  130. }
  131. qs.removeInternalSchedulerListener(this);
  132. break MISSING_BLOCK_LABEL_710;
  133. Exception exception;
  134. exception;
  135. qs.removeInternalSchedulerListener(this);
  136. throw exception;
  137. }
  138. protected JobExecutionContextImpl jec;//job执行上下文
  139. protected QuartzScheduler qs;
  140. protected TriggerFiredBundle firedTriggerBundle;
  141. protected Scheduler scheduler;
  142. protected volatile boolean shutdownRequested;
  143. private final Logger log = LoggerFactory.getLogger(getClass());
  144. }

//TriggerKey,JobKey包装类

Java代码  下载

  1. class TriggerWrapper
  2. {
  3. TriggerWrapper(OperableTrigger trigger)
  4. {
  5. state = 0;
  6. if(trigger == null)
  7. {
  8. throw new IllegalArgumentException("Trigger cannot be null!");
  9. } else
  10. {
  11. this.trigger = trigger;
  12. key = trigger.getKey();
  13. jobKey = trigger.getJobKey();
  14. return;
  15. }
  16. }
  17. public boolean equals(Object obj)
  18. {
  19. if(obj instanceof TriggerWrapper)
  20. {
  21. TriggerWrapper tw = (TriggerWrapper)obj;
  22. if(tw.key.equals(key))
  23. return true;
  24. }
  25. return false;
  26. }
  27. public int hashCode()
  28. {
  29. return key.hashCode();
  30. }
  31. public OperableTrigger getTrigger()
  32. {
  33. return trigger;
  34. }
  35. public final TriggerKey key;
  36. public final JobKey jobKey;
  37. public final OperableTrigger trigger;
  38. public int state;
  39. public static final int STATE_WAITING = 0;//等待
  40. public static final int STATE_ACQUIRED = 1;//就绪
  41. public static final int STATE_EXECUTING = 2;//执行
  42. public static final int STATE_COMPLETE = 3;//完成
  43. public static final int STATE_PAUSED = 4;//暂停
  44. public static final int STATE_BLOCKED = 5;//阻塞
  45. public static final int STATE_PAUSED_BLOCKED = 6;//暂停阻塞
  46. public static final int STATE_ERROR = 7;//错误
  47. }

//简单触发器

Java代码  下载

  1. public class SimpleTriggerImpl extends AbstractTrigger
  2. implements SimpleTrigger, CoreTrigger
  3. {
  4. //获取下一次触发时间
  5. public Date getNextFireTime()
  6. {
  7. return nextFireTime;
  8. }
  9. private Date startTime;
  10. private Date endTime;
  11. private Date nextFireTime;
  12. private Date previousFireTime;
  13. private int repeatCount;
  14. private long repeatInterval;
  15. private int timesTriggered;
  16. private boolean complete;
  17. }

//触发任务包装类

Java代码  下载

  1. public class TriggerFiredBundle
  2. implements Serializable
  3. {
  4. public TriggerFiredBundle(JobDetail job, OperableTrigger trigger, Calendar cal, boolean jobIsRecovering, Date fireTime, Date scheduledFireTime, Date prevFireTime,
  5. Date nextFireTime)
  6. {
  7. this.job = job;
  8. this.trigger = trigger;
  9. this.cal = cal;
  10. this.jobIsRecovering = jobIsRecovering;
  11. this.fireTime = fireTime;
  12. this.scheduledFireTime = scheduledFireTime;
  13. this.prevFireTime = prevFireTime;
  14. this.nextFireTime = nextFireTime;
  15. }
  16. private JobDetail job;
  17. private OperableTrigger trigger;
  18. private Calendar cal;
  19. private boolean jobIsRecovering;
  20. private Date fireTime;
  21. private Date scheduledFireTime;
  22. private Date prevFireTime;
  23. private Date nextFireTime;
  24. }

//触发任务包装结果类

Java代码  下载

  1. public class TriggerFiredResult
  2. {
  3. public TriggerFiredResult(TriggerFiredBundle triggerFiredBundle)
  4. {
  5. this.triggerFiredBundle = triggerFiredBundle;
  6. }
  7. private TriggerFiredBundle triggerFiredBundle;
  8. private Exception exception;
  9. }
时间: 2024-10-03 21:47:01

Quartz任务调度源码分析的相关文章

3.算子+PV&UV+submit提交参数+资源调度和任务调度源码分析+二次排序+分组topN+SparkShell

1.补充算子 transformations ?  mapPartitionWithIndex 类似于mapPartitions,除此之外还会携带分区的索引值. ?  repartition 增加或减少分区.会产生shuffle.(多个分区分到一个分区不会产生shuffle) 多用于增多分区. 底层调用的是coalesce ?  coalesce(合并) coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle. true为产生shuffle,false不产生shuff

quartz集群调度机制调研及源码分析---转载

quartz2.2.1集群调度机制调研及源码分析引言quartz集群架构调度器实例化调度过程触发器的获取触发trigger:Job执行过程:总结:附: 引言 quratz是目前最为成熟,使用最广泛的java任务调度框架,功能强大配置灵活.在企业应用中占重要地位.quratz在集群环境中的使用方式是每个企业级系统都要考虑的问题.早在2006年,在ITeye上就有一篇关于quratz集群方案的讨论:http://www.iteye.com/topic/40970 ITeye创始人@Robbin在8楼

定时组件quartz系列<三>quartz调度机制调研及源码分析

quartz2.2.1集群调度机制调研及源码分析引言quartz集群架构调度器实例化调度过程触发器的获取触发trigger:Job执行过程:总结:附: 引言 quratz是目前最为成熟,使用最广泛的java任务调度框架,功能强大配置灵活.在企业应用中占重要地位.quratz在集群环境中的使用方式是每个企业级系统都要考虑的问题.早在2006年,在ITeye上就有一篇关于quratz集群方案的讨论:http://www.iteye.com/topic/40970 ITeye创始人@Robbin在8楼

(1)quartz集群调度机制调研及源码分析---转载

quartz2.2.1集群调度机制调研及源码分析 原文地址:http://demo.netfoucs.com/gklifg/article/details/27090179 引言quartz集群架构调度器实例化调度过程触发器的获取触发trigger:Job执行过程:总结:附: 引言 quratz是目前最为成熟,使用最广泛的java任务调度框架,功能强大配置灵活.在企业应用中占重要地位.quratz在集群环境中的使用方式是每个企业级系统都要考虑的问题.早在2006年,在ITeye上就有一篇关于qu

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口, 由下面代码可以看出: (ns backtype.storm.scheduler.EvenScheduler (:use [backtype.storm util log config]) (:require [clojure.set :as set]) (:import [backtype.storm.scheduler IScheduler Topologies Cluster Topolo

源码分析_Shinken-2.4.0001.启动脚本/etc/init.d/shinken源码分析?

简单介绍:说明: Shinken是一个网络监控平台,可以通过一系列直观的方式监控网络内的各种健康状况.Shinken脱胎于Nagios,其实Shinken这个项目本身就是一帮Nagios项目的人无法忍受Nagios,自己跳出来重新用纯Python重构了一下,甚至完全兼容Nagios的配置文件. 相关地址: 官网地址: http://www.shinken-monitoring.org/ 官网文档: http://shinken.readthedocs.io/en/latest/ 论坛地址: ht

JDK源码分析之concurrent包(一) -- Executor架构

Java5新出的concurrent包中的API,是一些并发编程中实用的的工具类.在高并发场景下的使用非常广泛.笔者在这做了一个针对concurrent包中部分常用类的源码分析系列.本系列针对的读者是已经对并发包中的Executor框架和工具类有所了解并懂得如何使用的人群,如果对并发包还不了解的朋友,请先做些了解.网上对这方面的讲述有丰富的资源. 本篇博文是第一期,首先对Executor架构做一个概述.这里只简单介绍接口和类的继承.使用关系. 盗用一张类图来描述结构: 解析: Executor是