Quartz与Spring集成——创建调度器

前言

在《Quartz与Spring集成—— SchedulerFactoryBean的初始化分析》一文中介绍过Spring集成Quartz时的初始化过程,其中简单的提到了创建调度器的方法createScheduler。本文将着重介绍Quartz初始化时是如何创建调度器的。

创建调度器

这里从createScheduler的实现(见代码清单1)来分析,其处理步骤如下:

  1. 设置线程上下文的类加载器;
  2. 通过单例方法获取SchedulerRepository的实例(见代码清单2);
  3. 从调度仓库实例SchedulerRepository中查找已经存在的调度器,这里想说的是虽然此实现考虑到了多线程安全问题,不过这种方式效率较低。不如提前初始化,由final修饰,不使用同步语句,避免线程间的阻塞和等待;
  4. 获取调取器(见代码清单3),其实际上首先从调度器缓存中查找调度器,否则调用instantiate方法创建调度器;
  5. 检查重新获取的调度器和已经存在的调度器是否相同,如果相同则说明此调度器已经被激活了,将会报出异常。如果调度器是首次被激活,那么将返回此调度器。这里的实现稍微有些拖沓,其实只有当existingScheduler为null时,才会调用instantiate方法创建newScheduler,也只有在这个时候newScheduler才不等于existingScheduler,并且不会抛出异常。因此我们甚至可以省去Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);这行代码,而直接将代码清单3中的实现进行修改——当sched为null时才调用instantiate方法创建调度器。

代码清单1

	protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
			throws SchedulerException {

		// Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
		Thread currentThread = Thread.currentThread();
		ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
		boolean overrideClassLoader = (this.resourceLoader != null &&
				!this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
		if (overrideClassLoader) {
			currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
		}
		try {
			SchedulerRepository repository = SchedulerRepository.getInstance();
			synchronized (repository) {
				Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
				Scheduler newScheduler = schedulerFactory.getScheduler();
				if (newScheduler == existingScheduler) {
					throw new IllegalStateException("Active Scheduler of name ‘" + schedulerName + "‘ already registered " +
							"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
				}
				if (!this.exposeSchedulerInRepository) {
					// Need to remove it in this case, since Quartz shares the Scheduler instance by default!
					SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
				}
				return newScheduler;
			}
		}
		finally {
			if (overrideClassLoader) {
				// Reset original thread context ClassLoader.
				currentThread.setContextClassLoader(threadContextClassLoader);
			}
		}
	}

代码清单2

    public static synchronized SchedulerRepository getInstance() {
        if (inst == null) {
            inst = new SchedulerRepository();
        }

        return inst;
    }

代码清单3

    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }

读取调度器配置

instantiate方法中包含了很多从PropertiesParser(PropertiesParser在《Quartz与Spring集成—— SchedulerFactoryBean的初始化分析》一文中介绍过)中获取各种属性的语句,这里不过多展示。重点来看其更为本质的内容。

创建远端调度器代理

如果当前调度器实际是代理远程RMI调度器,那么创建RemoteScheduler,并将当前调取器与RemoteScheduler进行绑定,最后以此RemoteScheduler作为调度器,见代码清单4。

代码清单4

        if (rmiProxy) {

            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }

            String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
                    schedName, schedInstId) : rmiBindName;

            RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);

            schedRep.bind(remoteScheduler);

            return remoteScheduler;
        }

创建远端jmx调度器代理

如果当前调度器实际是代理远程JMX调度器,那么创建RemoteMBeanScheduler,并将当前调度器与RemoteMBeanScheduler进行绑定,最后以此RemoteMBeanScheduler作为调度器,见代码清单5。

代码清单5

        if (jmxProxy) {
            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }

            if (jmxProxyClass == null) {
                throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
            }

            RemoteMBeanScheduler jmxScheduler = null;
            try {
                jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)
                        .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate RemoteMBeanScheduler class.", e);
            }

            if (jmxObjectName == null) {
                jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
            }

            jmxScheduler.setSchedulerObjectName(jmxObjectName);

            tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
            try {
                setBeanProps(jmxScheduler, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("RemoteMBeanScheduler class ‘"
                        + jmxProxyClass + "‘ props could not be configured.", e);
                throw initException;
            }

            jmxScheduler.initialize();

            schedRep.bind(jmxScheduler);

            return jmxScheduler;
        }

实例化作业工厂

