Debugging the code
JobExecution result = launcher.run(job, jobParametersBuilder.toJobParameters());
This is the call that launches the job.
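For context, here is a minimal sketch of what the surrounding launch code might look like, assuming a JobLauncher and a Job are wired as Spring beans elsewhere; the parameter key "run.timestamp" is just a hypothetical example.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;

public class JobLaunchSketch {

    // launcher and job are assumed to be Spring-managed beans injected from elsewhere.
    public JobExecution launch(JobLauncher launcher, Job job) throws Exception {
        // A unique parameter so that each run creates a new JobInstance.
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder()
                .addLong("run.timestamp", System.currentTimeMillis());
        return launcher.run(job, jobParametersBuilder.toJobParameters());
    }
}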
Below is the concrete implementation of the run method in SimpleJobLauncher.java (some code has been removed):
@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
        throws JobExecutionAlreadyRunningException, JobRestartException,
        JobInstanceAlreadyCompleteException, JobParametersInvalidException {

    final JobExecution jobExecution;

    /*
     * There is a very small probability that a non-restartable job can be
     * restarted, but only if another process or thread manages to launch
     * <i>and</i> fail a job execution for this instance between the last
     * assertion and the next method returning successfully.
     */
    jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

    // (validation and exception handling around the execute call removed in this excerpt)
    taskExecutor.execute(new Runnable() {

        @Override
        public void run() {
            try {
                logger.info("Job: [" + job + "] launched with the following parameters: ["
                        + jobParameters + "]");
                job.execute(jobExecution);
                logger.info("Job: [" + job + "] completed with the following parameters: ["
                        + jobParameters + "] and the following status: ["
                        + jobExecution.getStatus() + "]");
            }
            catch (Throwable t) {
                logger.info("Job: [" + job + "] failed unexpectedly and fatally with the following parameters: ["
                        + jobParameters + "]", t);
                rethrow(t);
            }
        }

        private void rethrow(Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            }
            else if (t instanceof Error) {
                throw (Error) t;
            }
            throw new IllegalStateException(t);
        }
    });

    return jobExecution;
}
Looking at the taskExecutor.execute call, the main logic is simply to wrap the job execution inside a Runnable task and hand it to taskExecutor. So what type is this taskExecutor? It is an instance of SyncTaskExecutor (org.springframework.core.task.SyncTaskExecutor), a synchronous task executor. From this we know that every chunk is processed in a single thread, looping over one commit-interval after another.
taskExecutor.execute(new Runnable() {
    @Override
    public void run() {
        job.execute(jobExecution);
    }
});
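As a quick sketch of what that synchronous executor means in practice: SyncTaskExecutor.execute() just runs the Runnable on the calling thread, and SimpleJobLauncher exposes setTaskExecutor if you want to hand the job off to another thread instead. The code below only illustrates that choice; the full launcher setup (job repository, etc.) is omitted.

import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;

public class TaskExecutorSketch {

    public SimpleJobLauncher syncLauncher() {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        // The default behaviour: execute() calls task.run() directly on the current thread,
        // so launcher.run() only returns after the whole job has finished.
        launcher.setTaskExecutor(new SyncTaskExecutor());
        return launcher;
    }

    public SimpleJobLauncher asyncLauncher() {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        // Hands the job off to a new thread, so run() returns before the job completes.
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return launcher;
    }
}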
Stepping further through the code, the next method executed is job.execute(jobExecution). First, let's see what type of object job is.
There it is: FlowJob: [name=addPeopleDescJob]. Stepping into job.execute, the debugger jumps straight into org.springframework.batch.core.job.AbstractJob.execute(JobExecution execution). AbstractJob is the parent class of FlowJob, and FlowJob clearly does not override this method.
AbstractJob.execute(JobExecution execution)
@Override
public final void execute(JobExecution execution) {
    doExecute(execution);
}
That is the core of the method's execution logic: the doExecute method is the parent-class hook that FlowJob overrides, and it is where the actual work of executing the job gets done.
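This is the classic template-method pattern. The sketch below is only a simplified stand-in to make the relationship clear; it is not the real Spring code, and the class names are placeholders.

// Simplified illustration of the AbstractJob / FlowJob relationship.
abstract class AbstractJobSketch {

    // The template method: final, so subclasses cannot change the overall flow.
    public final void execute(String execution) {
        // ... listener callbacks, status bookkeeping, exception handling ...
        doExecute(execution);
        // ... more bookkeeping ...
    }

    // The hook that concrete jobs fill in.
    protected abstract void doExecute(String execution);
}

class FlowJobSketch extends AbstractJobSketch {

    @Override
    protected void doExecute(String execution) {
        // FlowJob walks its flow of steps here.
        System.out.println("executing flow for " + execution);
    }
}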
Continue debugging.
A job is an abstract representation of the work to be done; the concrete work is carried out in its steps. So next comes the execution of the step. How is a step executed?
The code eventually lands in org.springframework.batch.core.step.AbstractStep.execute(StepExecution stepExecution), the abstract base class for steps.
The main logic of this method is as follows:
@Override
public final void execute(StepExecution stepExecution)
        throws JobInterruptedException, UnexpectedJobExecutionException {
    doExecute(stepExecution);
}
And what type of object is this step? TaskletStep: [name=addDescStep], an instance of TaskletStep, whose parent class is AbstractStep.
Next, look at TaskletStep.doExecute(stepExecution):
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
    stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
    stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

    stream.update(stepExecution.getExecutionContext());
    getJobRepository().updateExecutionContext(stepExecution);

    // Shared semaphore per step execution, so other step executions can run
    // in parallel without needing the lock
    final Semaphore semaphore = createSemaphore();

    stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

        @Override
        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
                throws Exception {

            StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

            // Before starting a new transaction, check for
            // interruption.
            interruptionPolicy.checkInterrupted(stepExecution);

            RepeatStatus result;
            try {
                result = new TransactionTemplate(transactionManager, transactionAttribute)
                        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
            }
            catch (UncheckedTransactionException e) {
                // Allow checked exceptions to be thrown inside callback
                throw (Exception) e.getCause();
            }

            chunkListener.afterChunk(chunkContext);

            // Check for interruption after transaction as well, so that
            // the interrupted exception is correctly propagated up to
            // caller
            interruptionPolicy.checkInterrupted(stepExecution);

            return result;
        }
    });
}
The above is the complete code of TaskletStep.doExecute. Its main logic is this part:
stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

    @Override
    public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
            throws Exception {

        StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

        // Before starting a new transaction, check for interruption.
        interruptionPolicy.checkInterrupted(stepExecution);

        RepeatStatus result;
        try {
            result = new TransactionTemplate(transactionManager, transactionAttribute)
                    .execute(new ChunkTransactionCallback(chunkContext, semaphore));
        }
        catch (UncheckedTransactionException e) {
            // Allow checked exceptions to be thrown inside callback
            throw (Exception) e.getCause();
        }

        chunkListener.afterChunk(chunkContext);

        // Check for interruption after transaction as well, so that the
        // interrupted exception is correctly propagated up to the caller
        interruptionPolicy.checkInterrupted(stepExecution);

        return result;
    }
});
The stepOperations.iterate() method is passed an anonymous subclass of StepContextRepeatCallback.
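To get a feel for the iterate-plus-callback mechanism on its own, here is a standalone sketch using RepeatTemplate, the usual RepeatOperations implementation: the template keeps invoking the callback until it returns RepeatStatus.FINISHED (or its completion policy decides the batch is complete). The counter logic is purely illustrative.

import org.springframework.batch.repeat.RepeatCallback;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.batch.repeat.support.RepeatTemplate;

public class RepeatSketch {

    public static void main(String[] args) {
        RepeatTemplate template = new RepeatTemplate();
        final int[] counter = {0};

        template.iterate(new RepeatCallback() {
            @Override
            public RepeatStatus doInIteration(RepeatContext context) throws Exception {
                counter[0]++;
                System.out.println("iteration " + counter[0]);
                // CONTINUABLE asks for another iteration, FINISHED ends the loop.
                return counter[0] < 3 ? RepeatStatus.CONTINUABLE : RepeatStatus.FINISHED;
            }
        });
    }
}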
Continuing with the debugger, after going around we come back to this callback, StepContextRepeatCallback.doInChunkContext(). Inside this method:
result = new TransactionTemplate(transactionManager, transactionAttribute)
        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
We can skip over the details of TransactionTemplate.execute(); inside it, the ChunkTransactionCallback.doInTransaction() callback is eventually invoked, as shown below.
ChunkTransactionCallback is an inner class of TaskletStep.
ChunkTransactionCallback.doInTransaction():
@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
    RepeatStatus result = RepeatStatus.CONTINUABLE;
    result = tasklet.execute(contribution, chunkContext);
    return result;
}
What type is this tasklet? It is org.springframework.batch.core.step.item.ChunkOrientedTasklet@3e521715, an instance of ChunkOrientedTasklet, which implements the Tasklet interface that defines a step's execution strategy.
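For contrast with ChunkOrientedTasklet, here is a minimal hand-written Tasklet. The Tasklet interface is what TaskletStep actually drives; ChunkOrientedTasklet is just the implementation that layers the read/process/write chunk behaviour on top of it. The CleanupTasklet class and its body are hypothetical.

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class CleanupTasklet implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Do one unit of work; return CONTINUABLE to be called again, FINISHED to stop.
        System.out.println("running cleanup for step "
                + chunkContext.getStepContext().getStepName());
        return RepeatStatus.FINISHED;
    }
}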
The ChunkOrientedTasklet.execute() method:
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

    @SuppressWarnings("unchecked")
    Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
    if (inputs == null) {
        inputs = chunkProvider.provide(contribution);
        if (buffering) {
            chunkContext.setAttribute(INPUTS_KEY, inputs);
        }
    }

    chunkProcessor.process(contribution, inputs);
    chunkProvider.postProcess(contribution, inputs);

    // Allow a message coming back from the processor to say that we
    // are not done yet
    if (inputs.isBusy()) {
        logger.debug("Inputs still busy");
        return RepeatStatus.CONTINUABLE;
    }

    chunkContext.removeAttribute(INPUTS_KEY);
    chunkContext.setComplete();
    logger.debug("Inputs not busy, ended: " + inputs.isEnd());
    return RepeatStatus.continueIf(!inputs.isEnd());
}
So Chunk<I> inputs is the data to be read in. Where does it come from?
inputs = chunkProvider.provide(contribution);
The chunkProvider is an instance of SimpleChunkProvider (org.springframework.batch.core.step.item.SimpleChunkProvider). Its provide method:
@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

    final Chunk<I> inputs = new Chunk<I>();
    repeatOperations.iterate(new RepeatCallback() {

        @Override
        public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
            I item = null;
            try {
                item = read(contribution, inputs);
            }
            catch (SkipOverflowException e) {
                // read() tells us about an excess of skips by throwing an
                // exception
                return RepeatStatus.FINISHED;
            }
            if (item == null) {
                inputs.setEnd();
                return RepeatStatus.FINISHED;
            }
            inputs.add(item);
            contribution.incrementReadCount();
            return RepeatStatus.CONTINUABLE;
        }
    });

    return inputs;
}
Inside the provide method, items are read one at a time and added to the inputs collection, rather than being read in bulk.
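How many items get collected per chunk is governed by the commit-interval. As a hedged sketch of where that number comes from in configuration (Spring Batch 4.x Java-config style, where StepBuilderFactory is available), something like the following could define the addDescStep seen above; the reader, processor and writer beans are assumed to exist elsewhere, and String items are used only to keep the sketch self-contained.

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StepConfigSketch {

    // chunk(10) is the commit-interval: provide() keeps calling read() until 10 items
    // have been collected (or the reader returns null), and only then is the chunk
    // processed and written, all inside one transaction.
    @Bean
    public Step addDescStep(StepBuilderFactory stepBuilderFactory,
                            ItemReader<String> reader,
                            ItemProcessor<String, String> processor,
                            ItemWriter<String> writer) {
        return stepBuilderFactory.get("addDescStep")
                .<String, String>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}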
Once the data for this commit-interval has been read, the inputs need to be processed. That happens in the chunkProcessor.process(contribution, inputs) method:
@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

    // Allow temporary state to be stored in the user data field
    initializeUserData(inputs);

    // If there is no input we don't have to do anything more
    if (isComplete(inputs)) {
        return;
    }

    // Make the transformation, calling remove() on the inputs iterator if
    // any items are filtered. Might throw exception and cause rollback.
    Chunk<O> outputs = transform(contribution, inputs);

    // Adjust the filter count based on available data
    contribution.incrementFilterCount(getFilterCount(inputs, outputs));

    // Adjust the outputs if necessary for housekeeping purposes, and then
    // write them out...
    write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
}
The chunkProcessor.process method consists of two steps: transform, which applies processing to the collection of items that were read in, and write.
The code jumps to transform:
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
    Chunk<O> outputs = new Chunk<O>();
    for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
        final I item = iterator.next();
        O output;
        try {
            output = doProcess(item);
        }
        catch (Exception e) {
            /*
             * For a simple chunk processor (no fault tolerance) we are done
             * here, so prevent any more processing of these inputs.
             */
            inputs.clear();
            throw e;
        }
        if (output != null) {
            outputs.add(output);
        }
        else {
            iterator.remove();
        }
    }
    return outputs;
}
The inputs collection is iterated over, and processing each item produces the outputs collection.
output = doProcess(item);
This line is where the user-defined process method is invoked to handle each item.
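In a chunk-oriented step, that user-defined processing is typically an ItemProcessor. The sketch below is a hypothetical example (the class name, String item type, and processing logic are placeholders); note how returning null corresponds exactly to the iterator.remove() branch in transform(), i.e. the item is filtered and never reaches the writer.

import org.springframework.batch.item.ItemProcessor;

public class AddDescProcessor implements ItemProcessor<String, String> {

    @Override
    public String process(String item) throws Exception {
        if (item == null || item.isEmpty()) {
            // Returning null filters the item out of the chunk.
            return null;
        }
        // Whatever the custom processing is; here we just append a description.
        return item + " [with description]";
    }
}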
Next, look at how the write method writes the data out:
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
    try {
        doWrite(outputs.getItems());
    }
    catch (Exception e) {
        /*
         * For a simple chunk processor (no fault tolerance) we are done
         * here, so prevent any more processing of these inputs.
         */
        inputs.clear();
        throw e;
    }
    contribution.incrementWriteCount(outputs.size());
}
Inside write, the doWrite() method is called, which in turn calls the user-defined write method to write the data out.
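That user-defined write method is typically an ItemWriter, which receives the whole chunk's worth of processed items in a single call (Spring Batch 4.x signature). The sketch below is hypothetical; String items and console logging are used only to keep it self-contained.

import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class LoggingItemWriter implements ItemWriter<String> {

    @Override
    public void write(List<? extends String> items) throws Exception {
        // One call per chunk, executed inside the chunk's transaction.
        for (String item : items) {
            System.out.println("writing: " + item);
        }
    }
}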
That covers the main flow, and the key logic code, within a single commit-interval. So how does Spring Batch repeat the commit-interval (RepeatTemplate)? That will be covered in detail another time.
=========================END=========================