Spring Batch_Intercepting Step Execution_Configuring a SkipListener
For background on configuring skip, see: http://my.oschina.net/xinxingegeya/blog/346244
First, look at the inheritance diagram of the StepListener.java interface:
StepExecutionListener
StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it has ended, whether it ended normally or failed.
SkipListener
ItemReadListener, ItemProcessListener, and ItemWriteListener all provide mechanisms for being notified of errors, but none will inform you that a record has actually been skipped. onWriteError, for example, will be called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items:
public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);

    void onSkipInProcess(T item, Throwable t);

    void onSkipInWrite(S item, Throwable t);

}
onSkipInRead will be called whenever an item is skipped while reading. It should be noted that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite will be called when an item is skipped while writing. Because the item has been read successfully (and not skipped), it is also provided the item itself as an argument.
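To make the difference between the three callbacks concrete, here is a minimal sketch. It assumes a hypothetical stand-in interface, SkipCallbacks, that only mirrors the method signatures quoted above so that the example compiles without Spring Batch on the classpath; in a real job you would implement org.springframework.batch.core.SkipListener instead. RecordingSkipListener and its records field are likewise names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mirrors the SkipListener<T,S> signatures quoted above.
interface SkipCallbacks<T, S> {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

// Records one line per skip so the skipped data can be inspected later.
class RecordingSkipListener implements SkipCallbacks<String, Integer> {
    final List<String> records = new ArrayList<>();

    @Override
    public void onSkipInRead(Throwable t) {
        // No item argument: reading failed before an item existed.
        records.add("read: " + t.getMessage());
    }

    @Override
    public void onSkipInProcess(String item, Throwable t) {
        // The input item (type T) was read successfully, so it is passed in.
        records.add("process: " + item + " (" + t.getMessage() + ")");
    }

    @Override
    public void onSkipInWrite(Integer item, Throwable t) {
        // The output item (type S) is passed in.
        records.add("write: " + item + " (" + t.getMessage() + ")");
    }
}

class SkipCallbacksDemo {
    public static void main(String[] args) {
        RecordingSkipListener listener = new RecordingSkipListener();
        listener.onSkipInRead(new IllegalStateException("bad line"));
        listener.onSkipInProcess("abc", new NumberFormatException("not a number"));
        listener.onSkipInWrite(42, new RuntimeException("constraint violated"));
        listener.records.forEach(System.out::println);
    }
}
```

The point of the sketch is only the shape of the arguments: a read skip carries the exception alone, while process and write skips also carry the item that was skipped.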
SkipListeners and Transactions
One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue leading to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:
The appropriate skip method (depending on when the error happened) will only be called once per item.
The SkipListener will always be called just before the transaction is committed. This is to ensure that any transactional resources called by the listener are not rolled back by a failure within the ItemWriter.
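The two guarantees above can be illustrated with a small simulation. This is a deliberately simplified sketch and not Spring Batch's implementation: DeferredSkipDemo, processChunk, and the rule that IllegalArgumentException marks a skippable item are all assumptions made for the example. A failing item causes the attempt to be discarded (the "rollback"), the item is remembered as skipped, and the callback is only invoked once the chunk finally succeeds, just before the simulated commit.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified illustration only; this is NOT how Spring Batch is implemented.
class DeferredSkipDemo {

    /** Processes one chunk, starting over after each "rollback" until it can commit. */
    static List<String> processChunk(List<String> chunk,
                                     Function<String, String> processor,
                                     Consumer<String> skipCallback) {
        Set<String> skipped = new LinkedHashSet<>(); // remembers skips across rollbacks
        while (true) {
            List<String> outputs = new ArrayList<>(); // work done in this attempt
            boolean rolledBack = false;
            for (String item : chunk) {
                if (skipped.contains(item)) {
                    continue; // already marked as skipped, do not process again
                }
                try {
                    outputs.add(processor.apply(item));
                } catch (IllegalArgumentException e) {
                    skipped.add(item); // mark the item, but do not notify yet
                    rolledBack = true; // discard this attempt and start over
                    break;
                }
            }
            if (!rolledBack) {
                // Just before the simulated commit: notify once per skipped item.
                skipped.forEach(skipCallback);
                return outputs;
            }
        }
    }

    public static void main(String[] args) {
        List<String> notified = new ArrayList<>();
        List<String> written = processChunk(
                List.of("a", "bad1", "b", "bad2"),
                item -> {
                    if (item.startsWith("bad")) {
                        throw new IllegalArgumentException("invalid: " + item);
                    }
                    return item.toUpperCase();
                },
                notified::add);
        System.out.println("written  = " + written);
        System.out.println("notified = " + notified);
    }
}
```

Although the chunk is attempted three times in this run, each bad item reaches the callback exactly once, and only after the attempt that goes on to commit.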
ChunkListener
A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a 'chunk'. A ChunkListener can be useful to perform logic before a chunk begins processing or after a chunk has completed successfully:
public interface ChunkListener extends StepListener {

    void beforeChunk();

    void afterChunk();

}
The beforeChunk method is called after the transaction is started, but before read is called on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (and not at all if there is a rollback).
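The callback order described above can be sketched with a small simulation. ChunkCallbacks is a hypothetical stand-in that mirrors the two methods of the interface quoted above, and ChunkCallbackDemo with its "empty item fails the write" rule is invented for the example; none of this is Spring Batch code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the two ChunkListener methods quoted above.
interface ChunkCallbacks {
    void beforeChunk();
    void afterChunk();
}

// Simplified illustration only; not Spring Batch's implementation.
class ChunkCallbackDemo {

    /** Splits items into chunks of commitInterval and returns the event order. */
    static List<String> run(List<String> items, int commitInterval) {
        List<String> events = new ArrayList<>();
        ChunkCallbacks listener = new ChunkCallbacks() {
            @Override public void beforeChunk() { events.add("beforeChunk"); }
            @Override public void afterChunk() { events.add("afterChunk"); }
        };
        for (int start = 0; start < items.size(); start += commitInterval) {
            List<String> chunk =
                    items.subList(start, Math.min(start + commitInterval, items.size()));
            events.add("begin tx");
            listener.beforeChunk(); // after the transaction starts, before any item is handled
            try {
                for (String item : chunk) {
                    if (item.isEmpty()) {
                        throw new IllegalStateException("cannot write empty item");
                    }
                    events.add("write " + item);
                }
                events.add("commit");
                listener.afterChunk(); // only reached after a successful commit
            } catch (IllegalStateException e) {
                events.add("rollback"); // afterChunk is not called for this chunk
            }
        }
        return events;
    }

    public static void main(String[] args) {
        run(List.of("a", "b", "", "d"), 2).forEach(System.out::println);
    }
}
```

With a commit interval of 2, the first chunk commits and triggers afterChunk, while the second chunk rolls back and therefore produces a beforeChunk event with no matching afterChunk.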
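Those are just a few of the available listeners. So how are they used? Take SkipListener, whose interface is shown above, as the example.
We pick SkipListenerSupport and extend it to implement a SkipListener with our own logic, as follows: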
MySkipListener.java
package com.lyx.batch3;

import org.springframework.batch.core.listener.SkipListenerSupport;

import com.lyx.batch.People;
import com.lyx.batch.PeopleDESC;

public class MySkipListener extends SkipListenerSupport<People, PeopleDESC> {

    /** Called when an item is skipped while reading; no item is available yet. */
    @Override
    public void onSkipInRead(Throwable t) {
        super.onSkipInRead(t);
        System.out.println(">>>>>>>>>>>>>>skip in read>>>>>>>>>>>>>>");
    }

    /** Called when an item is skipped while writing; receives the output item. */
    @Override
    public void onSkipInWrite(PeopleDESC item, Throwable t) {
        super.onSkipInWrite(item, t);
        System.out.println(">>>>>>>>>>>>>>skip in write>>>>>>>>>>>>>>");
        System.out.println(">>>>=" + item.toString());
    }

    /**
     * Called when the processor throws an exception that is listed
     * under the skip include configuration.
     */
    @Override
    public void onSkipInProcess(People item, Throwable t) {
        super.onSkipInProcess(item, t);
        System.out.println(">>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>");
        System.out.println(">>>>=" + item.toString());
    }
}
The configuration file is as follows:
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- component scanning -->
    <context:component-scan base-package="com.lyx.batch" />

    <bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

    <batch:step id="abstractStep" abstract="true">
        <batch:listeners>
            <batch:listener ref="exceptionHandler" />
        </batch:listeners>
    </batch:step>

    <bean id="abstractCursorReader" abstract="true"
        class="org.springframework.batch.item.database.JdbcCursorItemReader">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- add people desc job begin -->
    <batch:job id="addPeopleDescJob">
        <batch:step id="addDescStep" parent="abstractStep">
            <batch:tasklet>
                <batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
                    writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
                    <batch:skippable-exception-classes>
                        <!-- batch:include lists the exceptions that may be skipped -->
                        <batch:include class="com.lyx.batch.InvalidDataException" />
                    </batch:skippable-exception-classes>
                    <batch:listeners>
                        <!-- multiple listeners can be configured here -->
                        <batch:listener ref="sampleSkipListener" />
                    </batch:listeners>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
        <!-- listeners that monitor the job while it runs -->
        <batch:listeners>
            <batch:listener ref="sampleListener" />
        </batch:listeners>
    </batch:job>
    <!-- add people desc job end -->

    <bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
    <bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />

    <bean id="peopleAddDescReader" parent="abstractCursorReader" scope="step">
        <property name="sql">
            <value><![CDATA[select first_name ,last_name from people where first_name like ? or last_name like ?]]></value>
        </property>
        <property name="rowMapper" ref="peopleRowMapper" />
        <property name="preparedStatementSetter" ref="preparedStatementSetter" />
        <property name="fetchSize" value="20" />
    </bean>

    <bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
    <bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
    <bean id="allowSkipProcessor" class="com.lyx.batch.AllowSkipProcessor" />
    <bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

    <!-- Tomcat JDBC pool data source configuration -->
    <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
        destroy-method="close">
        <property name="poolProperties">
            <bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test" />
                <property name="username" value="root" />
                <property name="password" value="034039" />
            </bean>
        </property>
    </bean>

    <!-- Spring Batch jobRepository configuration -->
    <batch:job-repository id="jobRepository"
        data-source="dataSource" transaction-manager="transactionManager"
        isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
        max-varchar-length="1000" />

    <!-- Spring transaction manager -->
    <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- batch launcher -->
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>
The key part of the configuration is:
<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
    <batch:step id="addDescStep" parent="abstractStep">
        <batch:tasklet>
            <batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
                writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
                <batch:skippable-exception-classes>
                    <!-- batch:include lists the exceptions that may be skipped -->
                    <batch:include class="com.lyx.batch.InvalidDataException" />
                </batch:skippable-exception-classes>
                <batch:listeners>
                    <!-- multiple listeners can be configured here -->
                    <batch:listener ref="sampleSkipListener" />
                </batch:listeners>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
    <!-- listeners that monitor the job while it runs -->
    <batch:listeners>
        <batch:listener ref="sampleListener" />
    </batch:listeners>
</batch:job>
<!-- add people desc job end -->

<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />
Run:
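The two settings that drive skipping in this step are skip-limit="40" and the included exception class. Roughly, the decision they imply looks like the sketch below. SimpleSkipPolicy is a hypothetical class written for this post, not Spring Batch's own skip policy, and the local InvalidDataException is only a stand-in for com.lyx.batch.InvalidDataException so that the example compiles on its own. Throwing IllegalStateException when the limit is used up is also an assumption; what the sketch is meant to convey is that a skippable exception beyond the limit is no longer skipped and the step fails.

```java
// Stand-in for com.lyx.batch.InvalidDataException so the sketch compiles on its own.
class InvalidDataException extends RuntimeException {
    InvalidDataException(String message) {
        super(message);
    }
}

// Hypothetical policy for illustration; not Spring Batch's own class.
class SimpleSkipPolicy {
    private final int skipLimit;
    private final Class<? extends Throwable> skippable;

    SimpleSkipPolicy(int skipLimit, Class<? extends Throwable> skippable) {
        this.skipLimit = skipLimit;
        this.skippable = skippable;
    }

    /**
     * @param t         the exception raised while handling an item
     * @param skipCount how many items have been skipped so far in this step
     * @return true if the item may be skipped, false if the exception is not skippable
     * @throws IllegalStateException if the exception is skippable but the limit is used up
     */
    boolean shouldSkip(Throwable t, int skipCount) {
        if (!skippable.isInstance(t)) {
            return false; // not listed under skippable-exception-classes
        }
        if (skipCount < skipLimit) {
            return true; // still within skip-limit
        }
        throw new IllegalStateException("skip limit of " + skipLimit + " exceeded");
    }
}

class SkipPolicyDemo {
    public static void main(String[] args) {
        SimpleSkipPolicy policy = new SimpleSkipPolicy(40, InvalidDataException.class);
        System.out.println(policy.shouldSkip(new InvalidDataException("bad row"), 0));
        System.out.println(policy.shouldSkip(new NullPointerException(), 0));
    }
}
```

In the run shown later, only three items are skipped, which is well under the limit of 40, so the job still completes.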
AppMain12.java
package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Tests the step listener and the skip listener.
 *
 * @author Lenovo
 */
public class AppMain12 {

    public static void main(String[] args)
            throws JobExecutionAlreadyRunningException, JobRestartException,
            JobInstanceAlreadyCompleteException, JobParametersInvalidException {

        long startTime = System.currentTimeMillis(); // record the start time

        @SuppressWarnings("resource")
        ApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "classpath:spring-batch-exception-listener.xml" });

        // Look up the job and the launcher defined in the XML configuration.
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        Job job = (Job) context.getBean("addPeopleDescJob");
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");

        // Run the job with empty parameters and inspect the exit status.
        JobExecution result = launcher.run(job,
                jobParametersBuilder.toJobParameters());
        ExitStatus es = result.getExitStatus();
        if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
            System.out.println("任务正常完成"); // "job completed normally"
        } else {
            System.out.println("任务失败,exitCode=" + es.getExitCode()); // "job failed"
        }

        long endTime = System.currentTimeMillis(); // record the end time
        System.out.println("程序运行时间: " + (endTime - startTime) + "ms"); // "program running time"
    }
}
Output (the Chinese lines at the end are the JDK logger's INFO entry, followed by "job completed normally" and "program running time: 8779ms"):
.........................................................................
process people desc
skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!
process people desc
skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!
>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>
>>>>=firstName: lyx, lastName: lyx
>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>
>>>>=firstName: lyx, lastName: lyx
process people desc
skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!
>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>
>>>>=firstName: lyx, lastName: lyx
job success.........
十一月 19, 2014 3:00:06 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
任务正常完成
程序运行时间: 8779ms
Summary: with a skip listener, every skip is reported to the listener when it occurs, so the skipped data can be recorded for further processing.
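As one way of recording the skipped data, the listener could append each skipped item to a file instead of printing it. The sketch below is only an assumption about how that might look: SkipFileLog, its record and readAll methods, and the tab-separated line format are all invented for this example and are not part of Spring Batch. A real listener would call record from onSkipInProcess or onSkipInWrite.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper: appends one tab-separated line per skipped item.
class SkipFileLog {
    private final Path file;

    SkipFileLog(Path file) {
        this.file = file;
    }

    /** Intended to be called from a skip callback with the skipped item. */
    void record(String phase, Object item, Throwable cause) {
        String line = phase + "\t" + item + "\t" + cause.getMessage()
                + System.lineSeparator();
        try {
            // CREATE + APPEND: create the file if needed, never overwrite earlier skips.
            Files.write(file, line.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the log back, e.g. in a follow-up job that repairs the data. */
    List<String> readAll() {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class SkipFileLogDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("skipped-items", ".tsv");
        SkipFileLog log = new SkipFileLog(file);
        log.record("process", "firstName: lyx, lastName: lyx",
                new IllegalArgumentException("invalid data"));
        log.readAll().forEach(System.out::println);
        Files.delete(file);
    }
}
```

A plain file is used here only to keep the sketch dependency-free; a database table written within the step's transaction would be an equally reasonable target.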
====================END====================