如果指定了jobFactoryClass属性,那么实例化作业工厂实例,见代码清单6。

代码清单6

        JobFactory jobFactory = null;
        if(jobFactoryClass != null) {
            try {
                jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                        .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate JobFactory class: "
                                + e.getMessage(), e);
            }

            tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
            try {
                setBeanProps(jobFactory, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("JobFactory class ‘"
                        + jobFactoryClass + "‘ props could not be configured.", e);
                throw initException;
            }
        }

实例化实例ID生成器

如果指定了instanceIdGeneratorClass属性,那么实例化实例ID生成器,见代码清单7。

代码清单7

        InstanceIdGenerator instanceIdGenerator = null;
        if(instanceIdGeneratorClass != null) {
            try {
                instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass)
                    .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate InstanceIdGenerator class: "
                        + e.getMessage(), e);
            }

            tProps = cfg.getPropertyGroup(PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX, true);
            try {
                setBeanProps(instanceIdGenerator, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("InstanceIdGenerator class ‘"
                        + instanceIdGeneratorClass + "‘ props could not be configured.", e);
                throw initException;
            }
        }

实例化线程池

org.quartz.threadPool.class属性用于指定线程池类,如果没有指定,则默认为org.quartz.simpl.SimpleThreadPool,见代码清单8。

代码清单8

        String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

        if (tpClass == null) {
            initException = new SchedulerException(
                    "ThreadPool class not specified. ");
            throw initException;
        }

        try {
            tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class ‘"
                    + tpClass + "‘ could not be instantiated.", e);
            throw initException;
        }
        tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
        try {
            setBeanProps(tp, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class ‘"
                    + tpClass + "‘ props could not be configured.", e);
            throw initException;
        }

实例化JobStore的具体实例

org.quartz.jobStore.class属性用于指定JobStore的具体类型,我显示指定了org.springframework.scheduling.quartz.LocalDataSourceJobStore,如果没有指定,则默认为RAMJobStore,见代码清单9。

代码清单9

        String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());

        if (jsClass == null) {
            initException = new SchedulerException(
                    "JobStore class not specified. ");
            throw initException;
        }

        try {
            js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class ‘" + jsClass
                    + "‘ could not be instantiated.", e);
            throw initException;
        }

        SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
        try {
            setBeanProps(js, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class ‘" + jsClass
                    + "‘ props could not be configured.", e);
            throw initException;
        }

获取数据库管理器并设置数据库连接池

这一步骤的执行逻辑比较多,但是仔细整理后发现数据库管理器都一样,无非是数据连接池的提供者不同(见代码清单10),一共分为三种:

方式一:连接池提供者由connectionProvider.class属性指定;

方式二:连接池提供者由jndiURL属性指定;

方式三:连接池提供者为PoolingConnectionProvider,其使用了C3P0连接池;

