1、什么是quartz?
quartz是一个开源的定时任务框架,具备将定时任务持久化至数据库以及分布式环境下多节点调度的能力。当当的elastic-job便是以quartz为基础,结合zookeeper开发出来的一款产品。
2、整合springboot示例
项目使用springboot提高开发效率,并将定时任务持久化到mysql数据库中。
2.1)引入quartz依赖
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz-jobs</artifactId> <version>2.3.0</version> </dependency>
2.2)配置quartz
先引入配置的config代码,在下一节中对每一段配置进行详细解释:
1 /** 2 * 分布式定时任务quartz配置 3 * Created by chenjunyi on 2018/6/6. 4 */ 5 @Configuration 6 public class QuartzConfiguration { 7 8 /** 9 * quartz的JobFactory,根据注册的JobClass从spring应用上下文中获取job实例 10 */ 11 public static class AutoSpringBeanJobFactory extends AdaptableJobFactory implements SchedulerContextAware { 12 13 /** spring应用上下文 */ 14 private ApplicationContext applicationContext; 15 16 /** scheduler上下文 */ 17 private SchedulerContext schedulerContext; 18 19 /** 需要忽略的属性 */ 20 private String[] ignoredUnknownProperties = null; 21 22 private void setApplicationContext(ApplicationContext applicationContext) { 23 this.applicationContext = applicationContext; 24 } 25 26 @Override 27 public void setSchedulerContext(SchedulerContext schedulerContext) { 28 this.schedulerContext = schedulerContext; 29 } 30 31 private void setIgnoredUnknownProperties(String... ignoredUnknownProperties) { 32 this.ignoredUnknownProperties = ignoredUnknownProperties; 33 } 34 35 @Override 36 protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { 37 //获取定时任务的clazz,并从spring上下文中获取实例 38 Class<? extends Job> clazz = bundle.getJobDetail().getJobClass(); 39 Job job = applicationContext.getBean(clazz); 40 41 if (isEligibleForPropertyPopulation(job)) { 42 //非继承自QuartzJobBean的Job,设置job属性 43 BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(job); 44 MutablePropertyValues pvs = new MutablePropertyValues(); 45 if (this.schedulerContext != null) { 46 pvs.addPropertyValues(this.schedulerContext); 47 } 48 pvs.addPropertyValues(bundle.getJobDetail().getJobDataMap()); 49 pvs.addPropertyValues(bundle.getTrigger().getJobDataMap()); 50 if (this.ignoredUnknownProperties != null) { 51 for (String propName : this.ignoredUnknownProperties) { 52 if (pvs.contains(propName) && !bw.isWritableProperty(propName)) { 53 pvs.removePropertyValue(propName); 54 } 55 } 56 bw.setPropertyValues(pvs); 57 } else { 58 bw.setPropertyValues(pvs, true); 59 } 60 } 61 return job; 62 } 63 64 private boolean isEligibleForPropertyPopulation(Object jobObject) { 65 return (!(jobObject instanceof QuartzJobBean)); 66 } 67 68 } 69 70 /** 71 * 配置任务工厂实例 72 * @param applicationContext spring上下文实例 73 * @return 任务工厂实例 74 */ 75 @Bean 76 public JobFactory jobFactory(ApplicationContext applicationContext) { 77 AutoSpringBeanJobFactory jobFactory = new AutoSpringBeanJobFactory(); 78 jobFactory.setApplicationContext(applicationContext); 79 return jobFactory; 80 } 81 82 /** 83 * 配置任务调度器,使用项目数据源作为quartz数据源 84 * @param jobFactory 自定义配置任务工厂 85 * @param dataSource 数据源实例 86 * @return 任务调度器 87 */ 88 @Bean 89 public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) { 90 SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); 91 //将spring管理job自定义工厂交由调度器维护 92 schedulerFactoryBean.setJobFactory(jobFactory); 93 //设置覆盖已存在的任务(配置已失效,因为改写了原有的注册方式,JOB注册时便已自动进行替换) 94 schedulerFactoryBean.setOverwriteExistingJobs(true); 95 //项目启动完成后,等待50秒后开始执行调度器初始化(需要小于JOB的间隔时间) 96 schedulerFactoryBean.setStartupDelay(50); 97 //设置调度器自动运行 98 schedulerFactoryBean.setAutoStartup(true); 99 //设置数据源,使用与项目统一数据源 100 schedulerFactoryBean.setDataSource(dataSource); 101 //设置定时调度器命名空间 102 schedulerFactoryBean.setSchedulerName("MY-QUARTZ-SCHEDULER"); 103 //设置存储在quartz上文中的Spring应用上下文key 104 schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext"); 105 //设置属性 106 Properties properties = new Properties(); 107 //设置调度器实例名 108 properties.setProperty("org.quartz.scheduler.instanceName", "SCHEDULER-INSTANCE"); 109 //设置调度器实例ID,在cluster中使用,AUTO标识自动生成 110 properties.setProperty("org.quartz.scheduler.instanceId", "AUTO"); 111 //禁用rmi配置 112 properties.setProperty("org.quartz.scheduler.rmi.export", "false"); 113 //禁用rmi配置 114 properties.setProperty("org.quartz.scheduler.rmi.proxy", "false"); 115 //quartz线程池实现类 116 properties.setProperty("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); 117 //quartz线程池线程数 118 properties.setProperty("org.quartz.threadPool.threadCount", "10"); 119 //quartz线程池线程优先级 120 properties.setProperty("org.quartz.threadPool.threadPriority", "5"); 121 //quartz线程池是否自动加载数据库内的定时任务 122 properties.setProperty("org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread", "true"); 123 //Job错过执行时间的阈值 124 properties.setProperty("org.quartz.jobStore.misfireThreshold", "60000"); 125 //Job持久化方式配置 126 properties.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX"); 127 //Job的JDBC持久化驱动,此处配置为MySql 128 properties.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.StdJDBCDelegate"); 129 //配置是否使用 130 properties.setProperty("org.quartz.jobStore.useProperties", "false"); 131 //持久化的quartz表结构前缀 132 properties.setProperty("org.quartz.jobStore.tablePrefix", "QRTZ_"); 133 //是否是集群quartz 134 properties.setProperty("org.quartz.jobStore.isClustered", "true"); 135 //集群quartz中节点有效性检查时间间隔 136 properties.setProperty("org.quartz.jobStore.clusterCheckinInterval", "20000"); 137 //错过执行时间的Job最大持有数 138 properties.setProperty("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1"); 139 schedulerFactoryBean.setQuartzProperties(properties); 140 //返回结果 141 return schedulerFactoryBean; 142 } 143 144 }
2.3)编写定时任务
为了各个定时任务方便注册,使用模板模式定义一个抽象基类。该抽象基类实现InterruptableJob接口,当然也可以实现其他接口或抽象类(该继承树的顶级接口为org.quartz.Job)。
1 /** 2 * 抽象定时任务,完成向quartz注册的功能 3 * Created by chenjunyi on 2018/6/6. 4 */ 5 @Slf4j 6 public abstract class AbstractScheduler implements InterruptableJob { 7 8 @Autowired 9 private Scheduler scheduler; 10 11 /** 12 * 向定时任务调度器注册 13 * @throws SchedulerException 注册时发生异常 14 */ 15 @PostConstruct 16 protected void register() throws SchedulerException { 17 //任务和触发器名称(若不进行设置,则quartz默认使用UUID,因此每次启动应用都会注册一个新任务) 18 String jobName = this.getClass().getSimpleName() + "Job"; 19 String triggerName = this.getClass().getSimpleName() + "Trigger"; 20 //设置定时任务 21 JobDetail jobDetail = JobBuilder.newJob(this.getClass()).withIdentity(jobName).build(); 22 //创建任务触发器 23 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(getCron()); 24 Trigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerName).withSchedule(scheduleBuilder).build(); 25 //将触发器与任务绑定到调度器内 26 Set<Trigger> triggers = new HashSet<>(); 27 triggers.add(trigger); 28 scheduler.scheduleJob(jobDetail, triggers, true); 29 log.info(">>>>>注册[Trigger={}, Job={}]的定时任务成功", triggerName, jobName); 30 } 31 32 /** 33 * 获取cron表达式 34 * @return cron表达式 35 */ 36 protected abstract String getCron(); 37 38 }
实现一个自定义的demo定时任务,并通过@Service交于Spring-IOC容器进行托管,代码如下:
1 /** 2 * 示例定时任务 3 * Created by chenjunyi on 2018/6/5. 4 */ 5 @Service 6 @PersistJobDataAfterExecution 7 @DisallowConcurrentExecution 8 public class DemoScheduler extends AbstractScheduler { 9 10 @Value("${env.cron.demoScheduler}") 11 private String cron; 12 13 @Autowired 14 private DemoService demoService; 15 16 @Override 17 public void interrupt() throws UnableToInterruptJobException { 18 19 } 20 21 @Override 22 public void execute(JobExecutionContext context) throws JobExecutionException { 23 demoService.sayHello(); 24 } 25 26 @Override 27 protected String getCron() { 28 return this.cron; 29 } 30 31 }
3、整合代码详解
3.1)AutoSpringBeanJobFactory
JobFactory,即定时任务Job工厂,在定时任务触发时,通过该factory获取定时任务实例并执行。该接口的继承树比较简单,实现的子类只有2个,如下图所示:
不论是AdaptableJobFactory还是SimpleJobFactory,其获取Job实例的方式都比较简单,即通过JobDetail的Class直接newInstance,以AdaptableJobFactory为例,相关源码如下:
1 @Override 2 public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException { 3 try { 4 Object jobObject = createJobInstance(bundle); //直接根据class.newInstance获取实例 5 return adaptJob(jobObject); //job适配,判断job类型并进行包装,此处忽略 6 } catch (Exception ex) { 7 throw new SchedulerException("Job instantiation failed", ex); 8 } 9 } 10 11 protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { 12 return bundle.getJobDetail().getJobClass().newInstance(); 13 }
但这样带来了一个问题,因为我们的定时任务可能会依赖spring-ioc容器中的其他bean(需要进行注入),直接newInstance创建的实例无法被spring-ioc容器托管,从而在执行时拿不到注入的对象,导致NPE(NullPointException)。为了解决这个问题,在网上搜索了解决办法,给出的思路基本都是通过newInstance拿到Job对象后,再手动将其托管到spring-ioc容器中,如下代码所示:
1 //参考博客链接:https://www.jianshu.com/p/d52d62fb2ac6;作者:恒宇少年 2 public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements ApplicationContextAware { 3 4 /** spring-beanfactory */ 5 private transient AutowireCapableBeanFactory beanFactory; 6 7 @Override 8 public void setApplicationContext(final ApplicationContext context) { 9 beanFactory = context.getAutowireCapableBeanFactory(); 10 } 11 12 @Override 13 protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception { 14 final Object job = super.createJobInstance(bundle); 15 beanFactory.autowireBean(job); //通过newInstance获取job实例后,将其交付给spring-ioc 16 return job; 17 } 18 }
通过在AutowiringSpringBeanJobFactory中维护一个AutowireCapableBeanFactory,并且在获取Job实例之后使用autowireBean方法将其交给spring-ioc进行托管,从而保证该实例在执行过程中能够拿到注入的对象。
这样的处理同样有个问题,就是每次触发定时任务时,都要newInstance创建对象,那么能不能让Job在spring-ioc容器启动时便被托管(即使用@Service等注解),然后直接获取到这个托管的对象呢。解决这个问题的思路就在于获取Job实例的方式,既然我们可以获取到Job的Class(由Job在向Scheduler注册时设置),那么便可以通过ApplicationContext拿到相应的实例。
因此,在AutoSpringBeanJobFactory中,取代了AutowireCapableBeanFactory,维护一个ApplicationContext,并通过Job的Class获取实例。
1 Class<? extends Job> clazz = bundle.getJobDetail().getJobClass(); 2 Job job = applicationContext.getBean(clazz);
3.2)AbstractScheduler
该抽象基类实现了自动注册的逻辑,不需要在config中配置额外的注册。思路是通过@PostConstruct注解,在该类被Spring完成初始化后,执行注册动作,有以下几点需要注意:
- 需要设置任务和触发器名称。如果不设置,quartz默认使用UUID,因此每次启动应用都会注册一个新任务;
- 调度器的绑定,要使用含有replace功能的方法(方法参数带有boolean replace的)。由于是将Job持久化到数据库,应用再次启动时,会读取数据库中的任务列表。不含replace功能的方法在进行一次新的注册时,发现任务已存在的话,就会报错;而含replace功能的方法会更新数据库的Job配置信息;
- 由于我们在Job初始化时便进行了任务注册,且采用的是replace的方式,因此在config中的schedulerFactoryBean.setOverwriteExistingJobs(true)该条配置便失效了(因为SchedulerFactoryBean在afterPropertiesSet这个属于SpringBean的生命周期方法中,调用了自身的registerJobsAndTriggers方法,该方法会根据此参数决定是否调用含replace功能的绑定方法进行更新Job,我们自己的Job注册实现中便完成了此功能);
- 继续上一条,值得注意的是,SchedulerFactoryBean.registerJobsAndTriggers的方法中,会根据是否设置了TransactionManager来决定是否将所有Job和Trigger的更新放在同一个事务中,由于目前的应用没有需要使用事务来更新Job的需求,并且若更新失败,启动应用时会抛出异常,因此该问题放置待解决(解决办法也很简单,在register方法中添加事务控制即可);
原文地址:https://www.cnblogs.com/manayi/p/9152546.html