Spring Batch_JOB执行流程分析

debug 代码

JobExecution result = launcher.run(job,
		jobParametersBuilder.toJobParameters());

这是启动job的方法,如下是方法的具体实现:

SimpleJobLauncher.java run方法的具体实现(删除了部分代码)

@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
		throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
		JobParametersInvalidException {

	final JobExecution jobExecution;

	/*
	 * There is a very small probability that a non-restartable job can be
	 * restarted, but only if another process or thread manages to launch
	 * <i>and</i> fail a job execution for this instance between the last
	 * assertion and the next method returning successfully.
	 */
	jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

	try {
		taskExecutor.execute(new Runnable() {

			@Override
			public void run() {
				try {
					logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
							+ "]");
					job.execute(jobExecution);
					logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
							+ "] and the following status: [" + jobExecution.getStatus() + "]");
				}
				catch (Throwable t) {
					logger.info("Job: [" + job
							+ "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
							+ "]", t);
					rethrow(t);
				}
			}

			private void rethrow(Throwable t) {
				if (t instanceof RuntimeException) {
					throw (RuntimeException) t;
				}
				else if (t instanceof Error) {
					throw (Error) t;
				}
				throw new IllegalStateException(t);
			}
		});
	}
	return jobExecution;
}

看taskExecutor.execute 方法的主要逻辑,就是把 job execute 封装进 一个抽象的任务内,通过taskExecutor 执行,那么这个taskExecutor 是一个什么类型——[email protected],就是SyncTaskExecutor类型的对象,一个同步的任务执行类。这样就可以明确的知道每个chunk的处理都是在一个单线程内,循环往复的处理每个commit-interval。

taskExecutor.execute(new Runnable() {
	@Override
	public void run() {
		job.execute(jobExecution);
	}
});

继续走代码,那接下来执行的方法是job.execute(jobExecution);那先来看job 是什么类型的对象,

那么就是他了,FlowJob: [name=addPeopleDescJob]。进入job.execute ,

他就直接跳进了 org.springframework.batch.core.job.AbstractJob.execute(JobExecution execution)方法,这是FlowJob的父类,FlowJob 显然没有覆写 该方法。

AbstractJob.execute(JobExecution execution)

@Override
public final void execute(JobExecution execution) {
	doExecute(execution);
}

这是方法的主要执行逻辑,那么这个doExecute 方法就是FlowJob覆写的 父类的方法,完成 job 的执行的任务。。。

继续debug

job 是任务的任务的抽象表示,完成的具体任务还要在step中,那么接下来就是step 的执行了,step 是如何执行的??

我们的代码最终调到这org.springframework.batch.core.step.AbstractStep.execute(StepExecution stepExecution) ,step的抽象类。

这个方法的主要逻辑如下:

@Override
public final void execute(StepExecution stepExecution)
		throws JobInterruptedException, UnexpectedJobExecutionException {
	doExecute(stepExecution);

}

这个step 又是什么类型的对象——TaskletStep: [name=addDescStep],TaskletStep 的对象,其父类为AbstractStep。

继续看 TaskletStep.doExecute(stepExecution);

@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
	stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
	stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

	stream.update(stepExecution.getExecutionContext());
	getJobRepository().updateExecutionContext(stepExecution);

	// Shared semaphore per step execution, so other step executions can run
	// in parallel without needing the lock
	final Semaphore semaphore = createSemaphore();

	stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

		@Override
		public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
				throws Exception {

			StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

			// Before starting a new transaction, check for
			// interruption.
			interruptionPolicy.checkInterrupted(stepExecution);

			RepeatStatus result;
			try {
				result = new TransactionTemplate(transactionManager, transactionAttribute)
				.execute(new ChunkTransactionCallback(chunkContext, semaphore));
			}
			catch (UncheckedTransactionException e) {
				// Allow checked exceptions to be thrown inside callback
				throw (Exception) e.getCause();
			}

			chunkListener.afterChunk(chunkContext);

			// Check for interruption after transaction as well, so that
			// the interrupted exception is correctly propagated up to
			// caller
			interruptionPolicy.checkInterrupted(stepExecution);

			return result;
		}

	});

}

上面是 TaskletStep.doExecute的完整代码,主要的逻辑是:

stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
	@Override
	public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
			throws Exception {

		StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

		// Before starting a new transaction, check for
		// interruption.
		interruptionPolicy.checkInterrupted(stepExecution);

		RepeatStatus result;
		try {
			result = new TransactionTemplate(transactionManager, transactionAttribute)
			.execute(new ChunkTransactionCallback(chunkContext, semaphore));
		}
		catch (UncheckedTransactionException e) {
			// Allow checked exceptions to be thrown inside callback
			throw (Exception) e.getCause();
		}

		chunkListener.afterChunk(chunkContext);

		// Check for interruption after transaction as well, so that
		// the interrupted exception is correctly propagated up to
		// caller
		interruptionPolicy.checkInterrupted(stepExecution);

		return result;
	}

});