代码清单10

        String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
        for (int i = 0; i < dsNames.length; i++) {
            PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
                    PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

            String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

            // custom connectionProvider...
            if(cpClass != null) {
                ConnectionProvider cp = null;
                try {
                    cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class ‘" + cpClass
                            + "‘ could not be instantiated.", e);
                    throw initException;
                }

                try {
                    // remove the class name, so it isn‘t attempted to be set
                    pp.getUnderlyingProperties().remove(
                            PROP_CONNECTION_PROVIDER_CLASS);

                    if (cp instanceof PoolingConnectionProvider) {
                        populateProviderWithExtraProps((PoolingConnectionProvider)cp, pp.getUnderlyingProperties());
                    } else {
                        setBeanProps(cp, pp.getUnderlyingProperties());
                    }
                    cp.initialize();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class ‘" + cpClass
                            + "‘ props could not be configured.", e);
                    throw initException;
                }

                dbMgr = DBConnectionManager.getInstance();
                dbMgr.addConnectionProvider(dsNames[i], cp);
            } else {
                String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);

                if (dsJndi != null) {
                    boolean dsAlwaysLookup = pp.getBooleanProperty(
                            PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
                    String dsJndiInitial = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_INITIAL);
                    String dsJndiProvider = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_PROVDER);
                    String dsJndiPrincipal = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_PRINCIPAL);
                    String dsJndiCredentials = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_CREDENTIALS);
                    Properties props = null;
                    if (null != dsJndiInitial || null != dsJndiProvider
                            || null != dsJndiPrincipal || null != dsJndiCredentials) {
                        props = new Properties();
                        if (dsJndiInitial != null) {
                            props.put(PROP_DATASOURCE_JNDI_INITIAL,
                                    dsJndiInitial);
                        }
                        if (dsJndiProvider != null) {
                            props.put(PROP_DATASOURCE_JNDI_PROVDER,
                                    dsJndiProvider);
                        }
                        if (dsJndiPrincipal != null) {
                            props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
                                    dsJndiPrincipal);
                        }
                        if (dsJndiCredentials != null) {
                            props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
                                    dsJndiCredentials);
                        }
                    }
                    JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
                            props, dsAlwaysLookup);
                    dbMgr = DBConnectionManager.getInstance();
                    dbMgr.addConnectionProvider(dsNames[i], cp);
                } else {
                    String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
                    String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);

                    if (dsDriver == null) {
                        initException = new SchedulerException(
                                "Driver not specified for DataSource: "
                                        + dsNames[i]);
                        throw initException;
                    }
                    if (dsURL == null) {
                        initException = new SchedulerException(
                                "DB URL not specified for DataSource: "
                                        + dsNames[i]);
                        throw initException;
                    }
                    try {
                        PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
                        dbMgr = DBConnectionManager.getInstance();
                        dbMgr.addConnectionProvider(dsNames[i], cp);

                        // Populate the underlying C3P0 data source pool properties
                        populateProviderWithExtraProps(cp, pp.getUnderlyingProperties());
                    } catch (Exception sqle) {
                        initException = new SchedulerException(
                                "Could not initialize DataSource: " + dsNames[i],
                                sqle);
                        throw initException;
                    }
                }

            }

        }

设置调度器插件

这一段用于设置各种调度器插件,见代码清单11。

代码清单11

        String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
        SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
        for (int i = 0; i < pluginNames.length; i++) {
            Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
                    + pluginNames[i], true);

            String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);

            if (plugInClass == null) {
                initException = new SchedulerException(
                        "SchedulerPlugin class not specified for plugin ‘"
                                + pluginNames[i] + "‘");
                throw initException;
            }
            SchedulerPlugin plugin = null;
            try {
                plugin = (SchedulerPlugin)
                        loadHelper.loadClass(plugInClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "SchedulerPlugin class ‘" + plugInClass
                                + "‘ could not be instantiated.", e);
                throw initException;
            }
            try {
                setBeanProps(plugin, pp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobStore SchedulerPlugin ‘" + plugInClass
                                + "‘ props could not be configured.", e);
                throw initException;
            }

            plugins[i] = plugin;
        }

设置作业监听器

这一步用于设置作业监听器,我觉得可以用于做一些作业监控相关的扩展,见代明清单12。

代明清单12

        Class<?>[] strArg = new Class[] { String.class };
        String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
        JobListener[] jobListeners = new JobListener[jobListenerNames.length];
        for (int i = 0; i < jobListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
                    + jobListenerNames[i], true);

            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

            if (listenerClass == null) {
                initException = new SchedulerException(
                        "JobListener class not specified for listener ‘"
                                + jobListenerNames[i] + "‘");
                throw initException;
            }
            JobListener listener = null;
            try {
                listener = (JobListener)
                       loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobListener class ‘" + listenerClass
                                + "‘ could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try {
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                }
                catch(NoSuchMethodException ignore) {
                    /* do nothing */
                }
                if(nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobListener ‘" + listenerClass
                                + "‘ props could not be configured.", e);
                throw initException;
            }
            jobListeners[i] = listener;
        }

设置触发器监听器

这一步设置触发器监听器,见代码清单13。

代码清单13

        String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
        TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
        for (int i = 0; i < triggerListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
                    + triggerListenerNames[i], true);

            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

            if (listenerClass == null) {
                initException = new SchedulerException(
                        "TriggerListener class not specified for listener ‘"
                                + triggerListenerNames[i] + "‘");
                throw initException;
            }
            TriggerListener listener = null;
            try {
                listener = (TriggerListener)
                       loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "TriggerListener class ‘" + listenerClass
                                + "‘ could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try {
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                }
                catch(NoSuchMethodException ignore) { /* do nothing */ }
                if(nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } );
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "TriggerListener ‘" + listenerClass
                                + "‘ props could not be configured.", e);
                throw initException;
            }
            triggerListeners[i] = listener;
        }

获取线程执行器

