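Preface
As mentioned earlier, QuartzSchedulerThread only really starts working once paused is set to false. This article picks up where the QuartzSchedulerThread startup discussion in "Quartz and Spring Integration: Creating the Scheduler" and "Quartz and Spring Integration: Starting the Scheduler" left off. The run method of QuartzSchedulerThread next asks the thread pool for the number of available threads: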
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
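Real scheduling only happens when availThreadCount is greater than 0; otherwise the scheduler thread keeps polling and waiting for a worker thread to be released. So let us look at what happens when enough threads are available.
Acquiring triggers
Listing 1 shows the code that acquires triggers. It calls the acquireNextTriggers method of JobStore.
Listing 1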
List<OperableTrigger> triggers = null;

long now = System.currentTimeMillis();

clearSignaledSchedulingChange();
try {
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime,
            Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),
            qsRsrcs.getBatchTimeWindow());
    lastAcquireFailed = false;
    if (log.isDebugEnabled())
        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
    // exception handling omitted
} catch (RuntimeException e) {
    // exception handling omitted
}
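To make the three arguments in Listing 1 concrete, here is a minimal sketch of how the first two are derived. The class and method names are hypothetical, and the numbers in main (a 30-second idle wait, a batch size of 1) are illustrative assumptions rather than values taken from this article.

```java
public class AcquireArgsSketch {

    /** Latest fire time (in ms) a trigger may have and still be picked up in this pass. */
    public static long noLaterThan(long nowMillis, long idleWaitTimeMillis) {
        return nowMillis + idleWaitTimeMillis;
    }

    /** Never ask for more triggers than there are free workers, nor more than the batch cap. */
    public static int maxCount(int availThreadCount, int maxBatchSize) {
        return Math.min(availThreadCount, maxBatchSize);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Illustrative values only: 30 s idle wait, 10 free workers, batch size 1.
        System.out.println("noLaterThan = " + noLaterThan(now, 30_000L));
        System.out.println("maxCount    = " + maxCount(10, 1));
    }
}
```

With a batch cap of 1, a single trigger is requested per pass no matter how many workers are idle, which also affects whether the lock in Listing 2 is taken.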
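Taking LocalDataSourceJobStore, an implementation of JobStore, as the example, let us look at what acquireNextTriggers actually does. LocalDataSourceJobStore inherits acquireNextTriggers from its parent class JobStoreSupport (see Listing 2); this method acquires triggers from the data source.
Listing 2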
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {

    String lockName;
    if(isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> fireInstanceIds = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            fireInstanceIds.add(ft.getFireInstanceId());
                        }
                        for (OperableTrigger tr : result) {
                            if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}
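The TransactionValidator in Listing 2 answers one question: did the fired-trigger records written by the callback actually reach the database? The sketch below reduces that check to plain collections. The class and method names are hypothetical, and the string IDs stand in for fire instance IDs.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AcquisitionValidatorSketch {

    /**
     * Returns true when at least one trigger in the result has a matching
     * persisted fired-trigger record, mirroring the loop in Listing 2.
     */
    public static boolean commitWentThrough(List<String> persistedFireInstanceIds,
                                            List<String> resultFireInstanceIds) {
        Set<String> persisted = new HashSet<>(persistedFireInstanceIds);
        for (String id : resultFireInstanceIds) {
            if (persisted.contains(id)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(commitWentThrough(Arrays.asList("a", "b"), Arrays.asList("x", "b")));
    }
}
```

Note that one match is enough for the check to pass; it does not verify every returned trigger individually.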
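The acquireNextTriggers method of JobStoreSupport mainly calls executeInNonManagedTXLock (see Listing 3), whose logic is as follows:
- obtain the lock (when lockName is not null) and a database connection;
- call back the execute method of txCallback (the anonymous TransactionCallback in Listing 2), which in turn calls acquireNextTrigger to acquire the triggers;
- call commitConnection to commit all the SQL executed in step 2;
- return the collection of acquired triggers.
Listing 3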
protected <T> T executeInNonManagedTXLock(
        String lockName,
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // If we aren't using db locks, then delay getting DB connection
            // until after acquiring the lock since it isn't needed.
            if (getLockHandler().requiresConnection()) {
                conn = getNonManagedTXConnection();
            }

            transOwner = getLockHandler().obtainLock(conn, lockName);
        }

        if (conn == null) {
            conn = getNonManagedTXConnection();
        }

        final T result = txCallback.execute(conn);
        try {
            commitConnection(conn);
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                @Override
                public Boolean execute(Connection conn) throws JobPersistenceException {
                    return txValidator.validate(conn, result);
                }
            })) {
                throw e;
            }
        }

        Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
        if(sigTime != null && sigTime >= 0) {
            signalSchedulingChangeImmediately(sigTime);
        }

        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (RuntimeException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Unexpected runtime exception: " + e.getMessage(), e);
    } finally {
        try {
            releaseLock(lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}
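The control flow of Listing 3 is a classic "lock, run callback, commit or roll back, always clean up" template. The sketch below keeps only that skeleton. It is not Quartz code: FakeConnection, executeInLock and the in-process ReentrantLock are hypothetical stand-ins (the real method works with a JDBC Connection and a lock handler), and the commit-failure retry through the validator is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

public class TxLockTemplateSketch {

    /** Hypothetical stand-in for a JDBC connection; it only records what happened to it. */
    public static final class FakeConnection {
        public final List<String> log = new ArrayList<>();
    }

    // Plays the role of the TRIGGER_ACCESS lock (assumption: a simple in-process lock).
    private static final ReentrantLock TRIGGER_ACCESS = new ReentrantLock();

    public static <T> T executeInLock(boolean useLock, FakeConnection conn,
                                      Function<FakeConnection, T> callback) {
        boolean lockOwner = false;
        try {
            if (useLock) {
                TRIGGER_ACCESS.lock();
                lockOwner = true;
                conn.log.add("lock");
            }
            T result = callback.apply(conn); // the transactional work
            conn.log.add("commit");          // reached only if the callback succeeded
            return result;
        } catch (RuntimeException e) {
            conn.log.add("rollback");
            throw e;
        } finally {
            // Same order as Listing 3: release the lock first, then clean up the connection.
            if (lockOwner) {
                TRIGGER_ACCESS.unlock();
                conn.log.add("unlock");
            }
            conn.log.add("close");
        }
    }

    public static void main(String[] args) {
        FakeConnection conn = new FakeConnection();
        String result = executeInLock(true, conn, c -> {
            c.log.add("work");
            return "done";
        });
        System.out.println(result + " " + conn.log);
    }
}
```

Keeping the lock release and connection cleanup in finally is what guarantees that a failing callback cannot leave the trigger lock held.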
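The acquireNextTrigger method acquires the triggers. Its steps are as follows.
First, it queries the triggers whose state is WAITING (see Listing 4). Taking StdJDBCDelegate as the example, its selectTriggerToAcquire method (which uses the JDBC API and is left for the reader to explore) simply runs a SQL query for triggers. The SQL is:
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
TRIGGER_STATE must be WAITING, and NEXT_FIRE_TIME must be no later than the latest allowed fire time (noLaterThan + timeWindow). Unless MISFIRE_INSTR is -1, NEXT_FIRE_TIME must also be no earlier than the earliest allowed fire time, which is the value returned by getMisfireTime().
Note: in every SQL statement in this article, {0} is QRTZ_ and {1} is 'schedulerFactoryBean'.
Listing 4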
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
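The WHERE and ORDER BY clauses of that query can be read as a filter plus a sort. The following in-memory sketch applies the same conditions to a list of rows. Row, select and the field names are hypothetical, and capping the result at maxCount is an assumption about how the maxCount argument in Listing 4 is used.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TriggerSelectionSketch {

    public static final int MISFIRE_IGNORE = -1; // the MISFIRE_INSTR = -1 case in the SQL

    /** Hypothetical in-memory stand-in for one row of QRTZ_TRIGGERS. */
    public static final class Row {
        final String name;
        final String state;
        final long nextFireTime;
        final int priority;
        final int misfireInstr;

        public Row(String name, String state, long nextFireTime, int priority, int misfireInstr) {
            this.name = name;
            this.state = state;
            this.nextFireTime = nextFireTime;
            this.priority = priority;
            this.misfireInstr = misfireInstr;
        }
    }

    public static List<String> select(List<Row> rows, long noLaterThan, long noEarlierThan, int maxCount) {
        return rows.stream()
                .filter(r -> r.state.equals("WAITING"))
                .filter(r -> r.nextFireTime <= noLaterThan)
                .filter(r -> r.misfireInstr == MISFIRE_IGNORE || r.nextFireTime >= noEarlierThan)
                // ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
                .sorted(Comparator.comparingLong((Row r) -> r.nextFireTime)
                        .thenComparing(Comparator.comparingInt((Row r) -> r.priority).reversed()))
                .limit(maxCount)
                .map(r -> r.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("low", "WAITING", 100L, 5, 0),
                new Row("high", "WAITING", 100L, 9, 0),
                new Row("paused", "PAUSED", 50L, 5, 0));
        System.out.println(select(rows, 200L, 50L, 10));
    }
}
```

Two triggers due at the same instant are ordered by priority, which is why PRIORITY only matters as a tie-breaker.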
Next, it iterates over the TriggerKey objects in keys and performs the following steps inside the loop:
- Query the trigger from the QRTZ_TRIGGERS table:
OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
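retrieveTrigger internally calls the selectTrigger method of the Delegate. Taking StdJDBCDelegate as the example, selectTrigger looks up the trigger by TriggerKey with this SQL: SELECT * FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
- Use the JobKey held by the OperableTrigger to query the matching job from the QRTZ_JOB_DETAILS table. If the job disallows concurrent execution, its JobKey is added to the set acquiredJobKeysForNoConcurrentExec, and any further trigger of the same job is skipped in this pass: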
JobKey jobKey = nextTrigger.getJobKey();
JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper());
if (job.isConcurrentExectionDisallowed()) {
    if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
        continue; // next trigger
    } else {
        acquiredJobKeysForNoConcurrentExec.add(jobKey);
    }
}
Taking StdJDBCDelegate as the example, the SQL executed by selectJobDetail is: SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?
- Using the TriggerKey already obtained, update the state of this trigger in the QRTZ_TRIGGERS table from WAITING to ACQUIRED:
int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
Taking StdJDBCDelegate as the example, the SQL executed by updateTriggerStateFromOtherState is: UPDATE {0}TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?
- Set the fire instance ID on the OperableTrigger, then insert the acquired trigger into the QRTZ_FIRED_TRIGGERS table with state ACQUIRED:
nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);
Taking StdJDBCDelegate as the example, the SQL executed by insertFiredTrigger is: INSERT INTO {0}FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- Add the OperableTrigger to acquiredTriggers:
acquiredTriggers.add(nextTrigger);
Finally, it returns acquiredTriggers, the collection of all acquired triggers.
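The loop described above combines two guards: at most one trigger per non-concurrent job is taken in a pass, and the WAITING to ACQUIRED update only succeeds if the row is still WAITING. A minimal in-memory sketch of that loop follows. The maps stand in for the database tables, all names are hypothetical, and Map.replace(key, oldValue, newValue) plays the role of the conditional UPDATE.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class AcquireLoopSketch {

    // trigger name -> state, standing in for QRTZ_TRIGGERS.TRIGGER_STATE
    public final Map<String, String> triggerStates = new ConcurrentHashMap<>();
    // trigger name -> job name
    public final Map<String, String> triggerJobs = new HashMap<>();
    // jobs that disallow concurrent execution
    public final Set<String> nonConcurrentJobs = new HashSet<>();
    // standing in for rows of QRTZ_FIRED_TRIGGERS
    public final List<String> firedRecords = new ArrayList<>();

    public List<String> acquire(List<String> candidateKeys) {
        List<String> acquired = new ArrayList<>();
        Set<String> seenNonConcurrentJobs = new HashSet<>();
        for (String key : candidateKeys) {
            String job = triggerJobs.get(key);
            if (job == null) {
                continue; // trigger or job no longer exists
            }
            // Only one trigger per non-concurrent job is taken in a single pass.
            if (nonConcurrentJobs.contains(job) && !seenNonConcurrentJobs.add(job)) {
                continue;
            }
            // Conditional update: succeeds only if the state is still WAITING.
            if (!triggerStates.replace(key, "WAITING", "ACQUIRED")) {
                continue;
            }
            firedRecords.add(key + ":ACQUIRED");
            acquired.add(key);
        }
        return acquired;
    }

    public static void main(String[] args) {
        AcquireLoopSketch store = new AcquireLoopSketch();
        store.triggerStates.put("t1", "WAITING");
        store.triggerStates.put("t2", "WAITING");
        store.triggerJobs.put("t1", "jobA");
        store.triggerJobs.put("t2", "jobA");
        store.nonConcurrentJobs.add("jobA");
        System.out.println(store.acquire(Arrays.asList("t1", "t2")));
    }
}
```

The conditional update is what keeps two scheduler instances from both acquiring the same trigger: whichever one finds the row no longer WAITING simply moves on.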
Firing triggers
Once the triggers have been acquired, the next step is to fire them (see Listing 5), which is done by calling the triggersFired method of JobStore.
Listing 5
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

boolean goAhead = true;
synchronized(sigLock) {
    goAhead = !halted.get();
}
if(goAhead) {
    try {
        List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
        if(res != null)
            bndles = res;
    } catch (SchedulerException se) {
        // exception handling omitted
    }
}
Again taking LocalDataSourceJobStore as the example, let us look at what triggersFired does. LocalDataSourceJobStore inherits triggersFired from its parent class JobStoreSupport (see Listing 6); this method fires the triggers.
Listing 6
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
            new TransactionCallback<List<TriggerFiredResult>>() {
                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                    TriggerFiredResult result;
                    for (OperableTrigger trigger : triggers) {
                        try {
                            TriggerFiredBundle bundle = triggerFired(conn, trigger);
                            result = new TriggerFiredResult(bundle);
                        } catch (JobPersistenceException jpe) {
                            result = new TriggerFiredResult(jpe);
                        } catch(RuntimeException re) {
                            result = new TriggerFiredResult(re);
                        }
                        results.add(result);
                    }

                    return results;
                }
            },
            new TransactionValidator<List<TriggerFiredResult>>() {
                @Override
                public Boolean validate(Connection conn, List<TriggerFiredResult> result) throws JobPersistenceException {
                    try {
                        List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                        Set<String> executingTriggers = new HashSet<String>();
                        for (FiredTriggerRecord ft : acquired) {
                            if (STATE_EXECUTING.equals(ft.getFireInstanceState())) {
                                executingTriggers.add(ft.getFireInstanceId());
                            }
                        }
                        for (TriggerFiredResult tr : result) {
                            if (tr.getTriggerFiredBundle() != null
                                    && executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId())) {
                                return true;
                            }
                        }
                        return false;
                    } catch (SQLException e) {
                        throw new JobPersistenceException("error validating trigger acquisition", e);
                    }
                }
            });
}
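triggersFired also calls executeInNonManagedTXLock. From the earlier analysis we know that it ends up calling back the execute method of the new anonymous TransactionCallback, whose main logic simply loops over the triggers list and calls triggerFired to obtain a TriggerFiredBundle for each trigger. triggerFired performs the following steps:
- Read the trigger state from the QRTZ_TRIGGERS table and give up unless it is still ACQUIRED: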
try { // if trigger was deleted, state will be STATE_DELETED
    String state = getDelegate().selectTriggerState(conn, trigger.getKey());
    if (!state.equals(STATE_ACQUIRED)) {
        return null;
    }
} catch (SQLException e) {
    throw new JobPersistenceException("Couldn't select trigger state: " + e.getMessage(), e);
}
Taking StdJDBCDelegate as the example, the SQL executed by selectTriggerState is:
SELECT TRIGGER_STATE FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
- Query the job that belongs to the trigger:
try {
    job = retrieveJob(conn, trigger.getJobKey());
    if (job == null) {
        return null;
    }
} catch (JobPersistenceException jpe) {
    try {
        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
        getDelegate().updateTriggerState(conn, trigger.getKey(), STATE_ERROR);
    } catch (SQLException sqle) {
        getLog().error("Unable to set trigger state to ERROR.", sqle);
    }
    throw jpe;
}
The retrieveJob method here also ends up calling selectJobDetail on the Delegate, so it is not repeated.
- Update the state of this trigger's record in the QRTZ_FIRED_TRIGGERS table to STATE_EXECUTING:
try {
    getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
} catch (SQLException e) {
    throw new JobPersistenceException("Couldn't insert fired trigger: " + e.getMessage(), e);
}
The SQL actually executed here is:
UPDATE {0}FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = {1} AND ENTRY_ID = ?
- Tell the trigger object that it has fired, so that it updates its own previous and next fire times:
trigger.triggered(cal);
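- If the job disallows concurrent execution (isConcurrentExectionDisallowed() returns true, which is the case for a job class annotated with @DisallowConcurrentExecution), change the job's triggers in STATE_WAITING and STATE_ACQUIRED to STATE_BLOCKED, and those in STATE_PAUSED to STATE_PAUSED_BLOCKED: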
if (job.isConcurrentExectionDisallowed()) {
    state = STATE_BLOCKED;
    force = false;
    try {
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_WAITING);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_BLOCKED, STATE_ACQUIRED);
        getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(), STATE_PAUSED_BLOCKED, STATE_PAUSED);
    } catch (SQLException e) {
        throw new JobPersistenceException(
                "Couldn't update states of blocked triggers: " + e.getMessage(), e);
    }
}
- Store the trigger again, inserting it or updating the existing row:
storeTrigger(conn, trigger, job, true, state, force, false);
storeTrigger first calls triggerExists to check whether the trigger already exists:
boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
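Taking StdJDBCDelegate as the example, the SQL executed by triggerExists is: SELECT TRIGGER_NAME FROM {0}TRIGGERS WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?. Depending on the value of existingTrigger, the trigger (including its next fire time) is then either updated or inserted in the QRTZ_TRIGGERS table: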
if (existingTrigger) {
    getDelegate().updateTrigger(conn, newTrigger, state, job);
} else {
    getDelegate().insertTrigger(conn, newTrigger, state, job);
}
Taking StdJDBCDelegate as the example, the SQL executed in the insert case is: INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- Return a TriggerFiredBundle object:
return new TriggerFiredBundle(job, trigger, cal,
        trigger.getKey().getGroup().equals(Scheduler.DEFAULT_RECOVERY_GROUP),
        new Date(), trigger.getPreviousFireTime(), prevFireTime,
        trigger.getNextFireTime());
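The blocking step for non-concurrent jobs is the least obvious part of triggerFired, so here is a small sketch of just that state transition. It works on in-memory maps rather than the database, and the class and method names are hypothetical. The state names follow the constants shown in the listing above.

```java
import java.util.HashMap;
import java.util.Map;

public class BlockSiblingsSketch {

    /** State a trigger moves to while a non-concurrent job of its own is executing. */
    public static String blockedStateFor(String current) {
        switch (current) {
            case "WAITING":
            case "ACQUIRED":
                return "BLOCKED";
            case "PAUSED":
                return "PAUSED_BLOCKED";
            default:
                return current; // other states are left untouched
        }
    }

    /** Applies the transition to every trigger that belongs to the given job. */
    public static void blockTriggersOfJob(Map<String, String> triggerStates,
                                          Map<String, String> triggerJobs,
                                          String jobName) {
        for (Map.Entry<String, String> entry : triggerStates.entrySet()) {
            if (jobName.equals(triggerJobs.get(entry.getKey()))) {
                entry.setValue(blockedStateFor(entry.getValue()));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> states = new HashMap<>();
        Map<String, String> jobs = new HashMap<>();
        states.put("t1", "WAITING");
        states.put("t2", "PAUSED");
        jobs.put("t1", "jobA");
        jobs.put("t2", "jobA");
        blockTriggersOfJob(states, jobs, "jobA");
        System.out.println(states);
    }
}
```

Keeping PAUSED triggers in a separate PAUSED_BLOCKED state means the pause is not lost once the blocking is lifted later.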
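Creating the job run shell
QuartzSchedulerThread then iterates over the TriggerFiredResult list, takes the TriggerFiredBundle from each result, and creates a JobRunShell for it (see Listing 7). Note that JobRunShell is a Runnable wrapper around the job, not an operating system shell script.
Listing 7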
for (int i = 0; i < bndles.size(); i++) {
    TriggerFiredResult result = bndles.get(i);
    TriggerFiredBundle bndle = result.getTriggerFiredBundle();
    Exception exception = result.getException();

    if (exception instanceof RuntimeException) {
        getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
        continue;
    }

    // it's possible to get 'null' if the triggers was paused,
    // blocked, or other similar occurrences that prevent it being
    // fired at this time... or if the scheduler was shutdown (halted)
    if (bndle == null) {
        qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
        continue;
    }

    JobRunShell shell = null;
    try {
        shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
        shell.initialize(qs);
    } catch (SchedulerException se) {
        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
        continue;
    }
}
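The most important call here is createJobRunShell on the JobRunShellFactory. Taking StdJobRunShellFactory, an implementation of JobRunShellFactory, as the example, its createJobRunShell method (see Listing 8) creates the shell in which the job runs.
Listing 8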
public JobRunShell createJobRunShell(TriggerFiredBundle bndle) throws SchedulerException {
    return new JobRunShell(scheduler, bndle);
}
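The idea behind a "run shell" can be shown with a much smaller wrapper: a Runnable that carries the job and its context, so that a worker thread can run it without knowing anything about jobs, and so that a failing job does not take the worker thread down. The sketch below is a simplified illustration under that assumption, not the real JobRunShell. SimpleJob, Shell and the Map-based context are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class JobShellSketch {

    /** Hypothetical, much smaller stand-in for a job interface. */
    public interface SimpleJob {
        void execute(Map<String, Object> context) throws Exception;
    }

    /** Wraps a job and its context so that any thread can run it as a plain Runnable. */
    public static final class Shell implements Runnable {
        private final SimpleJob job;
        private final Map<String, Object> context;
        private Exception failure;

        public Shell(SimpleJob job, Map<String, Object> context) {
            this.job = job;
            this.context = context;
        }

        @Override
        public void run() {
            try {
                job.execute(context);
            } catch (Exception e) {
                failure = e; // remember the failure instead of killing the worker thread
            }
        }

        public Exception getFailure() {
            return failure;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        Shell shell = new Shell(c -> c.put("result", "finished"), context);
        shell.run();
        System.out.println(context.get("result") + ", failure=" + shell.getFailure());
    }
}
```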
Job execution
Listing 9 shows the code that submits the job for execution.
Listing 9
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
    // this case should never happen, as it is indicative of the
    // scheduler being shutdown or a bug in the thread pool or
    // a thread pool being used concurrently - which the docs
    // say not to do...
    getLog().error("ThreadPool.runInThread() return false!");
    qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
Executing the job
The implementation of runInThread is shown in Listing 10.
Listing 10
public boolean runInThread(Runnable runnable) {
    if (runnable == null) {
        return false;
    }

    synchronized (nextRunnableLock) {

        handoffPending = true;

        // Wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
            try {
                nextRunnableLock.wait(500);
            } catch (InterruptedException ignore) {
            }
        }

        if (!isShutdown) {
            WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
            busyWorkers.add(wt);
            wt.run(runnable);
        } else {
            // If the thread pool is going down, execute the Runnable
            // within a new additional worker thread (no thread from the pool).
            WorkerThread wt = new WorkerThread(this, threadGroup,
                    "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
            busyWorkers.add(wt);
            workers.add(wt);
            wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
    }

    return true;
}
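In the normal case runInThread takes an idle WorkerThread from availWorkers and hands the Runnable to it through wt.run(runnable); only when the pool is shutting down does it create and start a new WorkerThread. The main code in the run method of WorkerThread is: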
synchronized(lock) {
    while (runnable == null && run.get()) {
        lock.wait(500);
    }

    if (runnable != null) {
        ran = true;
        runnable.run();
    }
}
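The runnable in the code above is the JobRunShell created earlier, so the run method of WorkerThread simply delegates to the run method of JobRunShell. The most important statement in the run method of JobRunShell is: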
job.execute(jec);
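The handoff between the scheduler thread and a worker thread is a wait/notify pattern: the worker sleeps on a lock until a Runnable is handed to it, then runs it on its own thread. The sketch below shows that pattern with a single worker. It is a simplified illustration rather than the pool's real code: Worker, handOff and runOnWorker are hypothetical names, and unlike the snippet above this worker runs the task outside the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerHandoffSketch {

    /** A long-lived worker that waits for a Runnable to be handed to it. */
    public static final class Worker extends Thread {
        private final Object lock = new Object();
        private Runnable runnable;
        private volatile boolean running = true;

        public Worker(String name) {
            super(name);
        }

        /** Called by another thread to give this worker something to run. */
        public void handOff(Runnable task) {
            synchronized (lock) {
                runnable = task;
                lock.notifyAll();
            }
        }

        public void shutdown() {
            running = false;
            synchronized (lock) {
                lock.notifyAll();
            }
        }

        @Override
        public void run() {
            while (running) {
                Runnable task;
                synchronized (lock) {
                    while (runnable == null && running) {
                        try {
                            lock.wait(500);
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                    task = runnable;
                    runnable = null;
                }
                if (task != null) {
                    task.run(); // executes on the worker thread, not on the caller's thread
                }
            }
        }
    }

    /** Runs a tiny task on a fresh worker and returns the name of the thread that ran it. */
    public static String runOnWorker(String workerName) {
        Worker worker = new Worker(workerName);
        worker.start();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        worker.handOff(() -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOnWorker("worker-1"));
    }
}
```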
Taking NativeJob as the example, its implementation is shown in Listing 11.
Listing 11
public void execute(JobExecutionContext context) throws JobExecutionException {

    JobDataMap data = context.getMergedJobDataMap();

    String command = data.getString(PROP_COMMAND);

    String parameters = data.getString(PROP_PARAMETERS);

    if (parameters == null) {
        parameters = "";
    }

    boolean wait = true;
    if(data.containsKey(PROP_WAIT_FOR_PROCESS)) {
        wait = data.getBooleanValue(PROP_WAIT_FOR_PROCESS);
    }
    boolean consumeStreams = false;
    if(data.containsKey(PROP_CONSUME_STREAMS)) {
        consumeStreams = data.getBooleanValue(PROP_CONSUME_STREAMS);
    }

    Integer exitCode = this.runNativeCommand(command, parameters, wait, consumeStreams);
    context.setResult(exitCode);
}
The key call in the execute method of NativeJob is runNativeCommand, whose implementation is shown in Listing 12. From it we can see how Quartz runs a native shell command.
Listing 12
private Integer runNativeCommand(String command, String parameters, boolean wait, boolean consumeStreams) throws JobExecutionException {

    String[] cmd;
    String[] args = new String[2];
    Integer result = null;
    args[0] = command;
    args[1] = parameters;

    try {
        //with this variable will be done the switching
        String osName = System.getProperty("os.name");

        // specific for Windows
        if (osName.startsWith("Windows")) {
            cmd = new String[args.length + 2];
            if (osName.equals("Windows 95")) { // windows 95 only
                cmd[0] = "command.com";
            } else {
                cmd[0] = "cmd.exe";
            }
            cmd[1] = "/C";
            System.arraycopy(args, 0, cmd, 2, args.length);
        } else if (osName.equals("Linux")) {
            cmd = new String[3];
            cmd[0] = "/bin/sh";
            cmd[1] = "-c";
            cmd[2] = args[0] + " " + args[1];
        } else { // try this...
            cmd = args;
        }

        Runtime rt = Runtime.getRuntime();
        // Executes the command
        getLog().info("About to run " + cmd[0] + " " + cmd[1] + " " + (cmd.length>2 ? cmd[2] : "") + " ...");
        Process proc = rt.exec(cmd);
        // Consumes the stdout from the process
        StreamConsumer stdoutConsumer = new StreamConsumer(proc.getInputStream(), "stdout");

        // Consumes the stderr from the process
        if(consumeStreams) {
            StreamConsumer stderrConsumer = new StreamConsumer(proc.getErrorStream(), "stderr");
            stdoutConsumer.start();
            stderrConsumer.start();
        }

        if(wait) {
            result = proc.waitFor();
        }
        // any error message?

    } catch (Throwable x) {
        throw new JobExecutionException("Error launching native command: ", x, false);
    }

    return result;
}
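The part of Listing 12 that is easiest to get wrong is how the command array is assembled for each operating system. The sketch below isolates that branching as a pure function so it can be checked without launching a process. The class and method names are hypothetical, while the branching itself follows Listing 12.

```java
import java.util.Arrays;

public class NativeCommandSketch {

    /** Builds the argument array that would be handed to the operating system. */
    public static String[] buildCommand(String osName, String command, String parameters) {
        if (osName.startsWith("Windows")) {
            String shell = osName.equals("Windows 95") ? "command.com" : "cmd.exe";
            return new String[] { shell, "/C", command, parameters };
        } else if (osName.equals("Linux")) {
            // On Linux the command and its parameters are joined into one string for sh -c.
            return new String[] { "/bin/sh", "-c", command + " " + parameters };
        }
        // Any other OS: pass the command and parameters through unchanged.
        return new String[] { command, parameters };
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        System.out.println(Arrays.toString(buildCommand(os, "echo", "hello")));
    }
}
```

One consequence of the last branch is that on an operating system that is neither Windows nor Linux, the command is executed directly without a shell, so shell syntax inside the command string is not interpreted there.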
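Processing after job completion
After a job has been handled, some follow-up processing is done through the triggeredJobComplete method of JobStore; Listing 9 shows one such call. Taking JobStoreSupport as the example, its implementation of triggeredJobComplete is shown in Listing 13.
Listing 13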
public void triggeredJobComplete(final OperableTrigger trigger,
        final JobDetail jobDetail, final CompletedExecutionInstruction triggerInstCode) {
    retryExecuteInNonManagedTXLock(
        LOCK_TRIGGER_ACCESS,
        new VoidTransactionCallback() {
            public void executeVoid(Connection conn) throws JobPersistenceException {
                triggeredJobComplete(conn, trigger, jobDetail, triggerInstCode);
            }
        });
}
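It calls retryExecuteInNonManagedTXLock, whose logic is very similar to that of executeInNonManagedTXLock, and which eventually calls back the executeVoid method of the anonymous VoidTransactionCallback. The inner triggeredJobComplete method then does the final work, such as removing the trigger's fired instance from the QRTZ_FIRED_TRIGGERS table and updating the trigger to its completed or error state.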
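The two pieces of final work named above can be sketched as follows. This is a deliberately reduced illustration on in-memory collections. The Instruction enum is a hypothetical subset that covers only the "complete" and "error" outcomes mentioned in the text, and the other names are hypothetical as well.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class JobCompleteSketch {

    /** Hypothetical subset of completion instructions, limited to what the text mentions. */
    public enum Instruction { NOOP, SET_TRIGGER_COMPLETE, SET_TRIGGER_ERROR }

    // trigger name -> state, standing in for QRTZ_TRIGGERS
    public final Map<String, String> triggerStates = new HashMap<>();
    // fire instance IDs, standing in for rows of QRTZ_FIRED_TRIGGERS
    public final Set<String> firedRecords = new HashSet<>();

    public void triggeredJobComplete(String triggerKey, String fireInstanceId, Instruction instruction) {
        switch (instruction) {
            case SET_TRIGGER_COMPLETE:
                triggerStates.put(triggerKey, "COMPLETE");
                break;
            case SET_TRIGGER_ERROR:
                triggerStates.put(triggerKey, "ERROR");
                break;
            default:
                break; // NOOP: leave the trigger state as it is
        }
        // Whatever the instruction was, the fired-trigger record is no longer needed.
        firedRecords.remove(fireInstanceId);
    }

    public static void main(String[] args) {
        JobCompleteSketch store = new JobCompleteSketch();
        store.triggerStates.put("t1", "WAITING");
        store.firedRecords.add("fire-1");
        store.triggeredJobComplete("t1", "fire-1", Instruction.SET_TRIGGER_ERROR);
        System.out.println(store.triggerStates + " " + store.firedRecords);
    }
}
```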
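Summary
After this analysis we have a reasonably deep understanding of how Quartz performs timed scheduling. Reading the Quartz source is not an easy process, though. Compared with the Tomcat and Spark source code, its design feels heavy, its logic is tightly coupled to the database, and its comments are less extensive.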