Spring Batch(4): Job详解

第四章 配置作业Job

4.1 基本配置

Job的配置有3个必须的属性,name,jobRepository,steps。一个简单的Job配置如下:

<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>

jobRepository默认引用名称为jobRepository的bean,当然也可以显式地配置:

<job id="footballJob" job-repository="specialRepository">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s3" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>
4.1.1 Restartable属性

该属性定义Job是否可以被重启,默认为true,在JobExecution执行失败后,可以创建另一个JobExecution来继续上次的执行。但是如果该属性设为false,重新执行该JobInstance将抛出异常。

<job id="footballJob" restartable="false">
    ...
</job>
4.1.2 拦截Job执行

Spring Batch在Job的生命周期中提供了一些钩子方法,可这些钩子方法通过Listener的形式提供。JobListener的接口定义如下:

public interface JobExecutionListener {

    void beforeJob(JobExecution jobExecution);

    void afterJob(JobExecution jobExecution);

}

通过实现JobExecutionListener接口并配置给Job,可以在Job执行前后执行特定的逻辑。例如在执行结束之后,如果失败,发送邮件通知管理人员等。

<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
    <listeners>
        <listener ref="sampleListener"/>
    </listeners>
</job>

需要注意的是,无论Job是否成功执行,afterJob方法都会执行,Job是否执行成功,可以从JobExecution中获取。

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

Listener的执行顺序:

beforeJob与配置的顺序一样,afterJob与配置的顺序相反。

Listener异常:

Listener的执行过程中如果抛出异常,将导致Job无法继续完成,最终状态为FAILED.因此要合理控制Listener异常对业务的影响。

注解支持:

如果不想使用侵入性强的Listener接口,可以使用@BeforeJob和@AfterJob两个注解声明。

4.1.3 Job抽象与继承

通用的Job配置可以抽取出来,作为抽象的Job存在,抽象的Job不允许被实例化:

<job id="baseJob" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    <listeners>
</job>

子Job可以通过继续共用这些配置(当然,也可以继承非抽象的Job)。

<job id="job1" parent="baseJob">
    <step id="step1" parent="standaloneStep"/>

    <listeners merge="true">
        <listener ref="listenerTwo"/>
    <listeners>
</job>

其中的merge=”true”表示合并父job和子job的配置,也就是两个Listener都生效,同常规的Spring配置。

4.1.4 Job参数验证

JobParameterValidator组件用于验证JobParameter。通过以下配置为job配置验证器:

<job id="job1" parent="baseJob3">
    <step id="step1" parent="standaloneStep"/>
    <validator ref="paremetersValidator"/>
</job>
4.1.4 属性的Late Binding

在Spring中,可以把Bean配置用到的属性值通过PropertiesPlaceHolderConfiguer把属性从配置中分离出来独立管理,理论上来说,在配置Job的时候也可以使用相同的方式。但是Spring Batch提供了在运行时配置参数值的能力:

<bean:property name="filePath" value="#{jobParameters[‘filePath‘]}" />

在启动Job时:

    launcher.executeJob("job.xml" , "footjob",
        new JobParametersBuilder().addDate("day", new Date()))
                                  .addString("filePath", "/opt/data/test.xml"));

4.2 配置JobRepository

JobRepository为任务框架中的各个组件对象提供CRUD操作,例如JobExecution,StepExecution。

一个配置例子如下:

<job-repository id="jobRepository"
    data-source="dataSource"
    transaction-manager="transactionManager"
    isolation-level-for-create="SERIALIZABLE"
    table-prefix="BATCH_"
    max-varchar-length="1000"/>
4.2.1 事务配置

JobRepository的操作需要事务来保证其完整性以及正确性,这些元数据的完整性对框架来说非常重要。如果没有事务支持,框架的行为将无法正确定义。

create*方法的事务隔离级别单独定义,为了保证同一个JobInstance不会被同时执行两次,默认的隔离级别为SERIALIZABLE,可以被修改:

<job-repository id="jobRepository"
                isolation-level-for-create="REPEATABLE_READ" />

如果没有使用Batch命名空间或者没有使用Factory Bean,则需要显示配置事务AOP:

<aop:config>
    <aop:advisor
           pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"/>
    <advice-ref="txAdvice" />
</aop:config>

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*" />
    </tx:attributes>
</tx:advice>
4.2.2 表名前缀

默认情况下,Spring Batch需要的表以BATCH作为前缀,不过可以自定义:

<job-repository id="jobRepository"
                table-prefix="e_batch" />

表前缀可以修改,但是表名和表的列不能被修改。

4.2.3 特殊的Repository

测试环境中,内存级别的数据库十分方便:

<bean id="jobRepository"
  class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager"/>
</bean>

如果使用的数据库类型不在SpringBatch的支持中,可以通过JobRepositoryFactoryBean自定义。

4.3 配置JobLauncher

默认提供了一个简单的Launcher:

<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

JobLauncher的时序图如下:

如果启动的请求来自HTTP,那么等待整个Job完成再返回不是一个好方法,此时需要异步启动Job,时序图如下:

相应的Launcher配置如下:

<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor">
        <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    </property>
</bean>

4.4 运行Job