可以通过属性org.quartz.threadExecutor.class设置线程执行器,如果没有指定,默认为DefaultThreadExecutor,见代码清单13。

代码清单13

        String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
        if (threadExecutorClass != null) {
            tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
            try {
                threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
                log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);

                setBeanProps(threadExecutor, tProps);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "ThreadExecutor class ‘" + threadExecutorClass + "‘ could not be instantiated.", e);
                throw initException;
            }
        } else {
            log.info("Using default implementation for ThreadExecutor");
            threadExecutor = new DefaultThreadExecutor();
        }

创建脚本执行工厂

如果需要作业运行在事务中(可以通过属性org.quartz.scheduler.wrapJobExecutionInUserTransaction指定),则创建JTAJobRunShellFactory,否则创建JTAAnnotationAwareJobRunShellFactory,见代码清单14。

代码清单14

            JobRunShellFactory jrsf = null; // Create correct run-shell factory...

            if (userTXLocation != null) {
                UserTransactionHelper.setUserTxLocation(userTXLocation);
            }

            if (wrapJobInTx) {
                jrsf = new JTAJobRunShellFactory();
            } else {
                jrsf = new JTAAnnotationAwareJobRunShellFactory();
            }

生成调度实例ID

如果需要自动生成调度实例ID(可以通过属性org.quartz.scheduler.instanceId为AUTO或者SYS_PROP,其中当指定为AUTO时,则instanceIdGeneratorClass由org.quartz.scheduler.instanceIdGenerator.class属性指定,默认为org.quartz.simpl.SimpleInstanceIdGenerator;当指定为SYS_PROP,则instanceIdGeneratorClass等于org.quartz.simpl.SystemPropertyInstanceIdGenerator),那么调度实例ID为NON_CLUSTERED,当JobStore支持集群部署,那么调度实例ID将由调度实例ID生成器instanceIdGenerator产生,见代码清单15。(注:当不需要自动生成调度实例ID时,可以通过属性org.quartz.scheduler.instanceId指定)

代码清单15

            if (autoId) {
                try {
                  schedInstId = DEFAULT_INSTANCE_ID;
                  if (js.isClustered()) {
                      schedInstId = instanceIdGenerator.generateInstanceId();
                  }
                } catch (Exception e) {
                    getLog().error("Couldn‘t generate instance Id!", e);
                    throw new IllegalStateException("Cannot run without an instance id.");
                }
            }

设置JobStore的数据库错误重试的间隔及现场执行器

JobStoreSupport是JobStore的抽象实现类,只有继承自JobStoreSupport的具体实现类(例如org.springframework.scheduling.quartz.LocalDataSourceJobStore)才可以通过调用其setDbRetryInterval方法设置数据库错误重试间隔(dbFailureRetry属性默认为15000,也可以通过设置org.quartz.scheduler.dbFailureRetryInterval属性进行指定),setThreadExecutor方法用于设置JobStoreSupport的线程执行器,见代码清单16。

代码清单16

            if (js instanceof JobStoreSupport) {
                JobStoreSupport jjs = (JobStoreSupport)js;
                jjs.setDbRetryInterval(dbFailureRetry);
                if(threadsInheritInitalizersClassLoader)
                    jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);

                jjs.setThreadExecutor(threadExecutor);
            }

构造QuartzSchedulerResources

在构造QuartzSchedulerResources的过程中(见代码清单17),设置了很多属性,现在列举如下:

属性名称 含义 备注
name 调度名称 可以由org.quartz.scheduler.instanceName属性指定
threadName 调度线程名称 可以由org.quartz.scheduler.threadName属性指定,默认等于调度名称加后缀_QuartzSchedulerThread产生
instanceId 调度实例ID 可以由org.quartz.scheduler.instanceId属性指定,具体生成规则见文中描述
jobRunShellFactory 作业运行脚本工厂 可以由org.quartz.scheduler.wrapJobExecutionInUserTransaction属性指定,具体实现有JTAJobRunShellFactory和JTAAnnotationAwareJobRunShellFactory两种
makeSchedulerThreadDaemon 调度线程是否是后台线程 可以由org.quartz.scheduler.makeSchedulerThreadDaemon属性指定
threadsInheritInitalizersClassLoader 线程是否继承初始化的类加载器 可以由org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer属性指定
runUpdateCheck 运行时是否检查Quartz的可用更新版本 可以由org.quartz.scheduler.skipUpdateCheck属性指定,runUpdateCheck与指定值相反
batchTimeWindow 在时间窗口前批量触发 可以由org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow属性指定
maxBatchSize 最大批量执行的作业数 可以由org.quartz.scheduler.batchTriggerAcquisitionMaxCount属性指定
interruptJobsOnShutdown 当关闭作业时,中断作业线程 可以由org.quartz.scheduler.interruptJobsOnShutdown属性指定
interruptJobsOnShutdownWithWait 当关闭作业时,等待中断作业线程 可以由org.quartz.scheduler.interruptJobsOnShutdownWithWait属性指定
threadExecutor 线程执行器 可以由org.quartz.threadExecutor.class属性指定,默认为DefaultThreadExecutor
threadPool 线程池 可以由org.quartz.threadPool.class属性指定,默认为SimpleThreadPool
jobStore 作业存储 可以由org.quartz.jobStore.class属性指定,默认为RAMJobStore

