陪你解读Spring Batch(二)带你入手Spring Batch

前言

  说得多不如show code。上一章简单介绍了一下Spring Batch。本章将从头到尾搭建一套基于Spring Batch(2.1.9)、Spring(3.0.5)、mybatis(3.4.5)、mysql、gradle的批处理简单应用来处理文件中大量交易数据的写入。

  那么这里简单定义以下交易文件的格式,一个txnId交易Id,一个amt交易金额。一天比如有100w交易数据过来要落表。文件大概长这样,只是简单定义以下,实际开发肯定不会那么少。

  因工作需求没有使用最新版本的Spring Batch,所以本章是基于XML config的例子。最新版本支持用Java Config配置Spring Batch Job、Job Scope等。有兴趣的同学可以自行研究一下。本人技术有限,本章讲的如有错误希望请指正。

2.1 项目依赖

  首先我们要引入Spring Batch的依赖,这里的版本是2.1.9

springbatch = ["org.springframework.batch:spring-batch-core:2.1.9.RELEASE",
                   "org.springframework.batch:spring-batch-infrastructure:2.1.9.RELEASE"]

  批量处理的过程中,我们都需要数据持久化。这里我用的数据库是mysql,ORM框架是mybatis。所以还要添加mysql-connect和mybatis的依赖

mybatis = "org.mybatis:mybatis:3.4.5"
mysqlconnect = "mysql:mysql-connector-java:5.1.25"
dbcp = "commons-dbcp:commons-dbcp:1.4"

  事务和数据库的配置就不用说了,必须的。

<!-- transaction config -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/m_test_db"/>
        <property name="username" value="root"/>
        <property name="password" value="root"/>
    </bean>

2.2 配置Job

  上一章节说过,其实文件批处理的场景,抽象的处理三大步骤分为,读,处理,写。那么我们就依照这张图来开始

  上图如果看不懂的请看上一章节来理解。那么我们先建立一个Spring Batch任务的xml文件,然后定义Job

<!--data-source,transaction-manager 数据源以及事务管理器--><batch:job-repository id="jobRepository"                      data-source="dataSource" transaction-manager="transactionManager"                      isolation-level-for-create="SERIALIZABLE"                      table-prefix="DPL_" max-varchar-length="1000"/>
<batch:job id="investmentMatchFileJob"
               job-repository="jobRepository">
        <batch:step id="investmentMatchFileToDb">
            <batch:tasklet>
                <batch:chunk reader="txnListFileReader" writer="txnListResultWriter"
                             commit-interval="300"/>
            </batch:tasklet>
        </batch:step>
</batch:job>

  结合上面的图看,是不是找到点感觉了?一个Job可以有多个Step组合,每一个Step由开发者自己编写,可一把一个大Step分成多个小Step,完全看开发者意愿。每一个Step对应一个ItemReader、ItemProcessor和ItemWriter。所有的批处理框架都可以抽象成最简单的过程,读取数据,处理数据,写数据。所以Spring Batch提供了3个接口,ItemReader、ItemProcessor和ItemWriter。JobRepository则是记录Job、Step和发起Job的执行信息等。

  xml配置Job必须依赖的有三项,名称,JobRepository和Step列表。还有一个没介绍就是commit-interval属性,这就是控制读了多少行进行一次写。总不可能读一行写一行对吧?这里配置多少,那么Writer的入参list的size就是多少。

2.2.1 JobRepository

  JobRepository是记录Job、Step和发起Job的执行信息,SpringBatch一共会让你导入9张表,具体哪9张表请导入依赖然后查看schema-mysql.sql文件。

  这里要说明的一点是table-prefix属性,默认是以BATCH_开头的,你可以改变前缀,当然你的sql脚本的表名前缀也要改动。注意,这里只能改前缀,不可以改表的全名。表的列可以增加,比如说你的公司建表必须要有id,created_at,xxxx等字段的话,可以增加列,没有问题。但是原有列的名称不可以修改。脚本会在3张以SEQ结尾的表插入0,必须要先插入。

2.3 Step

2.3.1 Reader

  上面配置的reader是以下这个bean,value="file:#{jobParameters[‘txnListFile‘]}"。这里用到SPEL表达式,传入文件路径参数。FlatFileItemReader只能处理一个文件,实际使用中不可能只处理一个文件,所以你也可以导入下面那个叫MultiResourceItemReader类,通过给MultiResourceItemReader设置Resource数组可以实现一个Job读取一个目录下多个文件。但是这里注意,JobRepository不会记录每个文件的处理情况。

<bean id="txnListFileReader"
      class="org.springframework.batch.item.file.FlatFileItemReader"
      scope="step">
    <!--输入文件-->
    <property name="resource" value="file:#{jobParameters[‘txnListFile‘]}"/>
    <!--将每行映射为一个对象-->
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <!--从划分的字段中构建一个对象-->
            <property name="fieldSetMapper" ref="InvestMatchItemMapper"/>
            <!--根据某种分隔符来分-->
            <property name="lineTokenizer" ref="TxnListItemMapperFileLineTokenizer"/>
        </bean>
    </property>
    <!--跳过开头的的一些行-->
    <property name="linesToSkip" value="1"/>
    <property name="encoding" value="UTF-8"/>