有多种方式可以启动一个Job,但是核心都是通过JobLauncher来实现。

1. 命令行运行

主要通过CommandLineJobRunner类完成

2. 从Web容器中运行

通过Http请求启动任务很常见,时序图如下:

Controller可以是常规的Spring MVC Controller:

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

3. 使用调度框架运行

可以与其他调度框架一起使用,例如使用Spring的轻量级调用框架Spring Scheduler或者Quartz

4.5 元数据的高级用法

除了通过JobRepository对元数据进行CRUD操作外,Spring batch还提供另外的接口用于访问元数据。

包括: JobExplorer JobOperator。整体结构如下:

4.5.1 JobExplorer

该组件提供了只读的查询操作,是JobRepository的只读版本,接口定义如下:

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

配置一个Bean如下:

<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

如果需要制定表名前缀:

<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" p:tablePrefix="BATCH_" />
4.5.2 JobOperator

JobOperator集成了很多接口定义,提供了综合的操作方法。定义如下:

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

配置:

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

其中的startNextInstance方法将使用当前Job的JobParameter,经过JobParametersIncrementer处理之后的参数启动一个JobInstance。

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

下面是一个简单实现:

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

为job配置incrementer:

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

在每天处理一次的批处理中,Incrementer的实现可能是按日期递增。

时间: 2024-10-11 11:46:56

Spring Batch(4): Job详解的相关文章

Spring中AOP实例详解

Spring中AOP实例详解 需要增强的服务 假如有以下service,他的功能很简单,打印输入的参数并返回参数. @Service public class SimpleService { public String getName(String name) { System.out.println(get name is: + name); return name; } } 定义切面和切点 @Component @Aspect public class L ogAspect { // 定义切

elasticSearch+spring 整合 maven依赖详解

摘自:http://www.mayou18.com/detail/nTxPQSyu.html [Elasticsearch基础]elasticSearch+spring 整合 maven依赖详解 Maven依赖   <!-- elasticsearch package -->        <dependency>             <groupId>fr.pilato.spring</groupId>             <artifact

Spring Data操作Redis详解

Spring Data操作Redis详解 Redis是一种NOSQL数据库,Key-Value形式对数据进行存储,其中数据可以以内存形式存在,也可以持久化到文件系统.Spring data对Redis进行了很好的封装,用起来也是十分的得心应手.Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库.缓存和消息中间件. 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted se

Spring IOC源码详解之容器依赖注入

Spring IOC源码详解之容器依赖注入 上一篇博客中介绍了IOC容器的初始化,通过源码分析大致了解了IOC容器初始化的一些知识,先简单回顾下上篇的内容 载入bean定义文件的过程,这个过程是通过BeanDefinitionReader来完成的,其中通过 loadBeanDefinition()来对定义文件进行解析和根据Spring定义的bean规则进行处理 - 事实上和Spring定义的bean规则相关的处理是在BeanDefinitionParserDelegate中完成的,完成这个处理需

Spring IOC源码详解之容器初始化

Spring IOC源码详解之容器初始化 上篇介绍了Spring IOC的大致体系类图,先来看一段简短的代码,使用IOC比较典型的代码 ClassPathResource res = new ClassPathResource("beans.xml"); DefaultListableBeanFactory factory = new DefaultListableBeanFactory(); XmlBeanDefinitionReader reader = new XmlBeanDe

spring框架 AOP核心详解

AOP称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,比如日志,事务,权限等待,Struts2的拦截器设计就是基于AOP的思想,是个比较经典的例子. 一 AOP的基本概念 (1)Aspect(切面):通常是一个类,里面可以定义切入点和通知 (2)JointPoint(连接点):程序执行过程中明确的点,一般是方法的调用 (3)Advice(通知):AOP在特定的切入点上执行的增强处理,有before,after,afterReturning,afterThrowing,around

经典Spring入门基础教程详解

经典Spring入门基础教程详解 https://pan.baidu.com/s/1c016cI#list/path=%2Fsharelink2319398594-201713320584085%2F%E7%BB%8F%E5%85%B8Spring%E5%85%A5%E9%97%A8%E5%9F%BA%E7%A1%80%E6%95%99%E7%A8%8B%E8%AF%A6%E8%A7%A3&parentPath=%2Fsharelink2319398594-201713320584085 博达远

Spring事务管理(详解+实例)

写这篇博客之前我首先读了<Spring in action>,之后在网上看了一些关于Spring事务管理的文章,感觉都没有讲全,这里就将书上的和网上关于事务的知识总结一下,参考的文章如下: Spring事务机制详解 Spring事务配置的五种方式 Spring中的事务管理实例详解 1 初步理解 理解事务之前,先讲一个你日常生活中最常干的事:取钱. 比如你去ATM机取1000块钱,大体有两个步骤:首先输入密码金额,银行卡扣掉1000元钱:然后ATM出1000元钱.这两个步骤必须是要么都执行要么都

spring BeanFactory和ApplicationContext详解

1.BeanFactory是spring的核心,有以下几种: 2.详解: 3.例子:获取资源-->启动ioc容器-->第一次调用时初始化类,对于单例类会存储备下次调用. 版权声明:本文为博主原创文章,未经博主允许不得转载.