代码清单17

            QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
            rsrcs.setName(schedName);
            rsrcs.setThreadName(threadName);
            rsrcs.setInstanceId(schedInstId);
            rsrcs.setJobRunShellFactory(jrsf);
            rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
            rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
            rsrcs.setRunUpdateCheck(!skipUpdateCheck);
            rsrcs.setBatchTimeWindow(batchTimeWindow);
            rsrcs.setMaxBatchSize(maxBatchSize);
            rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
            rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
            rsrcs.setJMXExport(jmxExport);
            rsrcs.setJMXObjectName(jmxObjectName);

            if (managementRESTServiceEnabled) {
                ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
                managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
                managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
                rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
            }

            if (rmiExport) {
                rsrcs.setRMIRegistryHost(rmiHost);
                rsrcs.setRMIRegistryPort(rmiPort);
                rsrcs.setRMIServerPort(rmiServerPort);
                rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
                rsrcs.setRMIBindName(rmiBindName);
            }

            SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);

            rsrcs.setThreadExecutor(threadExecutor);
            threadExecutor.initialize();

            rsrcs.setThreadPool(tp);
            if(tp instanceof SimpleThreadPool) {
                if(threadsInheritInitalizersClassLoader)
                    ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
            }
            tp.initialize();
            tpInited = true;

            rsrcs.setJobStore(js);

            // add plugins
            for (int i = 0; i < plugins.length; i++) {
                rsrcs.addSchedulerPlugin(plugins[i]);
            }

构造QuartzScheduler

构造QuartzScheduler的代码如下:

            qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
            qsInited = true;

QuartzScheduler的构造器实现见代码清单18,其处理步骤如下:

  1. 设置QuartzSchedulerResources;
  2. QuartzSchedulerResources设置的JobStore如果实现了JobListener接口,那么将其作为作业监听器添加到监听器列表;
  3. 构造线程QuartzSchedulerThread实例;
  4. 从QuartzSchedulerResources中获取设置的线程执行器;
  5. 启动QuartzSchedulerThread;
  6. 创建执行作业管理器ExecutingJobsManager,由于其实现了JobListener,所以加入了内置的作业监听器中;
  7. 创建错误日志组件ErrorLogger,由于继承了SchedulerListenerSupport,所以加入了内置的调度监听器中;
  8. 构造SchedulerSignalerImpl,此组件的作业包括:向QuartzScheduler中注册的触发器监听器发送触发器失常或者触发器再也不会被触发的信号、修改触发器下次触发的时间、向QuartzScheduler中注册的调度监听器发送作业被删除或者调度异常的信号;
  9. 当shouldRunUpdateCheck为true是则调用scheduleUpdateCheck方法(见代明清单19),实际是利用定时器定时执行UpdateChecker任务,此任务用于检查Quartz的可用的更新版本;为了提高性能,可以将属性org.quartz.scheduler.skipUpdateCheck设置为true;

代码清单18

    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);

        if(shouldRunUpdateCheck())
            updateTimer = scheduleUpdateCheck();
        else
            updateTimer = null;

        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

代码清单19

    private Timer scheduleUpdateCheck() {
        Timer rval = new Timer(true);
        rval.scheduleAtFixedRate(new UpdateChecker(), 1000, 7 * 24 * 60 * 60 * 1000L);
        return rval;
    }

构造QuartzSchedulerThread

这里再详细分析下QuartzSchedulerThread的构造过程,其构造器见代码清单20。

代码清单20

    QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs) {
        this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
    }