</bean>
<bean id="InvestMatchItemMapper" class="me.grimmjx.sync.TxnListItemMapper"/>
<bean id="TxnListItemMapperFileLineTokenizer"
      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
    <property name="delimiter" value="|"/>
    <property name="names">
        <list>
            <value>txnId</value>
            <value>amt</value>
        </list>
    </property>
</bean>

<!--以下的内容是对一个目录下多个文件进行批处理的样例-->
<bean id="txnListFileReader"
      class="org.springframework.batch.item.file.MultiResourceItemReader"
      scope="step">
    <property name="resources" value="file:#{jobParameters[‘txnListFile‘]}/*.txt"/>
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader"
              scope="step">
            <property name="lineMapper">
                <bean
                        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                    <property name="fieldSetMapper" ref="InvestMatchItemMapper"/>
                    <property name="lineTokenizer"
                              ref="TxnListItemMapperFileLineTokenizer"/>
                </bean>
            </property>
            <property name="linesToSkip" value="1"/>
            <property name="encoding" value="UTF-8"/>
        </bean>
    </property>
</bean>

  以下图来理解比较方便

  从xml配置来看,delimiter控制如何分割,names就是文件每一列的名字。在这么多配置里,我们只需要写一个Java类。这里就是从一行数据,转换成一个对象。

/**
 * @author GrimMjx
 * 交易记录匹配器类。
 */
public class TxnListItemMapper implements FieldSetMapper<TxnList>{

    @Override
    public TxnList mapFieldSet(FieldSet fieldSet) throws BindException {
        TxnList txnList = new TxnList();
        txnList.setTxnId(fieldSet.readString("txnId"));
        txnList.setAmt(fieldSet.readBigDecimal("amt"));

        return txnList;
    }
}

2.3.2 Writer

  writer的bean为

<bean id="txnListResultWriter" class="me.grimmjx.sync.TxnListResultWriter" scope="step"/>

  writer执行的是写入操作,我们要实现ItemWriter<T>接口,以下为这个类的Java代码。这里的操作很简单,将构建好的对象集合直接写入库。注意了,外面没有幂等的话,最好这里先判断库里有没有,不要无脑写入。

/**
 * @author GrimMjx
 * 交易数据写入类。
 */
public class TxnListResultWriter implements ItemWriter<TxnList> {
    @Autowired
    private TxnListMapper txnListMapper;

    @Override
    public void write(List<? extends TxnList> items) throws Exception {
        List<TxnList> txnLists = Lists.newArrayList();
        for (TxnList item : items) {
            txnLists.add(item);
        }
        txnListMapper.insertBatch(txnLists);
    }
}

2.4 启动Job

  这里先定义一个bean,与之前的Job相关联。

<bean id="DefaultFileProcessor" class="me.grimmjx.processor.DefaultFileProcessor">
        <property name="job" ref="investmentMatchFileJob"/>
        <property name="jobLauncher" ref="jobLauncher"/>
</bean>

  以下为这个processor的Java代码

/**
 * 默认文件处理器类。
 *
 * @author GrimMjx
 */
public class DefaultFileProcessor {
    /**
     * 批次job
     */
    protected Job job;

    /**
     * 任务启动器
     */
    protected JobLauncher jobLauncher;

    public void process() {
        String baseDir = "/Users/miaojiaxing/test/2019.01.31.txt";

        JobParametersBuilder builder = new JobParametersBuilder();
        builder.addString("txnListFile", baseDir);
        // 携带参数
//        builder.addString("packageCode", "12345");
        builder.addString("dateTime", System.currentTimeMillis() + "");
        JobParameters jobParas = builder.toJobParameters();

        try {
            jobLauncher.run(job, jobParas);
        } catch (Exception e) {
            throw new RuntimeException("Run springBatchJob meet error", e);
        }
    }

    public void setJob(Job job) {
        this.job = job;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

}

  最后我们试试

/**
 * @author GrimMjx
 * <p>
 * 测试类。
 */
public class MainTest {

    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
        DefaultFileProcessor bean = ctx.getBean("DefaultFileProcessor", DefaultFileProcessor.class);
        bean.process();
        DefaultFileProcessor rereadProcessor = ctx.getBean("rereadProcessor", DefaultFileProcessor.class);
        rereadProcessor.process();
    }
}

  没有问题。

  下一章节将结合校验清洗、异常弹性处理、并行配置附上代码。

原文地址:https://www.cnblogs.com/GrimMjx/p/10415667.html

时间: 2024-08-01 13:16:56

