走进企业级批处理框架--Springbatch

Springbatch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。它能使业务人员专注于核心业务的开发,而将重复性的耗时工作交给系统自动处理。如数据的倒入,导出,数据的复制等工作。本文将通过一个简单的文件复制的小例子介绍SpringBatch的工作原理。首先来看相关的核心代码和配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd"
	default-autowire="byName">
	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
	</bean>
	<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
	<batch:job id="iconvJob">
		<batch:step id="iconvStep">
			<batch:tasklet transaction-manager="transactionManager">
				<batch:chunk reader="iconvItemReader" writer="iconvItemWriter" processor="iconvItemProcessor"
					commit-interval="1" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<context:property-placeholder location="classpath:files.properties" />
	<bean id="iconvItemReader" class="com.inetpsa.batch.iconv.IconvItemReader">
		<property name="input" value="file:${input.file}" />
		<property name="charset" value="UTF-8" />
	</bean>
	<bean id="iconvItemWriter" class="com.inetpsa.batch.iconv.IconvItemWriter">
		<property name="output" value="file:${output.file}" />
		<property name="charset" value="UTF-8" />
	</bean>
	<bean id="iconvItemProcessor" class="com.inetpsa.batch.iconv.IconvItemProcessor"/>
</beans>

springbatch的核心配置都体现在了这个配置文件中,每一个batch都会包含有一个(或多个job),每一个job中都定义了我们所要完成这个job所要执行的步骤,也就是job中的step,每一个step的完成都需要相应的持久化机制的支持,而jobRepository担当的就是持久化机制提供者的身份。体现在配置文件中,jobLauncher所起的作用就是由外部控制器调用开启一个job,当一个job开启之后就进入实质性的step执行阶段每一个step的执行都是由ItemReader首先读取数据然后返回给ItemProcessor进行处理然后返回给ItemWriter进行输出。我们可以显示的给每一个Step定义ItemReader,ItermProcessor和ItemWriter.如同配置文件中所定义的那样,我们制定了三个类作分别集成相应的基类用于实现自定义的读取,处理和输出。下面来看具体的实现。

public class ShellItemReader implements ItemReader<InputStream> {

	private String input;
	private InputStream item = null;

	public InputStream read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
		if (this.item == null) {
			this.item = this.input == null ? System.in : new URL(this.input).openStream();
			return this.item;
		}
		return null;
	}

	public void setInput(String input) {
		this.input = input;
	}
}

ShellItemReader类读取配置文件中配置的文件,然后将输入流返回给ShellIterProcessor进行处理:

public class ShellItemProcessor implements ItemProcessor<InputStream, InputStream> {

	private List<String> command;
	public InputStream process(InputStream item) throws Exception {
		final ProcessBuilder pb = new ProcessBuilder(this.command);
		pb.redirectErrorStream(true);
		final Process process = pb.start();
		IOUtils.copy(item, process.getOutputStream());
		IOUtils.closeQuietly(item);
		IOUtils.closeQuietly(process.getOutputStream());
		return process.getInputStream();
	}

	public void setCommand(List<String> command) {
		this.command = command;
	}
}

ShellItemProcessor将输入流进行处理然后返回给ShellItemWriter进行输出:

public class ShellItemWriter implements ItemWriter<InputStream> {

	private String output;

	public void setOutput(String output) {
		this.output = output;
	}

	public void write(List<? extends InputStream> items) throws Exception {
		OutputStream os = System.out;
		if (this.output != null) {
			final URL url = new URL(this.output);
			if (url.getProtocol().equals("file")) {
				os = new FileOutputStream(url.getPath());
			} else {
				os = url.openConnection().getOutputStream();
			}
		}
		for (final InputStream is : items) {
			IOUtils.copy(is, os);
			IOUtils.closeQuietly(is);
		}
		IOUtils.closeQuietly(os);
	}

}

如果是对数据库进行或文件读取数据的时候,ShellItemReader的read()操作每次都会读取一条记录然后交给ShellItemProcessor进行操作,当处理的记录数达到配置的commit-interval值的时候将处理后的数据交给ShellItermWriter进行一次数据。在执行每个Step的时候我们同样配置了事务处理,以便在程序出错的时候进行回滚。以上这些就是springbatch执行的简单流程。

时间: 2024-10-11 21:01:31

走进企业级批处理框架--Springbatch的相关文章

SpringBatch批处理框架+mysql仓库+web监控实录

1.概念 Spring Batch 是一款轻量级地适合企业级应用的批处理框架,值得注意的是,不同于其他调度框架,Spring Batch不提供调度功能. 2.批处理过程 批处理可以分为以下几个步骤: 读取数据 按照业务处理数据 归档数据的过程 3.Spring Batch给我们提供了什么? 统一的读写接口 丰富的任务处理方式 灵活的事务管理及并发处理 日志.监控.任务重启与跳过等特性 4.基础组件 名称 用途 JobRepository 用于注册和存储Job的容器 JobLauncher 用于启