QuartzSchedulerThread的构造器又代理了另一个构造器,见代码清单21。

代码清单21

    QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
        super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
        this.qs = qs;
        this.qsRsrcs = qsRsrcs;
        this.setDaemon(setDaemon);
        if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
            log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
            this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
        }

        this.setPriority(threadPrio);

        // start the underlying thread, but put this object into the ‘paused‘
        // state
        // so processing doesn‘t start yet...
        paused = true;
        halted = new AtomicBoolean(false);
    }

代码清单21比较简单,QuartzScheduler的getSchedulerThreadGroup方法用于创建线程组,QuartzSchedulerResources的isThreadsInheritInitializersClassLoadContext方法实际获取QuartzSchedulerResources的属性threadsInheritInitializersClassLoadContext,此属性如果为真,则设置QuartzSchedulerThread的线程上下文类加载器为当前线程的类加载器,设置paused标志为true,以便于QuartzSchedulerThread线程不能开始处理。halted可以解释为叫停当前线程的执行。

阻止QuartzSchedulerThread的执行

由于在构造QuartzScheduler的过程中已经启动了QuartzSchedulerThread,那么势必导致此线程的执行,其run方法的部分代码见代码清单22.

代码清单22

    public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we‘re supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }

我们并未叫停调度线程的执行,所以halted属性等于false,对于paused标志而言,这里涉及多线程安全问题,所以这里使用了同步块,但是实际上可以通过调整代码将paused用volatile修饰,这样通过内存可见性省去同步,能够提高性能。由于paused标志在线程刚开始执行时为false,那么这里的white循环将不断轮询,每次循环线程wait一秒。既然QuartzSchedulerThread已经开始执行,但是却又无法执行,岂不是自相矛盾?虽然QuartzSchedulerThread线程开始启动,但是QuartzScheduler并未准备好这一切,必须等到QuartzScheduler准备时将paused修改为false。虽说这样实现也是可以的,但是在QuartzScheduler准备好的这段时间内,QuartzSchedulerThread线程频繁的睡眠、被唤醒,线程上下文来回切换,耗费了一些性能。何不等到QuartzScheduler准备好时再启动QuartzSchedulerThread线程呢?

创建调度器

创建调度器的代码如下

            // Create Scheduler ref...
            Scheduler scheduler = instantiate(rsrcs, qs);

其他处理

剩余的工作包括:设置作业工厂,对插件初始化,给QuartzScheduler的监听器管理器注册作业监听器和触发器监听器,设置调度器上下文属性,触发JobStore,触发脚本运行工厂,将调度器注册到SchedulerRepository等,见代码清单23。

代码清单23

            // set job factory if specified
            if(jobFactory != null) {
                qs.setJobFactory(jobFactory);
            }

            // Initialize plugins now that we have a Scheduler instance.
            for (int i = 0; i < plugins.length; i++) {
                plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
            }

            // add listeners
            for (int i = 0; i < jobListeners.length; i++) {
                qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
            }
            for (int i = 0; i < triggerListeners.length; i++) {
                qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
            }

            // set scheduler context data...
            for(Object key: schedCtxtProps.keySet()) {
                String val = schedCtxtProps.getProperty((String) key);
                scheduler.getContext().put((String)key, val);
            }

            // fire up job store, and runshell factory

            js.setInstanceId(schedInstId);
            js.setInstanceName(schedName);
            js.setThreadPoolSize(tp.getPoolSize());
            js.initialize(loadHelper, qs.getSchedulerSignaler());

            jrsf.initialize(scheduler);

            qs.initialize();

            getLog().info(
                    "Quartz scheduler ‘" + scheduler.getSchedulerName()
                            + "‘ initialized from " + propSrc);

            getLog().info("Quartz scheduler version: " + qs.getVersion());

            // prevents the repository from being garbage collected
            qs.addNoGCObject(schedRep);
            // prevents the db manager from being garbage collected
            if (dbMgr != null) {
                qs.addNoGCObject(dbMgr);
            }

            schedRep.bind(scheduler);
            return scheduler;

总结

可以看到创建调度器的过程,几乎完全是顺序编程,步骤也十分清楚。但是可以看到其中可以优化的地方也比较多,另外代码组织稍微不太合理,例如instantiate方法的长度1355-579=776行。

后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。

京东:http://item.jd.com/11846120.html

当当:http://product.dangdang.com/23838168.html

