在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有什么效果,我没有演示,下面我们就来演示一下当job 失败退出后,再重启job 有什么效果。。
先做一个 导致job 失败的情景,如下的processor :
ThrowExceptionProcessor.java
package com.lyx.batch; import org.springframework.batch.item.ItemProcessor; public class ThrowExceptionProcessor implements ItemProcessor<People, PeopleDESC> { public PeopleDESC process(People item) throws Exception { System.out.println("process people desc"); if ("lyx".equals(item.getFirstName())) { throw new InvalidDataException("invalid data"); } return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread .currentThread().getName()); } }
当判断数据为某个值 时,抛出异常 ,导致job 失败。
下面是整个配置文件:
spring-batch-failure-restart.xml
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 包的扫描 --> <context:component-scan base-package="com.lyx.batch" /> <bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" /> <batch:step id="abstractStep" abstract="true"> <batch:listeners> <batch:listener ref="exceptionHandler" /> </batch:listeners> </batch:step> <bean id="abstractCursorReader" abstract="true" class="org.springframework.batch.item.database.JdbcCursorItemReader"> <property name="dataSource" ref="dataSource" /> </bean> <!-- add people desc job begin --> <batch:job id="addPeopleDescJob" restartable="true"> <batch:step id="addDescStep" parent="abstractStep"> <batch:tasklet allow-start-if-complete="true" start-limit="3"> <batch:chunk reader="peopleAddDescReader" processor="throwExceptionProcessor" writer="addDescPeopleWriter" commit-interval="2" /> </batch:tasklet> </batch:step> </batch:job> <!-- add people desc job end --> <bean id="peopleAddDescReader" parent="abstractCursorReader" scope="step"> <property name="sql"> <value><![CDATA[select first_name ,last_name from people where first_name like ? or last_name like ?]]></value> </property> <property name="rowMapper" ref="peopleRowMapper" /> <property name="preparedStatementSetter" ref="preparedStatementSetter" /> <property name="fetchSize" value="20" /> </bean> <bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" /> <bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" /> <bean id="throwExceptionProcessor" class="com.lyx.batch.ThrowExceptionProcessor" /> <bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" /> <!--tomcat jdbc pool数据源配置 --> <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource" destroy-method="close"> <property name="poolProperties"> <bean class="org.apache.tomcat.jdbc.pool.PoolProperties"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://localhost:3306/test" /> <property name="username" value="root" /> <property name="password" value="034039" /> </bean> </property> </bean> <!-- spring batch 配置jobRepository --> <batch:job-repository id="jobRepository" data-source="dataSource" transaction-manager="transactionManager" isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_" max-varchar-length="1000" /> <!-- spring的事务管理器 --> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource" /> </bean> <!-- batch luncher --> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> </beans>
运行任务:
AppMain13.java
package com.lyx.batch; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.JobParametersInvalidException; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; import org.springframework.batch.core.repository.JobRestartException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * 测试当任务失败时,重启任务 * * @author Lenovo * */ public class AppMain13 { public static void main(String[] args) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException { long startTime = System.currentTimeMillis(); // 获取开始时间 @SuppressWarnings("resource") ApplicationContext context = new ClassPathXmlApplicationContext( new String[] { "classpath:spring-batch-failure-restart.xml" }); JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); Job job = (Job) context.getBean("addPeopleDescJob"); JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher"); JobExecution result = launcher.run(job, jobParametersBuilder.toJobParameters()); ExitStatus es = result.getExitStatus(); if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) { System.out.println("任务正常完成"); } else { System.out.println("任务失败,exitCode=" + es.getExitCode()); } long endTime = System.currentTimeMillis(); // 获取结束时间 System.out.println("程序运行时间: " + (endTime - startTime) + "ms"); } }
第一次运行的结果:
严重: Encountered an error executing step addDescStep in job addPeopleDescJob
com.lyx.batch.InvalidDataException: invalid data
at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
十一月 19, 2014 4:56:16 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]
任务失败,exitCode=FAILED
程序运行时间: 7028ms
如上,显示job失败,那么失败的job 在spring batch 的meta table 里存储了什么信息:
mysql> select * from batch_step_execution \G *************************** 1. row *************************** STEP_EXECUTION_ID: 1 VERSION: 52 STEP_NAME: addDescStep JOB_EXECUTION_ID: 1 START_TIME: 2014-11-19 16:56:11 END_TIME: 2014-11-19 16:56:16 STATUS: FAILED COMMIT_COUNT: 50 READ_COUNT: 102 FILTER_COUNT: 0 WRITE_COUNT: 100 READ_SKIP_COUNT: 0 WRITE_SKIP_COUNT: 0 PROCESS_SKIP_COUNT: 0 ROLLBACK_COUNT: 1 EXIT_CODE: FAILED EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11) at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1) at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126) at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293) at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192) at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) at o LAST_UPDATED: 2014-11-19 16:56:16 1 row in set (0.00 sec) mysql>
这就是step 的运行信息,请注意几个关键的数据,就是
COMMIT_COUNT: 50 READ_COUNT: 102 FILTER_COUNT: 0 WRITE_COUNT: 100 READ_SKIP_COUNT: 0 WRITE_SKIP_COUNT: 0 PROCESS_SKIP_COUNT: 0 ROLLBACK_COUNT: 1
这些字段表示什么意思,请看这里:http://docs.spring.io/spring-batch/trunk/reference/html/metaDataSchema.html#metaDataBatchStepExecution
好了,我们先做这些工作,不修改数据,我们第二次运行这个job
第二次运行的结果:
信息: Executing step: [addDescStep]
process people desc
十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.step.AbstractStep execute
严重: Encountered an error executing step addDescStep in job addPeopleDescJob
com.lyx.batch.InvalidDataException: invalid data
任务失败,exitCode=FAILED
程序运行时间: 3233ms
十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]
如上所示,任务失败,再来看一下失败的任务在 spring batch meta table 里存储了什么信息:
mysql> select * from batch_step_execution where step_execution_id = 2 \G *************************** 1. row *************************** STEP_EXECUTION_ID: 2 VERSION: 2 STEP_NAME: addDescStep JOB_EXECUTION_ID: 2 START_TIME: 2014-11-19 17:03:30 END_TIME: 2014-11-19 17:03:30 STATUS: FAILED COMMIT_COUNT: 0 READ_COUNT: 2 FILTER_COUNT: 0 WRITE_COUNT: 0 READ_SKIP_COUNT: 0 WRITE_SKIP_COUNT: 0 PROCESS_SKIP_COUNT: 0 ROLLBACK_COUNT: 1 EXIT_CODE: FAILED EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11) at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1) at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126) at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293) at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192) at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330) at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) at o LAST_UPDATED: 2014-11-19 17:03:30 1 row in set (0.00 sec) mysql>
要注意这些数据:
COMMIT_COUNT: 0 READ_COUNT: 2 FILTER_COUNT: 0 WRITE_COUNT: 0 READ_SKIP_COUNT: 0 WRITE_SKIP_COUNT: 0 PROCESS_SKIP_COUNT: 0 ROLLBACK_COUNT: 1
看到了没,这里说明了读出来的数据条数为 2 , 除了回滚的次数为 1 外,其他为 0 ;
第三次运行的结果:
在第三次运行前,我们把数据库里的数据线修正,再运行
update people set first_name = ‘hello‘,last_name = ‘DOE‘ where first_name = ‘lyx‘;
修正完成,那么第三次运行
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
任务正常完成
程序运行时间: 3960ms
好的,运行成功了,最重要的就是 此时 spring batch 在meta table 里存的数据,再来看一下:
mysql> select * from batch_step_execution where step_execution_id = 3 \G *************************** 1. row *************************** STEP_EXECUTION_ID: 3 VERSION: 14 STEP_NAME: addDescStep JOB_EXECUTION_ID: 3 START_TIME: 2014-11-19 17:11:40 END_TIME: 2014-11-19 17:11:42 STATUS: COMPLETED COMMIT_COUNT: 12 READ_COUNT: 23 FILTER_COUNT: 0 WRITE_COUNT: 23 READ_SKIP_COUNT: 0 WRITE_SKIP_COUNT: 0 PROCESS_SKIP_COUNT: 0 ROLLBACK_COUNT: 0 EXIT_CODE: COMPLETED EXIT_MESSAGE: LAST_UPDATED: 2014-11-19 17:11:42 1 row in set (0.00 sec) mysql>
通过对这三次运行结果的分析,我们可以知道spring batch 对 失败的job 进行restart ,不是从头开始处理数据,而是从出错的事务边界内第一条记录重复执行的,这样便确保了数据完整性。
而当 job 运行成功后(运行成功后也没有必要进行restart),如果 restart 一个job,spring batch就会从第一条记录开始读数据,处理数据,导致数据被重复处理。
batch_step_execution 表字段含义
- STEP_EXECUTION_ID: Primary key that uniquely identifies this execution. The value of this column should be obtainable by calling the getId method of the StepExecution object.
- VERSION: See above section.
- STEP_NAME: The name of the step to which this execution belongs.
- JOB_EXECUTION_ID: Foreign key from the BATCH_JOB_EXECUTION table indicating the JobExecution to which this StepExecution belongs. There may be only one StepExecution for a given JobExecution for a given Step name.
- START_TIME: Timestamp representing the time the execution was started.
- END_TIME: Timestamp representing the time the execution was finished, regardless of success or failure. An empty value in this column even though the job is not currently running indicates that there has been some type of error and the framework was unable to perform a last save before failing.
- STATUS: Character string representing the status of the execution. This may be COMPLETED, STARTED, etc. The object representation of this column is the BatchStatus enumeration.
- COMMIT_COUNT: The number of times in which the step has committed a transaction during this execution.
- READ_COUNT: The number of items read during this execution.
- FILTER_COUNT: The number of items filtered out of this execution.??
- WRITE_COUNT: The number of items written and committed during this execution.
- READ_SKIP_COUNT: The number of items skipped on read during this execution.
- WRITE_SKIP_COUNT: The number of items skipped on write during this execution.
- PROCESS_SKIP_COUNT: The number of items skipped during processing during this execution.
- ROLLBACK_COUNT: The number of rollbacks during this execution. Note that this count includes each time rollback occurs, including rollbacks for retry and those in the skip recovery procedure.
- EXIT_CODE: Character string representing the exit code of the execution. In the case of a command line job, this may be converted into a number.
- EXIT_MESSAGE: Character string representing a more detailed description of how the job exited. In the case of failure, this might include as much of the stack trace as is possible.
- LAST_UPDATED: Timestamp representing the last time this execution was persisted.
=================================END=================================