stepOperations.iterate() 方法传入StepContextRepeatCallback的一个匿名对象。

继续debug,转了一圈又回到 这个回调方法上StepContextRepeatCallback.doInChunkContext(),看这个方法 :

result = new TransactionTemplate(transactionManager, transactionAttribute)
	.execute(new ChunkTransactionCallback(chunkContext, semaphore));

new TransactionTemplate().execute()方法忽略过去,在这个方法里面最终还要 调用ChunkTransactionCallback.doInTransaction()的回调方法,看下面:

ChunkTransactionCallback是 TaskletStep 的内部类:

ChunkTransactionCallback.doInTransaction():

@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
	RepeatStatus result = RepeatStatus.CONTINUABLE;
	result = tasklet.execute(contribution, chunkContext);
	return result;
}

这个tasklet 的类型是 :org.springframework.batch.core.step.item.ChunkOrientedTasklet@3e521715,ChunkOrientedTasklet 其父类 Tasklet,定义step的执行策略。

ChunkOrientedTasklet.execute() 方法:

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

	@SuppressWarnings("unchecked")
	Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
	if (inputs == null) {
		inputs = chunkProvider.provide(contribution);
		if (buffering) {
			chunkContext.setAttribute(INPUTS_KEY, inputs);
		}
	}

	chunkProcessor.process(contribution, inputs);
	chunkProvider.postProcess(contribution, inputs);

	// Allow a message coming back from the processor to say that we
	// are not done yet
	if (inputs.isBusy()) {
		logger.debug("Inputs still busy");
		return RepeatStatus.CONTINUABLE;
	}

	chunkContext.removeAttribute(INPUTS_KEY);
	chunkContext.setComplete();

	logger.debug("Inputs not busy, ended: " + inputs.isEnd());
	return RepeatStatus.continueIf(!inputs.isEnd());

}

好了,Chunk<I> inputs,就是要读入的数据,他是怎么来的 :

inputs = chunkProvider.provide(contribution);

chunkProvider 是 [email protected]529dc 的类型,provide方法:

@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

	final Chunk<I> inputs = new Chunk<I>();
	repeatOperations.iterate(new RepeatCallback() {

		@Override
		public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
			I item = null;
			try {
				item = read(contribution, inputs);
			}
			catch (SkipOverflowException e) {
				// read() tells us about an excess of skips by throwing an
				// exception
				return RepeatStatus.FINISHED;
			}
			if (item == null) {
				inputs.setEnd();
				return RepeatStatus.FINISHED;
			}
			inputs.add(item);
			contribution.incrementReadCount();
			return RepeatStatus.CONTINUABLE;
		}

	});

	return inputs;

}

在provide 方法内,逐条读取 一条数据,然后放入inputs 结合,而不是一次读取多条。

数据读完了(此commit内)接下来就要处理inputs 数据了,到chunkProcessor.process(contribution, inputs);方法

@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

	// Allow temporary state to be stored in the user data field
	initializeUserData(inputs);

	// If there is no input we don‘t have to do anything more
	if (isComplete(inputs)) {
		return;
	}

	// Make the transformation, calling remove() on the inputs iterator if
	// any items are filtered. Might throw exception and cause rollback.
	Chunk<O> outputs = transform(contribution, inputs);

	// Adjust the filter count based on available data
	contribution.incrementFilterCount(getFilterCount(inputs, outputs));

	// Adjust the outputs if necessary for housekeeping purposes, and then
	// write them out...
	write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

}

在chunkProcessor.process 方法内定义了 两个步骤,一个是transform,也就是 process 读入的数据集合,一个是write。

代码跳到transform这:

protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
	Chunk<O> outputs = new Chunk<O>();
	for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
		final I item = iterator.next();
		O output;
		try {
			output = doProcess(item);
		}
		catch (Exception e) {
			/*
			 * For a simple chunk processor (no fault tolerance) we are done
			 * here, so prevent any more processing of these inputs.
			 */
			inputs.clear();
			throw e;
		}
		if (output != null) {
			outputs.add(output);
		}
		else {
			iterator.remove();
		}
	}
	return outputs;
}

对于 inputs 集合 ,遍历,处理后得到 outputs 集合。

output = doProcess(item);

上面这行代码就是要调用自定义的process 方法进行处理。

然后看write 方法是如何写入数据的,

protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
	try {
		doWrite(outputs.getItems());
	}
	catch (Exception e) {
		/*
		 * For a simple chunk processor (no fault tolerance) we are done
		 * here, so prevent any more processing of these inputs.
		 */
		inputs.clear();
		throw e;
	}
	contribution.incrementWriteCount(outputs.size());
}

在write 方法内调用了 doWrite() 方法 ,然后调用 自定义的 write 方法把 数据写入。

好的 ,上面就是 在一次commit-interval 内的主要过程,和主要的逻辑代码。那么 spring batch 是如何 重复commit-interval 的呢 (RepeatTemplate)?以后再详细说来。

=========================END=========================

时间: 2024-08-30 11:29:30

Spring Batch_JOB执行流程分析的相关文章

03 spring security执行流程分析

spring security主要是依赖一系列的Filter来实现权限验证的,责任链设计模式是跑不了的.下面简单记录一下spring操作这些Filter的过程. 1. WebSecurityConfiguration.java 该类是spring security的一个配置类,里面定了一系列的Bean,咱主要是看springSecurityFilterChain这个bean, 就是它创建了FilterChain. @Bean(name = AbstractSecurityWebApplicati

Spring MVC 执行流程分析

Spring MVC 的执行流程图 原文地址:https://www.cnblogs.com/wbyixx/p/10290491.html

转 深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇)

深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇) 最近太忙了,一直没时间继续更新博客,今天忙里偷闲继续我的Mybatis学习之旅.在前九篇中,介绍了mybatis的配置以及使用, 那么本篇将走进mybatis的源码,分析mybatis 的执行流程, 好啦,鄙人不喜欢口水话,还是直接上干活吧: 1. SqlSessionFactory 与 SqlSession. 通过前面的章节对于mybatis 的介绍及使用,大家都能体会到SqlSession的重要性了吧, 没错,从表面上来看,

Spring Core Container 源码分析三:Spring Beans 初始化流程分析

前言 本文是笔者所著的 Spring Core Container 源码分析系列之一: 本篇文章主要试图梳理出 Spring Beans 的初始化主流程和相关核心代码逻辑: 本文转载自本人的私人博客,伤神的博客: http://www.shangyang.me/2017/04/01/spring-core-container-sourcecode-analysis-beans-instantiating-process/ 本文为作者的原创作品,转载需注明出处: 源码分析环境搭建 参考 Sprin

Java Servlet(十二):Servlet、Listener、Filter之间的执行流程分析

时隔几年后,看到本系列文章讲解的内容缺少了不少内容:周末无事分析了Spring Security是如何被集成到Web Servlet(SpringMVC)时,需要重新理清Filter.Listener.Servlet(SpringMVC#DispatcherServlet)之间的执行顺序,于是就有了本篇文章.这个话题是Web Servlet学习中的一个重点,弄清它们之间的执行流程,有助于理解SpringMVC.Spring Security这些框架是否如何与Web Servlet集成到一起. 原

Hive SQL执行流程分析

转自 http://www.tuicool.com/articles/qyUzQj 最近在研究Impala,还是先回顾下Hive的SQL执行流程吧. Hive有三种用户接口: cli (Command line interface) bin/hive或bin/hive –service cli 命令行方式(默认) hive-server/hive-server2 bin/hive –service hiveserver 或bin/hive –service hiveserver2 通过JDBC/

wget www.baidu.com执行流程分析

通过GDB分析程序的执行流程如下: main.c(main) url_parse:解析url,获取url相关信息,返回结构体 struct url 的指针,存于 url_parsed retrieve_url:主要参数 url_parsed ,下载文件,下载网页的关键函数 retr.c(retrieve_url) http_loop,通过 HTTP 下载指定文件 http.c(http_loop) gethttp, 获取文档 http.c(gethttp) connect_to_host:给定域

ThinkPHP 框架执行流程分析

总体来说,应用的流程涉及到几个文件:Index.phpThinkPHP.phpThink.class.phpApp.class.phpDispatcher.class.phpThinkPHP/Mode/common.phpReadHtmlBehavior.class.phpRoute.class.phpHook.class.phpContentReplaceBehavior.class.phpWriteHtmlCacheBehavior.class.php ThinkPHP框架开发的应用的标准执

深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇)

原文地址:http://www.cnblogs.com/dongying/p/4142476.html 最近太忙了,一直没时间继续更新博客,今天忙里偷闲继续我的Mybatis学习之旅.在前九篇中,介绍了mybatis的配置以及使用, 那么本篇将走进mybatis的源码,分析mybatis 的执行流程, 好啦,鄙人不喜欢口水话,还是直接上干活吧: 1. SqlSessionFactory 与 SqlSession. 通过前面的章节对于mybatis 的介绍及使用,大家都能体会到SqlSession