前提:你已经有了一定的Spring基础
你已经可以跑动一个简单的Spring batch 的实例
参考:http://www.cnblogs.com/gulvzhe/archive/2011/10/25/2224249.html
http://www.cnblogs.com/cdutedu/p/3789396.html
先盗几个图
JobLauncher 指定一个 JobRepository
JobRepository包含了一些传入JOB的参数,主要有六个表去存储
每个JOB可以对应多个Step
...<batch:step id="aStep" next="bStep"> <batch:tasklet> <batch:chunk reader="aReader" writer="aWriter" processor="aProcessor" commit-interval="1000" /> </batch:tasklet> </batch:step>...
tesklet里面的工作块为chunk,每个chunk执行完commit-interval,即提交一次,可以同时开多个chunk
每个STEP 这样执行
(以下一段摘抄)
从DB或是文件中取出数据的时候,read()操作每次只读取一条记录,之后将读取的这条数据传递给processor(item)处理,框架将重复做这两步操作,直到读取记录的件数达到batch配置信息中”commin-interval”设定值的时候,就会调用一次write操作。然后再重复上图的处理,直到处理完所有的数据。当这个Step的工作完成以后,或是跳到其他Step,或是结束处理。
那么问题来了?读取数据到处理的时候发生了异常如何处理?会不会导致整个批处理中断呢?
有异常没有捕获,到最上层也没有的话整个进程会挂掉,导致整个批处理中断。
显然,为了不影响后面的处理,要么捕获异常,打出日志,跳过。要么,重试(在IO超时或者表锁定的情况下是很有效的-瞬态情况)。
即使有中断,我们也需要重启JOB
以上几种正好对应Spring-Batch中的 SKIP\RETYR\RESTART
一、SKIP
<job id="importProductsJob"> <step id="importProductsStep"> <tasklet> <chunk reader="reader" writer="writer" commit-interval="100" skip-limit="10"> <skippable-exception-classes> <include class="org.springframework.batch ? .item.file.FlatFileParseException" /> </skippable-exception-classes> </chunk> </tasklet> </step> </job>
skippable-exception-classes 里面配需要SKIP的异常类型
skip-limit 最多可以容错次数,超过这个数,该STEP中断
也可以添加listener去打日志
<bean id="skipListener" class="com.manning ? .sbia.ch08.skip.DatabaseSkipListener"> <constructor-arg ref="dataSource" /> </bean> <job id="importProductsJob" xmlns="http://www.springframework.org/schema/batch"> <step id="importProductsStep"> <tasklet> <chunk reader="reader" writer="writer" commit-interval="100" skip-limit="10"> <skippable-exception-classes> <include class="org.springframework.batch.item.file ? .FlatFileParseException" /> </skippable-exception-classes> </chunk> <listeners> <listener ref="skipListener" /> </listeners> </tasklet> </step> </job>
二、Retrying on error
主要针对于IO操作的、并发等,瞬态的错误
类似于SKIP的配置
<job id="importProducsJob"> <step id="importProductsStep"> <tasklet> <chunk reader="reader" writer="writer" commit-interval="100" retry-limit="3"> <retryable-exception-classes> <include class="org.springframework.dao ?.OptimisticLockingFailureException" /> </retryable-exception-classes> </chunk> </tasklet> </step> </job>
如果你不想重试次数达到后,由于这些错误导致STEP的中断退出,可以混合RETRY和SKIP两者。
<job id="job"> <step id="step"> <tasklet> <chunk reader="reader" writer="writer" commit-interval="100" retry-limit="3" skip-limit="10"> <retryable-exception-classes> <include class="org.springframework.dao ? .DeadlockLoserDataAccessException" /> </retryable-exception-classes> <skippable-exception-classes> <include class="org.springframework.dao ? .DeadlockLoserDataAccessException" /> </skippable-exception-classes> </chunk> </tasklet> </step> </job>
你也可以通过自定义的策略来控制重试
<job id="retryPolicyJob" xmlns="http://www.springframework.org/schema/batch"> <step id="retryPolicyStep"> <tasklet> <chunk reader="reader" writer="writer" commit-interval="100" retry-policy="retryPolicy" /> </tasklet> </step> </job>
<bean id="retryPolicy" class="org.springframework ?.batch.retry.policy.ExceptionClassifierRetryPolicy"> <property name="policyMap"> <map> <entry key="org.springframework.dao.ConcurrencyFailureException"> <bean class="org.springframework.batch.retry ?.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="3" /> </bean> </entry> <entry key="org.springframework.dao ? .DeadlockLoserDataAccessException"> <bean class="org.springframework.batch.retry ? .policy.SimpleRetryPolicy"> <property name="maxAttempts" value="5" /> </bean> </entry> </map> </property> </bean>
你也可以添加Listener
配置方法类似同SKIP,继承RetryListenerSupport,配置在retry-listeners
你还可以通过RetryTemplate来重试。RetryTemplate配置属性retryPolicy
三、重启
重启的重点是能够保存之前的job repository,自带的reader,都可以,自己写的Reader需要继承接口
Spring batch 默认是会重启的
allow-start-if-complete配置在tasklet上,决定了tasklet会不会在JOB重试的时候,重试该STEP(可能是下个STEP发生了异常)。
start-limit用来控制STEP级别的重试次数,重试次数结束后,JOB中断退出。
处理在STEP中的状态,已经处理了很多ITEM,失败了?
middle of a chunk-oriented step,item级别的重启。首先是reader的重启。
如果你需要你自己写的ITEM的reader,也可以实现重启的话,需要实现ItemStram的接口,将对于item 计数的值,保存到executionContext,重启的时候去读取。