Spring Batch_使用JdbcPagingItemReader_多线程的Step

我们最经常使用的就是 JdbcCursorItemReader,使用游标的方式 逐条数据的读取。但是 从spring 官方文档我们知道 ,他不是线程安全的。在这里,我们使用 JdbcPagingItemReader

从数据库读取数据,并且是分页的读,而且这个类是线程安全的,那么我们就可以使用多线程的Step,从而提高 JOB 的执行效率。

下面是主要的配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractJdbcPagingItemReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcPagingItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<!-- add people desc job end -->

	<!-- 使用分页的reader begin -->
	<bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"
		scope="step">
		<property name="queryProvider">
			<bean
				class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
				<property name="dataSource" ref="dataSource" />
				<property name="selectClause" value="select person_id, first_name, last_name" />
				<property name="fromClause" value="from people" />
				<property name="whereClause"
					value="where ( first_name like :first_name or last_name like :last_name ) " />
				<property name="sortKey" value="person_id" />
			</bean>
		</property>
		<property name="parameterValues">
			<map>
				<entry key="first_name" value="#{jobParameters[‘first_name‘]}" />
				<entry key="last_name" value="#{jobParameters[‘last_name‘]}" />
			</map>
		</property>
		<!-- 配置limit的大小 -->
		<property name="pageSize" value="2" />
		<property name="rowMapper" ref="peopleRowMapper" />
	</bean>
	<!-- 使用分页的reader end -->

	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

主要的配置就是:

<!-- 使用分页的reader begin -->
<bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"
	scope="step">
	<property name="queryProvider">
		<bean
			class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
			<property name="dataSource" ref="dataSource" />
			<property name="selectClause" value="select person_id, first_name, last_name" />
			<property name="fromClause" value="from people" />
			<property name="whereClause"
				value="where ( first_name like :first_name or last_name like :last_name ) " />
			<property name="sortKey" value="person_id" />
		</bean>
	</property>
	<property name="parameterValues">
		<map>
			<entry key="first_name" value="#{jobParameters[‘first_name‘]}" />
			<entry key="last_name" value="#{jobParameters[‘last_name‘]}" />
		</map>
	</property>
	<!-- 配置limit的大小 -->
	<property name="pageSize" value="2" />
	<property name="rowMapper" ref="peopleRowMapper" />
</bean>
<!-- 使用分页的reader end -->

其他类的从前面的文章找出处,下面是我为了调试 与前面不同的类

PeopleRowMapper.java

package com.lyx.batch;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

public class PeopleRowMapper implements RowMapper<People> {

	public People mapRow(ResultSet rs, int rowNum) throws SQLException {
		People p = new People();
		System.out.println("-----------------------person_id-"
				+ rs.getInt("person_id"));
		p.setId(rs.getInt("person_id"));
		p.setFirstName(rs.getString("first_name"));
		p.setLastName(rs.getString("last_name"));
		return p;
	}

}

运行程序

AppMain14.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试 使用分页的 reader
 * 
 * @author Lenovo
 *
 */
public class AppMain14 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-paging.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		jobParametersBuilder.addString("first_name", "%JOHN%");
		jobParametersBuilder.addString("last_name", "%DOE%");
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

运行结果:

-----------------------person_id-157

-----------------------person_id-158

process people desc

process people desc

write people desc

write people desc

任务正常完成

程序运行时间: 11929ms

十一月 22, 2014 12:29:50 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]

最后是成功了。其实更重要的是JdbcPagingItemReader 的实现方式和源码。为什么 他是线程安全的,为什么他能分页读,这才是我们最终关心的。这里先不分析,到下篇文章再说。

这里我们使用的还是单线程的方式运行的该 job ,下面我们来配置多线程的 step。配置很简单。就是配置一个异步的spring task executor,使用该 异步 task executor 来运行我们的

job。。

看如下的配置:

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
	<batch:step id="addDescStep" parent="abstractStep">
		<batch:tasklet task-executor="taskExecutor">
			<batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
				writer="addDescPeopleWriter" commit-interval="2" />
		</batch:tasklet>
	</batch:step>
