Spring Batch_Parallel Steps_使用并行的Step

spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps

As long as the application logic that needs to be parallelized can be split into distinct responsibilities, and assigned to individual steps then it can be parallelized in a single process. Parallel Step execution is easy to configure and use, for example, to execute steps (step1,step2) in parallel with step3, you could configure a flow like this:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

The configurable "task-executor" attribute is used to specify which TaskExecutor implementation should be used to execute the individual flows. The default is SyncTaskExecutor, but an asynchronous TaskExecutor is required to run the steps in parallel. Note that the job will ensure that every flow in the split completes before aggregating the exit statuses and transitioning.

多个step之间的并行化,可以提高批处理的效率。什么情况下可以应用step之间的并行化,那就要根据具体的业务需求来定。

那我们假设有这样一种场景:有一类数据,分别存在于文件和数据库,数据的内容一样,只是形式不一样,那么我们可以定义并行的step来分别处理来自文件的数据和来自数据库的数据,然后,分别进行同样的processor,然后写入数据库。

下面我们就刚才讲的那种场景实现我们并行的step,如下:

spring-batch-split.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	<!-- 并行的step -->
	<batch:job id="addPeopleDescJob">
		<batch:split id="split1" task-executor="taskExecutor">
			<batch:flow>
				<batch:step id="parallel_step_1">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_db"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
			<batch:flow>
				<batch:step id="parallel_step_2">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_file"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
		</batch:split>
	</batch:job>

	<!-- 从数据库读取数据的reader -->
	<bean id="peopleAddDescReader_db" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!-- 从文件中读取数据的reader -->
	<bean id="lineTokenizer"
		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
		<property name="delimiter" value="," />
		<property name="names">
			<list>
				<value>firstName</value>
				<value>lastName</value>
			</list>
		</property>
	</bean>
	<bean id="fieldSetMapper"
		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
		<property name="prototypeBeanName" value="people" />
	</bean>
	<bean id="people" class="com.lyx.batch.People" scope="prototype" />
	<bean id="lineMapper"
		class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
		<property name="lineTokenizer" ref="lineTokenizer" />
		<property name="fieldSetMapper" ref="fieldSetMapper" />
	</bean>
	<bean id="resource" class="org.springframework.core.io.ClassPathResource">
		<constructor-arg index="0" type="java.lang.String"
			value="sample-data.csv" />
	</bean>
	<bean id="peopleAddDescReader_file" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" ref="resource" />
		<property name="encoding" value="utf-8" />
		<property name="lineMapper" ref="lineMapper" />
	</bean>
	<!--从文件读取数据的reader end -->

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

以上配置如果略显突兀,请在前面一系列spring batch的文章中找到出处

以上就是step 并行化的主要配置。。

AppMain7.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试并行的job
 * 
 * @author Lenovo
 *
 */
public class AppMain7 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-split.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

=====================END=====================

时间: 2024-08-05 10:13:06

Spring Batch_Parallel Steps_使用并行的Step的相关文章

Spring Batch_Multi-threaded Step_使用多线程的Step

spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html The simplest way to start parallel processing is to add a TaskExecutor to your Step configuration, e.g. as an attribute of the tasklet: <step id="loading">

Spring Batch_使用JdbcPagingItemReader_多线程的Step

我们最经常使用的就是 JdbcCursorItemReader,使用游标的方式 逐条数据的读取.但是 从spring 官方文档我们知道 ,他不是线程安全的.在这里,我们使用 JdbcPagingItemReader 从数据库读取数据,并且是分页的读,而且这个类是线程安全的,那么我们就可以使用多线程的Step,从而提高 JOB 的执行效率. 下面是主要的配置文件: <beans xmlns="http://www.springframework.org/schema/beans"

Spring Batch学习笔记二

此系列博客皆为学习Spring Batch时的一些笔记: Spring Batch的架构 一个Batch Job是指一系列有序的Step的集合,它们作为预定义流程的一部分而被执行: Step代表一个自定义的工作单元,它是Job的主要构件块:每一个Step由三部分组成:ItemReader.ItemProcessor.ItemWriter:这三个部分将执行在每一条被处理的记录上,ItemReader读取每一条记录,然后传递给ItemProcessor处理,最后交给ItemWriter做持久化:It

Spring Batch学习(二)架构

Spring Batch的架构 一个Batch Job是指一系列有序的Step的集合,它们作为预定义流程的一部分而被执行: Step代表一个自定义的工作单元,它是Job的主要构件块:每一个Step由三部分组成:ItemReader.ItemProcessor.ItemWriter:这三个部分将执行在每一条被处理的记录上,ItemReader读取每一条记录,然后传递给ItemProcessor处理,最后交给ItemWriter做持久化:ItemProcessor不是必须的,一个Step可以仅仅包含

spring integration 概述

Spring Integration Extends the Spring programming model to support the well-known Enterprise Integration Patterns. Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via de

OSGI与Spring结合开发web工程

简介: 作为一个新的事实上的工业标准,OSGi 已经受到了广泛的关注, 其面向服务(接口)的基本思想和动态模块部署的能力, 是企业级应用长期以来一直追求的目标.Spring 是一个著名的 轻量级 J2EE 开发框架,其特点是面向接口编程和非侵入式的依赖注入.将 OSGi 和 Spring 结合能充分发挥二者各自的特长,更好地满足企业级应用开发的需求.Spring 开发组织在 2008 年发布了将 OSGi 和 Spring 结合的第一个版本:Spring-DM.本文通过一个简单实例,介绍如何利用

Springboot系列:@SpringBootApplication注解

在使用 Springboot 框架进行开发的时候,通常我们会在 main 函数上添加 @SpringBootApplication 注解,今天为大家解析一下 @SpringBootApplication,如有不正之处,欢迎批评指正. @SpringBootApplication @SpringBootApplication源码如下: @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inher

Spring Batch_Configuring a Step for Restart

spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete 的状态的时候,是不能被restart的.下面看如何使一个job 能够被 restart . restartable="true" <job id="footballJob" restartable="false&q

spring security step by step

First we can see the folder structure. Step 1  : Create a new Maven Project. Step 2 : Add below dependencies jar to the project. But in my pom.xml I have only add below jars One Note here, it is a knowledge here as I only add these but these jars wil