时间: 2024-10-17 00:26:18

Quartz与Spring集成——创建调度器的相关文章

Quartz与Spring集成——启动调度器

前言 在<Quartz与Spring集成--创建调度器>一文中介绍了调度器的创建过程,本文将分析其启动过程.熟悉Spring原理的人都知道AbstractApplicationContext的refresh方法的重要性,在refresh方法中调用了finishRefresh方法,最后会调用到SchedulerFactoryBean的start方法,其调用栈如图1所示. 图1 SchedulerFactoryBean的start方法的调用栈 根据图1的内容,我们知道spring容器初始化完毕的最

Quartz与Spring集成——QuartzSchedulerThread的执行分析

前言 前面说过当paused设置为false,QuartzSchedulerThread才正式启动,我们接着<Quartz与Spring集成--创建调度器>与<Quartz与Spring集成--启动调度器>中QuartzSchedulerThread启动的部分接着展开分析,QuartzSchedulerThread的run方法紧接着会从线程池获取可用的线程数,代码如下: int availThreadCount = qsRsrcs.getThreadPool().blockForA

Quartz与Spring集成 Job如何自动注入Spring容器托管的对象

在Spring中使用Quartz有两种方式实现:第一种是任务类继承QuartzJobBean,第二种则是在配置文件里定义任务类和要执行的方法,类和方法可以是普通类.很显然,第二种方式远比第一种方式来的灵活. 测试环境 Spring3 M2 quartz-2.1.7 我们要达到这样的效果 public class CancelUnpaidOrderTask implements Job { @Autowired private AppOrderService orderService; @Over

Quartz与Spring集成

待写... Job类是由Quartz实例化的,并非由Spring管理,如果Job类中使用了Spring的依赖注入(@Autowired,@Resource等),是无法注入的. 如何解决?请参考这篇博客:http://www.cnblogs.com/daxin/p/3608320.html

spring定时任务之-quartz调度器

公司项目里面有个定时任务,一直想弄清楚其流程,以为很难,今天有空研究了一下,发现其实实现很简单. 查资料发现spring的定时任务有很多种,quartz调度器也有两种一种是作业类需继承自特定的基类:org.springframework.scheduling.quartz.QuartzJobBean. 另一种则是作业类不需继承特定基类.我觉得第二种更加简单简洁,公司项目里面也是用到的这种方法.接下来就讲一下具体编写步骤: 第一步:编写任务类 Java代码   package com.grx.pa

Spring中Quartz调度器的使用

一.Quartz的特点 * 按作业类的继承方式来分,主要有以下两种: 1.作业类继承org.springframework.scheduling.quartz.QuartzJobBean类的方式 2.作业类不继承org.springframework.scheduling.quartz.QuartzJobBean类的方式 注:个人比较推崇第二种,因为这种方式下的作业类仍然是POJO. * 按任务调度的触发时机来分,主要有以下两种: 1.每隔指定时间则触发一次,对应的调度器为org.springf

Quartz学习总结(1)——Spring集成Quartz框架

一.Quartz简介 Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相结合也可以单独使用.Quartz可以用来创建简单或为运行十个,百个,甚至是好几万个Jobs这样复杂的程序.Jobs可以做成标准的Java组件或 EJBs.Quartz 是个开源的作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制.Quartz 允许开发人员根据时间间隔(或天)来调度作业.它实现了作业和触发器的多对多关系,

Quartz与Spring的集成

一.Quartz简介 Quartz是OpenSymphony开源组织在Job scheduling领域又一个开源项目,它可以与J2EE与J2SE应用程序相结合也可以单独使用.Quartz可以用来创建简单或为运行十个,百个,甚至是好几万个Jobs这样复杂的程序.Jobs可以做成标准的Java组件或 EJBs.Quartz 是个开源的作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制.Quartz 允许开发人员根据时间间隔(或天)来调度作业.它实现了作业和触发器的多对多关系,

[Spring笔记]支持注解的Spring调度器

概述 如果想在Spring中使用任务调度功能,除了集成调度框架Quartz这种方式,也可以使用Spring自己的调度任务框架. 使用Spring的调度框架,优点是:支持注解(@Scheduler),可以省去大量的配置. 实时触发调度任务 TaskScheduler接口 Spring3引入了TaskScheduler接口,这个接口定义了调度任务的抽象方法. TaskScheduler接口的声明: public interface TaskScheduler { ScheduledFuture<?>