</batch:job>
<!-- add people desc job end -->

这里的 taskExecutor 就是一个异步的 task executor,具体怎么实现,请看源码。。

下面 运行一下 多线程 的step。

运行结果:

-----------------------person_id-157

-----------------------person_id-158

process people desc

process people desc

write people desc

write people desc

十一月 22, 2014 1:01:35 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 8577ms

成功,好的,你是否注意到 和上面的单线程的step 比,是不是程序运行的时间要少了。

关于 JdbcPagingItemReader 的实现方式和其线程安全性,如何分页,JdbcPagingItemReader的分页策略我们在下面文章道来。

=================================END=================================

时间: 2024-10-24 04:11:11

Spring Batch_使用JdbcPagingItemReader_多线程的Step的相关文章

Spring Batch_Multi-threaded Step_使用多线程的Step

spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html The simplest way to start parallel processing is to add a TaskExecutor to your Step configuration, e.g. as an attribute of the tasklet: <step id="loading">

Spring Batch_官网DEMO实现

http://spring.io/guides/gs/batch-processing/ 使用spring xml方式实现了spring batch官网的demo,现在把具体的代码贴出来,具体的细节配置还要参考官网的说明. 首先建立maven项目,pom文件如下: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance&

Spring Batch_Parallel Steps_使用并行的Step

spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps As long as the application logic that needs to be parallelized can be split into distinct responsibilities, and assigned to individual steps

Spring单实例、多线程安全、事务解析

原文:http://blog.csdn.net/c289054531/article/details/9196053 引言: 在使用Spring时,很多人可能对Spring中为什么DAO和Service对象采用单实例方式很迷惑,这些读者是这么认为的: DAO对象必须包含一个数据库的连接Connection,而这个Connection不是线程安全的,所以每个DAO都要包含一个不同的Connection对象实例,这样一来DAO对象就不能是单实例的了. 上述观点对了一半.对的是“每个DAO都要包含一个

Spring Boot教程10——多线程

Spring通过任务执行器(TaskExecutor)来实现多线程和并发编程.使用ThreadPoolTaskExecutor可实现一个基于线程池的TaskExcutor.而实际开发中任务一般是非阻碍的,即异步的,所以我们要在配置类中通过@EnableAsync开启对异步任务的支持,并通过在实际执行的Bean的方法中通过使用@Async注解来声明其是一个异步任务. 示例 1>.配置类 package com.wisely.highlight_spring4.ch3.taskexecutor; i

Spring事务与自定义多线程陷阱

场景:Spring+Ibatis环境,使用spring aop事务(配置到service层),在一个service方法中,自定义了一个多线程,结果事务不起作用了,不用线程,则事务有效. 原因:Spring的事务是通过ThreadLocal来保证线程安全的,事务和当前线程绑定,所以自己开了多线程自然会让事务失效. Spring的事务管理器是通过ThreadLocal来保存每个线程的副本,从而实现线程安全的,再结合IoC和Aop实现高级声明式事务的功能,所以Spring的事务天然地和线程有着千丝万缕

spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)

application-test.properties 1 #kafka 2 kafka.consumer.zookeeper.connect=*:2181 3 kafka.consumer.servers=*:9092 4 kafka.consumer.enable.auto.commit=true 5 kafka.consumer.session.timeout=6000 6 kafka.consumer.auto.commit.interval=1000 7 #保证每个组一个消费者消费同一

Spring Boot入门教程大纲

Spring Boot教程之Spring基础 Spring Boot教程1--Spring概述 Spring Boot教程2--Spring项目快速搭建 Spring Boot教程3--Spring基础配置 Spring Boot教程之Spring常用配置 Spring Boot教程4--@Scope注解 Spring Boot教程5--Spring EL-Spring表达式语言 Spring Boot教程6--Bean的初始化和销毁 Spring Boot教程7--Profile Spring

spring integration 概述

Spring Integration Extends the Spring programming model to support the well-known Enterprise Integration Patterns. Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via de