陪你解读Spring Batch(二)带你入手Spring Batch的相关文章

Spring(二十一):Spring JdbcTemplate

JdbcTemplate主要提供以下五类方法: execute方法:可以用于执行任何SQL语句,一般用于执行DDL语句: update方法及batchUpdate方法:update方法用于执行新增.修改.删除等语句:batchUpdate方法用于执行批处理相关语句: query方法及queryForXXX方法:用于执行查询相关语句: call方法:用于执行存储过程.函数相关语句. JdbcTemplate提供的接口测试: 第一步:导入包 a)导入mysql驱动包 b)导入spring包 b)导入

spring batch(二):核心部分(1):配置Spring batch

spring batch(二):核心部分(1):配置Spring batch 博客分类: Spring 经验 java chapter 3.Batch configuration 1.spring batch 的命名空间 spring xml中指定batch的前缀作为命名空间. 示例: Xml代码   <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.sprin

使用Spring Mvc 转发 带着模板 父页面 之解决方法 decorators.xml

周末了,周一布置的任务还没完成,卡在了页面跳转上,接手了一个半截的项目要进行开发,之前没有人给培训,全靠自己爬代码,所以进度比较慢,而且加上之前没有用过 Spring Mvc 开发项目,所以有点吃力,不过接触了Spring Mvc近一个月的时间感觉 开发速度确实比 SSH快不少,不用一个一个的Bean去配置,直接扫描就OK了,可就是这样还是有些地方容易搞上一天也没搞多少进度,这不,被我新写的一个 Controller 的转发搞晕了,我本来要实现一个列表的分页查询,哪里想到点下一页的时候,除了我要

[Spring Batch 系列] 第一节 初识 Spring Batch

距离开始使用 Spring Batch 有一段时间了,一直没有时间整理,现在项目即将完结,整理下这段时间学习和使用经历. 官网地址:http://projects.spring.io/spring-batch/ 一.定义与特点 A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operati

Spring(二):AOP(面向切面编程),Spring的JDBC模板类

1 AOP概述 1.2 什么是AOP 在软件业,AOP为Aspect Oriented Programmig的缩写,意为:面向切面编程,通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技术.AOP是OOP的延续,是软件开发中的一个热点,也是Spring框架中的一个重要内容,是函数式编程的一种衍生范型.利用AOP可以对业务逻辑的各个部分进行隔离,从而使得业务逻辑各部分之间的耦合度降低,提高程序的可重用性,同时提高了开发的效率. AOP解决了OOP遇到一些问题,采取横向抽取机制,取代了传统

Spring学习(二十五)Spring AOP之增强介绍

课程概要: Spring AOP的基本概念 Spring AOP的增强类型 Spring AOP的前置增强 Spring AOP的后置增强 Spring AOP的环绕增强 Spring AOP的异常抛出增强 Spring AOP的引介增强 一.Spring AOP增强的基本概念 Spring当中的专业术语-advice,翻译成中文就是增强的意思. 所谓增强,其实就是向各个程序内部注入一些逻辑代码从而增强原有程序的功能. 二.Spring AOP的增强类型 首先先了解一下增强接口的继承关系 如上图

小牛带你走进Spring的事务

摘要本文摘抄了Spring事务相关的一些理论,主要讲述事务的特性.事务的传播行为.事务的隔离规则. 关键词:事务特性,事务传播,事务隔离 一.什么是事务事务是用来保证数据的完整性和一致性,正如金钱转账,金钱总数不会增加也不会减少. 数据库 事务管理有四个特性(ACID): 特性描述原子性(Atomicity)事务作为一个整体被执行,要么全部被执行,要么都不执行.一致性(Consistency)事务应确保数据的状态从一个一致状态转变为另一个一致状态,数据不应该被破坏.隔离性(Isolation)多

Spring.net(二)----初探IOC容器

我在上一篇关于Spring.net的文章“Spring.NET框架简介及模块说明 ”中很详细的介绍了,本文就不旧话从提.我门就直奔主题吧. 1.首先了解两个接口.  IObjectFactory接口和IApplicationContext接口:他两个称为“容器”或“IOC容器”. Spring.net框架的核心原则是非侵入性.  IObjectFactory接口是初始化.配置及管理对象的实际容器.  IObjectFactory全限定名为Spring.Objects.Factory.IObjec

Spring讲解二:Spring中的Bean配置1---基于XML文件的方式

一.在Spring的IOC容器中配置Bean 在xml文件中通过bean节点配置bean id:Bean的名称: (1) 在IOC容器中必须是唯一的 (2) 若id没有指定,Spring自动将权限限定性类名作为bean的名字 (3) id可以指定多个名字,名字之间可以用逗号.分号.或空格分隔 二.Spring容器 在Spring IOC容器读取Bean配置创建Bean实例之前,必须对它进行初始化.只有在容器实例化后,才可以从IOC容器中获取Bean实例并使用. Spring提供了两种类型的IOC