SpringBatch批处理框架+mysql仓库+web监控实录

1、概念

Spring Batch 是一款轻量级地适合企业级应用的批处理框架,值得注意的是,不同于其他调度框架,Spring Batch不提供调度功能。

2、批处理过程

批处理可以分为以下几个步骤:

  1. 读取数据
  2. 按照业务处理数据
  3. 归档数据的过程

3、Spring Batch给我们提供了什么?

  1. 统一的读写接口
  2. 丰富的任务处理方式
  3. 灵活的事务管理及并发处理
  4. 日志、监控、任务重启与跳过等特性

4、基础组件

名称 用途
JobRepository 用于注册和存储Job的容器
JobLauncher 用于启动Job
Job 实际要执行的作业,包含一个或多个step
step 步骤,批处理的步骤一般包含ItemReader, ItemProcessor, ItemWriter
ItemReader 从给定的数据源读取item
ItemProcessor 在item写入数据源之前进行数据整理
ItemWriter 把Chunk中包含的item写入数据源。
Chunk 数据块,给定数量的item集合,让item进行多次读和处理,当满足一定数量的时候再一次写入。
TaskLet 子任务表, step的一个事务过程,包含重复执行,同步/异步规则等。

5、job, step, tasklet 和 chunk 关系

一个job对应至少一个step,一个step对应0或者1个TaskLet,一个taskLet对应0或者1个Chunk

6、实战:批处理excel插入数据库

6.1:定义数据仓库

  <!-- 内存仓库  -->
    <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>-->

    <!-- 数据库仓库  -->
    <batch:job-repository id="jobRepository" data-source="dataRepDruidDataSource"
                          isolation-level-for-create="SERIALIZABLE" transaction-manager="transactionManager"
                          table-prefix="BATCH_" max-varchar-length="1000" />

6.2:定义启动器

    <!-- 作业调度器,用来启动job,引用作业仓库 -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

6.3:定义JOB

    <batch:job id="userBatchJobName" restartable="true">
        <batch:step id="userStep">
            <batch:tasklet allow-start-if-complete="false"
                           start-limit="1" task-executor="taskExecutor" throttle-limit="5">
                <batch:chunk reader="userReader" writer="userWriter"
                             processor="userProcessor" commit-interval="5" retry-limit="10">
                    <batch:retryable-exception-classes>
                        <batch:include class="org.springframework.dao.DuplicateKeyException"/>
                        <batch:include class="java.sql.BatchUpdateException"/>
                        <batch:include class="java.sql.SQLException"/>
                    </batch:retryable-exception-classes>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="taskExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <!-- 线程池维护线程的最少数量 -->
        <property name="corePoolSize" value="100"/>
        <!-- 线程池维护线程所允许的空闲时间 -->
        <property name="keepAliveSeconds" value="30000"/>
        <!-- 线程池维护线程的最大数量 -->
        <property name="maxPoolSize" value="300"/>
        <!-- 线程池所使用的缓冲队列 -->
        <property name="queueCapacity" value="100"/>
    </bean>

6.4:定义ItemReader

     <bean id="userReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="classpath:message/batch-data-source.csv"/>
    </bean>
 <!-- 将每行映射成对象 -->
    <bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                <property name="delimiter" value=","/><!-- 根据某种分隔符分割 -->
                <property name="names" value="id,name" />
            </bean>
        </property>
        <property name="fieldSetMapper"><!-- 将拆分后的字段映射成对象 -->
            <bean class="com.hcw.core.batch.UserFieldSetMapper" />
        </property>
    </bean>

6.5:定义ItemWriter

     <bean id="userWriter" class="com.hcw.core.batch.MyBatchItemWriter" scope="step">
        <property name="statementId" value="com.hcw.core.batch.dao.UserToMapper.batchInsert"/>
        <property name="sqlSessionFactory" ref="sqlSessionFactoryTo"/>
    </bean>

6.6:定义ItemProcessor

    <bean id="userProcessor" class="com.hcw.core.batch.UserItemProcessor"/>

6.7: 定义jobRepository的数据源

   <bean id="dataRepDruidDataSource" class="com.alibaba.druid.pool.DruidDataSource"
          init-method="init" destroy-method="close">
        <property name="url" value="${jdbc.mysql.rep.connection.url}" />
        <property name="username" value="${jdbc.mysql.rep.connection.username}" />
        <property name="password" value="${jdbc.mysql.rep.connection.password}" />
        <property name="filters" value="${jdbc.mysql.rep.connection.filters}" />
        <property name="maxActive" value="${jdbc.mysql.rep.connection.maxActive}" />
        <property name="initialSize" value="${jdbc.mysql.rep.connection.initialSize}" />
        <property name="maxWait" value="${jdbc.mysql.rep.connection.maxWait}" />
        <property name="minIdle" value="${jdbc.mysql.rep.connection.minIdle}" />
        <property name="timeBetweenEvictionRunsMillis"
                  value="${jdbc.mysql.rep.connection.timeBetweenEvictionRunsMillis}" />
        <property name="minEvictableIdleTimeMillis"
                  value="${jdbc.mysql.rep.connection.minEvictableIdleTimeMillis}" />
        <property name="validationQuery"
                  value="${jdbc.mysql.rep.connection.validationQuery}" />
        <property name="testWhileIdle"
                  value="${jdbc.mysql.rep.connection.testWhileIdle}" />
        <property name="testOnBorrow" value="${jdbc.mysql.rep.connection.testOnBorrow}" />
        <property name="testOnReturn" value="${jdbc.mysql.rep.connection.testOnReturn}" />
        <property name="poolPreparedStatements"
                  value="${jdbc.mysql.rep.connection.poolPreparedStatements}" />
        <property name="maxPoolPreparedStatementPerConnectionSize"
                  value="${jdbc.mysql.rep.connection.maxPoolPreparedStatementPerConnectionSize}" />
    </bean>

