Spring Batch(4): Job具体解释

第四章 配置作业Job

4.1 基本配置

Job的配置有3个必须的属性。name,jobRepository,steps。一个简单的Job配置例如以下:

<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>

jobRepository默认引用名称为jobRepository的bean,当然也能够显式地配置:

<job id="footballJob" job-repository="specialRepository">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s3" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
</job>
4.1.1 Restartable属性

该属性定义Job能否够被重新启动,默觉得true,在JobExecution运行失败后,能够创建还有一个JobExecution来继续上次的运行。

可是假设该属性设为false。又一次运行该JobInstance将抛出异常。

<job id="footballJob" restartable="false">
    ...
</job>
4.1.2 拦截Job运行

Spring Batch在Job的生命周期中提供了一些钩子方法,可这些钩子方法通过Listener的形式提供。JobListener的接口定义例如以下:

public interface JobExecutionListener {

    void beforeJob(JobExecution jobExecution);

    void afterJob(JobExecution jobExecution);

}

通过实现JobExecutionListener接口并配置给Job,能够在Job运行前后运行特定的逻辑。

比如在运行结束之后。假设失败,发送邮件通知管理人员等。

<job id="footballJob">
    <step id="playerload"          parent="s1" next="gameLoad"/>
    <step id="gameLoad"            parent="s2" next="playerSummarization"/>
    <step id="playerSummarization" parent="s3"/>
    <listeners>
        <listener ref="sampleListener"/>
    </listeners>
</job>

须要注意的是。不管Job是否成功运行,afterJob方法都会运行。Job是否运行成功,能够从JobExecution中获取。

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

Listener的运行顺序:

beforeJob与配置的顺序一样,afterJob与配置的顺序相反。

Listener异常:

Listener的运行过程中假设抛出异常,将导致Job无法继续完毕,终于状态为FAILED.因此要合理控制Listener异常对业务的影响。

注解支持:

假设不想使用侵入性强的Listener接口,能够使用@BeforeJob和@AfterJob两个注解声明。

4.1.3 Job抽象与继承

通用的Job配置能够抽取出来,作为抽象的Job存在,抽象的Job不同意被实例化:

<job id="baseJob" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    <listeners>
</job>

子Job能够通过继续共用这些配置(当然。也能够继承非抽象的Job)。

<job id="job1" parent="baseJob">
    <step id="step1" parent="standaloneStep"/>

    <listeners merge="true">
        <listener ref="listenerTwo"/>
    <listeners>
</job>

当中的merge=”true”表示合并父job和子job的配置,也就是两个Listener都生效。同常规的Spring配置。

4.1.4 Job參数验证

JobParameterValidator组件用于验证JobParameter。

通过以下配置为job配置验证器:

<job id="job1" parent="baseJob3">
    <step id="step1" parent="standaloneStep"/>
    <validator ref="paremetersValidator"/>
</job>
4.1.4 属性的Late Binding

在Spring中。能够把Bean配置用到的属性值通过PropertiesPlaceHolderConfiguer把属性从配置中分离出来独立管理,理论上来说,在配置Job的时候也能够使用同样的方式。可是Spring Batch提供了在运行时配置參数值的能力:

<bean:property name="filePath" value="#{jobParameters[‘filePath‘]}" />

在启动Job时:

    launcher.executeJob("job.xml" , "footjob",
        new JobParametersBuilder().addDate("day", new Date()))
                                  .addString("filePath", "/opt/data/test.xml"));

4.2 配置JobRepository

JobRepository为任务框架中的各个组件对象提供CRUD操作,比如JobExecution,StepExecution。

一个配置样例例如以下:

<job-repository id="jobRepository"
    data-source="dataSource"
    transaction-manager="transactionManager"
    isolation-level-for-create="SERIALIZABLE"
    table-prefix="BATCH_"
    max-varchar-length="1000"/>
4.2.1 事务配置