【转】大数据批处理框架 Spring Batch全面解析

如今微服务架构讨论的如火如荼.但在企业架构里除了大量的OLTP交易外,还存在海量的批处理交易.在诸如银行的金融机构中,每天有3-4万笔的批处理作业需要处理.针对OLTP,业界有大量的开源框架.优秀的架构设计给予支撑:但批处理领域的框架确凤毛麟角.是时候和我们一起来了解下批处理的世界哪些优秀的框架和设计了,今天我将以Spring Batch为例,和大家一起探秘批处理的世界.初识批处理典型场景探秘领域模型及关键架构实现作业健壮性与扩展性批处理框架的不足与增强批处理典型业务场景对账是典型的批处理业务处

异步并行批处理框架设计的一些思考(转)

随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来.如何有效地处理.分析这些海量的数据资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器.可以说,如果不能很好的快速处理分析这些海量的数据资源,将很快被市场无情地所淘汰.当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析.此外还有针对实时在线流式数据处理方面的,同样也是分布式的计算框架Storm,也能很好的满足数据实时性分

异步并行批处理框架设计的一些思考

随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来.如何有效地处理.分析这些海量的数据资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器.可以说,如果不能很好的快速处理分析这些海量的数据资源,将很快被市场无情地所淘汰.当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析.此外还有针对实时在线流式数据处理方面的,同样也是分布式的计算框架Strom,也能很好的满足数据实时性分

万树IT:Spring Batch批处理框架技巧,让你不再重复造轮子

整理了Spring批处理框架的内容,掌握这些知识,可以帮你省去一些造轮子的过程,提高开发效率.本文由博主姚兆峰分享,小编整理后推送,希望对你的工作有帮助. Part.1 问题分析 在大型的企业应用中,或多或少都会存在大量的任务需要处理,如邮件批量通知所有将要过期的会员等等.而在批量处理任务的过程中,又需要注意很多细节,如任务异常.性能瓶颈等等.那么,使用一款优秀的框架总比我们自己重复地造轮子要好得多一些. AD 我所在的物联网云平台部门就有这么一个需求,需要实现批量下发命令给百万设备.为了防止枯

企业级应用框架(二)三层架构之数据访问层的封装与抽象

接上一篇我们来对数据访问层进行封装与抽象.在上一篇我们知道,要解除BLL对DAL的依赖,我们就必须抽象出DAL层的接口,同时基于DAL的数据访问技术很多,如EF,ADO.NET,LINQ TO SQL,因此,我们的数据访问层必须对这些技术提供相应的支持.所以今天我们要做的事情有两件,第一,定义我们的数据访问层接口:第二,屏蔽各类数据库访问技术的差异,提供统一的数据库访问模型.举个例子,我们只需要修改一下我们的配置文件,就能够把ADO.NET的实现方式,改变成EF的实现方式.好下面搭建我们的三层构

企业级应用框架(三)三层架构之数据访问层的改进以及测试DOM的发布

在上一篇我们在宏观概要上对DAL层进行了封装与抽象.我们的目的主要有两个:第一,解除BLL层对DAL层的依赖,这一点我们通过定义接口做到了:第二,使我们的DAL层能够支持一切数据访问技术,如Ado.net,EF,linq To Sql,这一点我们实现的不是很完美,仍有很大的改进空间,本文将加以改进. 在此之前我们来看一下我们最新的dom(PS:经过两天的赶工,我们的dom已经相对成熟,其中BLL层已经被我高度抽象化了,并且引进了业务上文文的概念:DAL层除了具体的技术实现尚为完成,其他方面已经相

图书简介:Spring Batch批处理框架

大数据时代批处理利器,国内首度原创解析Spring Batch框架. 内容简介: <Spring Batch 批处理框架>全面.系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的架构设计.源码做了特定的剖析:在帮助读者掌握Spring Batch框架基本功能.高级功能的同时,深入剖析了Spring Batch框架的设计原理,帮助读者可以游刃有余地掌握Spring Batch框架. <Sprin

企业级应用框架(一) 三层架构之解耦

前言 前段时间朋友拿了个网站给我,让我帮忙添加几个小功能,我爽快的答应了,但是当我打开源码,我瞬间就奔溃了,整个项目连最基本的三层框架也没有搭建,仅仅是封装了一个sqlhelp作为数据库的操作接口,项目中的SQL查询语句无处不在,业务逻辑紧紧耦合在UI逻辑中,看到这样的代码,坦白来说,我什么兴致都没有了,但是碍着人情,我硬着头皮,把基本功能的完成交差,通过这件事情,我对软件分层进行了深入的思考. 三层架构 说到三层架构,大伙都很熟悉,我也不再多啰嗦了,我们直接快速搭建一个. 项目的引用关系是:S