6.8: 启动JOB

启动tomcat,打开启动页面

原文地址:https://blog.51cto.com/14570694/2469184

时间: 2024-10-14 03:41:10

SpringBatch批处理框架+mysql仓库+web监控实录的相关文章

并行处理框架Celery的Web监控管理服务-Flower

Flower: Real-time Celery web-monitor Flower is a real-time web based monitor and administration tool for Celery. It is under active development, but is already an essential tool. Being the recommended monitor for Celery, it obsoletes the Django-Admin

走进企业级批处理框架--Springbatch

Springbatch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统.Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动.跳过,和资源管理等重要功能.它能使业务人员专注于核心业务的开发,而将重复性的耗时工作交给系统自动处理.如数据的倒入,导出,数据的复制等工作.本文将通过一个简单的文件复制的小例子介绍SpringBatch的工作原理.首先来看相关的核心代码和配置: <?xml version=

【转】大数据批处理框架 Spring Batch全面解析

如今微服务架构讨论的如火如荼.但在企业架构里除了大量的OLTP交易外,还存在海量的批处理交易.在诸如银行的金融机构中,每天有3-4万笔的批处理作业需要处理.针对OLTP,业界有大量的开源框架.优秀的架构设计给予支撑:但批处理领域的框架确凤毛麟角.是时候和我们一起来了解下批处理的世界哪些优秀的框架和设计了,今天我将以Spring Batch为例,和大家一起探秘批处理的世界.初识批处理典型场景探秘领域模型及关键架构实现作业健壮性与扩展性批处理框架的不足与增强批处理典型业务场景对账是典型的批处理业务处

万树IT:Spring Batch批处理框架技巧,让你不再重复造轮子

整理了Spring批处理框架的内容,掌握这些知识,可以帮你省去一些造轮子的过程,提高开发效率.本文由博主姚兆峰分享,小编整理后推送,希望对你的工作有帮助. Part.1 问题分析 在大型的企业应用中,或多或少都会存在大量的任务需要处理,如邮件批量通知所有将要过期的会员等等.而在批量处理任务的过程中,又需要注意很多细节,如任务异常.性能瓶颈等等.那么,使用一款优秀的框架总比我们自己重复地造轮子要好得多一些. AD 我所在的物联网云平台部门就有这么一个需求,需要实现批量下发命令给百万设备.为了防止枯

图书简介:Spring Batch批处理框架

大数据时代批处理利器,国内首度原创解析Spring Batch框架. 内容简介: <Spring Batch 批处理框架>全面.系统地介绍了批处理框架Spring Batch,通过详尽的实战示例向读者展示了Spring Batch框架对大数据批处理的基本开发能力,并对框架的架构设计.源码做了特定的剖析:在帮助读者掌握Spring Batch框架基本功能.高级功能的同时,深入剖析了Spring Batch框架的设计原理,帮助读者可以游刃有余地掌握Spring Batch框架. <Sprin

携程框架团队对于应用监控系统的探索与思考

https://mp.weixin.qq.com/s/I6KDloBiQOfqWthDckKbGg 干货 | 携程框架团队对于应用监控系统的探索与思考 原创: 鄞劭涵 携程技术中心 昨天 作者简介 鄞劭涵,携程框架架构研发部高级软件工程师,爱丁堡大学高性能计算专业硕士.目前主要从事应用监控系统以及消息队列相关基础框架的研发. 一.为什么需要应用监控系统 随着市场环境的变化以及国际化的进程,企业的各种对内.对外需求也日益增长.服务化的架构以及容器化的应用加速了各种功能.产品的迭代与更新.随之而来,

异步并行批处理框架设计的一些思考(转)

随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来.如何有效地处理.分析这些海量的数据资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器.可以说,如果不能很好的快速处理分析这些海量的数据资源,将很快被市场无情地所淘汰.当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析.此外还有针对实时在线流式数据处理方面的,同样也是分布式的计算框架Storm,也能很好的满足数据实时性分

Zabbix基于Proxy分布式部署实现Web监控

前言 在日常运维工作中,难免会遇到这样或那样的故障,如何能在第一时间发现故障,并及时定位故障原因,保证业务不受影响,我想这应该是做好一个运维必须要掌握的技能.但人力不可能实时掌控系统的变化,于是监控系统应运而生,监控便是运维的眼睛,把监控和性能管理做好后,运维就是一件很轻松的事情.目前比较流行的开源监控工具有Cacti.Nagios(Icinga).Zabbix等.本文带来的是Zabbix基于Proxy分布式部署实现Web监控. Zabbix 简介 Zabbix是一个基于Web界面提供分布式系统

异步并行批处理框架设计的一些思考

随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来.如何有效地处理.分析这些海量的数据资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器.可以说,如果不能很好的快速处理分析这些海量的数据资源,将很快被市场无情地所淘汰.当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析.此外还有针对实时在线流式数据处理方面的,同样也是分布式的计算框架Strom,也能很好的满足数据实时性分