JobRepository的操作须要事务来保证其完整性以及正确性,这些元数据的完整性对框架来说非常重要。假设没有事务支持。框架的行为将无法正确定义。

create*方法的事务隔离级别单独定义,为了保证同一个JobInstance不会被同一时候运行两次,默认的隔离级别为SERIALIZABLE。能够被改动:

<job-repository id="jobRepository"
                isolation-level-for-create="REPEATABLE_READ" />

假设没有使用Batch命名空间或者没有使用Factory Bean,则须要显示配置事务AOP:

<aop:config>
    <aop:advisor
           pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"/>
    <advice-ref="txAdvice" />
</aop:config>

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*" />
    </tx:attributes>
</tx:advice>
4.2.2 表名前缀

默认情况下,Spring Batch须要的表以BATCH作为前缀,只是能够自己定义:

<job-repository id="jobRepository"
                table-prefix="e_batch" />

表前缀能够改动,可是表名和表的列不能被改动。

4.2.3 特殊的Repository

測试环境中。内存级别的数据库十分方便:

<bean id="jobRepository"
  class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="transactionManager" ref="transactionManager"/>
</bean>

假设使用的数据库类型不在SpringBatch的支持中,能够通过JobRepositoryFactoryBean自己定义。

4.3 配置JobLauncher

默认提供了一个简单的Launcher:

<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

JobLauncher的时序图例如以下:

假设启动的请求来自HTTP,那么等待整个Job完毕再返回不是一个好方法。此时须要异步启动Job,时序图例如以下:

对应的Launcher配置例如以下:

<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor">
        <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
    </property>
</bean>

4.4 运行Job

有多种方式能够启动一个Job。可是核心都是通过JobLauncher来实现。

1. 命令行运行

主要通过CommandLineJobRunner类完毕

2. 从Web容器中运行

通过Http请求启动任务非经常见。时序图例如以下:

Controller能够是常规的Spring MVC Controller:

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

3. 使用调度框架运行

能够与其它调度框架一起使用,比如使用Spring的轻量级调用框架Spring Scheduler或者Quartz

4.5 元数据的高级使用方法

除了通过JobRepository对元数据进行CRUD操作外,Spring batch还提供另外的接口用于訪问元数据。

包含: JobExplorer JobOperator。

总体结构例如以下:

4.5.1 JobExplorer

该组件提供了仅仅读的查询操作,是JobRepository的仅仅读版本号,接口定义例如以下:

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

配置一个Bean例如以下:

<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

假设须要制定表名前缀:

<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" p:tablePrefix="BATCH_" />
4.5.2 JobOperator

JobOperator集成了非常多接口定义,提供了综合的操作方法。定义例如以下:

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

配置:

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

当中的startNextInstance方法将使用当前Job的JobParameter。经过JobParametersIncrementer处理之后的參数启动一个JobInstance。

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

以下是一个简单实现:

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

为job配置incrementer:

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

在每天处理一次的批处理中,Incrementer的实现可能是按日期递增。

时间: 2024-10-08 15:43:42

Spring Batch(4): Job具体解释的相关文章

万树IT:Spring Batch批处理框架技巧,让你不再重复造轮子

整理了Spring批处理框架的内容,掌握这些知识,可以帮你省去一些造轮子的过程,提高开发效率.本文由博主姚兆峰分享,小编整理后推送,希望对你的工作有帮助. Part.1 问题分析 在大型的企业应用中,或多或少都会存在大量的任务需要处理,如邮件批量通知所有将要过期的会员等等.而在批量处理任务的过程中,又需要注意很多细节,如任务异常.性能瓶颈等等.那么,使用一款优秀的框架总比我们自己重复地造轮子要好得多一些. AD 我所在的物联网云平台部门就有这么一个需求,需要实现批量下发命令给百万设备.为了防止枯

【转】大数据批处理框架 Spring Batch全面解析

如今微服务架构讨论的如火如荼.但在企业架构里除了大量的OLTP交易外,还存在海量的批处理交易.在诸如银行的金融机构中,每天有3-4万笔的批处理作业需要处理.针对OLTP,业界有大量的开源框架.优秀的架构设计给予支撑:但批处理领域的框架确凤毛麟角.是时候和我们一起来了解下批处理的世界哪些优秀的框架和设计了,今天我将以Spring Batch为例,和大家一起探秘批处理的世界.初识批处理典型场景探秘领域模型及关键架构实现作业健壮性与扩展性批处理框架的不足与增强批处理典型业务场景对账是典型的批处理业务处

Spring batch的学习

Spring batch是用来处理大量数据操作的一个框架,主要用来读取大量数据,然后进行一定处理后输出成指定的形式. Spring batch主要有以下部分组成: JobRepository     用来注册job的容器 JobLauncher             用来启动Job的接口 Job                          实际执行的任务,包含一个或多个Step Step                        step包含ItemReader.ItemProces

Spring Batch Hello World Example

Spring Batch is a framework for batch processing – execution of a series of jobs. In Spring Batch, A job consists of many steps and each step consists of a READ-PROCESS-WRITE task or single operation task (tasklet). For “READ-PROCESS-WRITE” process,

Spring Batch学习笔记二

此系列博客皆为学习Spring Batch时的一些笔记: Spring Batch的架构 一个Batch Job是指一系列有序的Step的集合,它们作为预定义流程的一部分而被执行: Step代表一个自定义的工作单元,它是Job的主要构件块:每一个Step由三部分组成:ItemReader.ItemProcessor.ItemWriter:这三个部分将执行在每一条被处理的记录上,ItemReader读取每一条记录,然后传递给ItemProcessor处理,最后交给ItemWriter做持久化:It

Spring Batch实践

Spring Batch在大型企业中的最佳实践 在大型企业中,由于业务复杂.数据量大.数据格式不同.数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理.而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理.这样的过程就是"批处理". 批处理应用通常有以下特点: 数据量大,从数万到数百万甚至上亿不等: 整个过程全部自动化,并预留一定接口进行自定义配置: 这样的应用通常是周期性运行,比如按日.周.月运行: 对数据处理的准确性要求高,并且需要容错机制.回滚机制.完善的日志监控

Spring Batch(4): Job详解

第四章 配置作业Job 4.1 基本配置 Job的配置有3个必须的属性,name,jobRepository,steps.一个简单的Job配置如下: <job id="footballJob"> <step id="playerload" parent="s1" next="gameLoad"/> <step id="gameLoad" parent="s2"

初探Spring Batch

此系列博客皆为阅读<Pro Spring Batch>一书的读书笔记: 为什么我们需要批处理? 我们不会总是想要立即得到需要的信息,批处理允许我们在请求处理之前就一个既定的流程开始搜集信息:比如说一个银行对账单,我们可以按月生成,并在用户查询之前开启一个批处理流程进行处理: 有时候它能让生意做得更好:比如说在线购物时,并不是说你买了一个产品零售商就立即发货,而是四五个小时后,统一发货: 更好的利用资源:让应该利用的处理能力闲置起来是一个大的浪费,我们可以定制处理让一个机器一个接一个的运行Job

Spring Batch 中文参考文档 V3.0.6

1 Spring Batch介绍 企业领域中许多应用系统需要采用批处理的方式在特定环境中运行业务操作任务.这种业务作业包括自动化,大量信息的复杂操作,他们不需要人工干预,并能高效运行.这些典型作业包括:基于时间的事件处理(例如:月底结算,通知或信函):重复的.数据量大的.业务逻辑规则错综复杂的定期任务(例如:保险盈利决策和利率调整):来自内外部不同应用系统的信息集成任务,这些信息需要进行格式化.校验,并通过事务的方式处理成为系统可用的记录.批处理程序就是用来处理这种数以亿计的企业日常事务. Sp