springbatch操作DB

一、需求分析

使用Spring Batch对DB进行读写操作: 从一个表中读取数据, 然后批量的插入另外一张表中.

二、代码实现

1. 代码结构图:

2. applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context-3.1.xsd
	http://www.springframework.org/schema/tx
	http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

	<!-- 配置spring扫描范围 -->
	<context:component-scan base-package="com.zdp" />

    <!-- 配置数据源 -->
    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" abstract="false" scope="singleton">
		<property name="driverClass" value="org.gjt.mm.mysql.Driver" />
		<property name="jdbcUrl" value="jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8" />
		<property name="user" value="root" />
		<property name="password" value="root" />
		<property name="checkoutTimeout" value="30000" />
		<property name="maxIdleTime" value="120" />
		<property name="maxPoolSize" value="100" />
		<property name="minPoolSize" value="2" />
		<property name="initialPoolSize" value="2" />
		<property name="maxStatements" value="0" />
		<property name="maxStatementsPerConnection" value="0" />
		<property name="idleConnectionTestPeriod" value="30" />
	</bean>

	<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>

	<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>

	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<tx:annotation-driven transaction-manager="transactionManager" />
</beans>

base-package: 扫描spring注解

jobLauncher: 启动Job

jobRepository: 为Job提供持久化操作

transactionManager: 提供事务管理操作

3. springBatch.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
	http://www.springframework.org/schema/batch
	http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context-3.1.xsd
	http://www.springframework.org/schema/tx
	http://www.springframework.org/schema/tx/spring-tx-3.1.xsd">

	<!-- 引入spring核心配置文件 -->
	<import resource="applicationContext.xml"/>

	<batch:job id="ledgerJob">
		<!-- 监听job运行状态 -->
		<batch:listeners>
			<batch:listener ref="appJobExecutionListener" />
		</batch:listeners>
		<batch:step id="step">
			<!-- 添加事务控制 -->
			<batch:tasklet transaction-manager="transactionManager">
				<batch:listeners>
					<batch:listener ref="itemFailureLoggerListener" />
				</batch:listeners>
				<!-- commit-interval: 批量提交的条数; skip-limit: 指允许跳过记录数 -->
				<batch:chunk reader="ledgerReader" writer="ledgerWriter" commit-interval="1000" skip-limit="1000">
					<batch:skippable-exception-classes>
						<batch:include class="java.lang.Exception"/> <!-- 出现exception或其子类, Job仍然会往后执行 -->
						<batch:exclude class="java.io.FileNotFoundException"/> <!-- 出现这个异常, Job会立刻停止 -->
					</batch:skippable-exception-classes>
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
	</batch:job>

	<!-- 从ledger表读取数据 -->
	<bean id="ledgerReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
		<property name="sql" value="select * from ledger" />
		<property name="rowMapper" ref="ledgerRowMapper" />
	</bean>

	<bean id="jobParameterBulider" class="org.springframework.batch.core.JobParametersBuilder" />

	<!-- 定时任务开始 -->
	<bean id="ledgerJobDetail" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject">
			<!-- 定时执行类 -->
			<ref bean="quartzLedgerJob" />
		</property>
		<property name="targetMethod">
			<!-- 定时执行类的方法 -->
			<value>execute</value>
		</property>
	</bean>  

	<bean id="ledgerCronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean" >
		<property name="jobDetail" >
			<ref bean="ledgerJobDetail" />
		</property>
		<property name="cronExpression" >
			<!-- 每天晚上22:30执行 -->
			<value>0 30 22 ? * *</value>
		</property>
	</bean>  

	<!-- 触发器工厂,将所有的定时任务都注入工厂-->
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<!-- 添加触发器 -->
		<property name="triggers">
			<list>
				<!-- 将上面定义的测试定时任务注入(可以定义多个定时任务,同时注入)-->
				<ref local="ledgerCronTrigger" />
			</list>
		</property>
	</bean>
</beans>  

4. AppJobExecutionListener.java

/**
 * 监听job运行状态
 */
@Component("appJobExecutionListener")
public class AppJobExecutionListener implements JobExecutionListener {
	private final static Logger logger = Logger.getLogger(AppJobExecutionListener.class);

	public void afterJob(JobExecution jobExecution) {
		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
			logger.info("Job completed: " + jobExecution.getJobId());
		} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
			logger.info("Job failed: " + jobExecution.getJobId());
		}
	}

	public void beforeJob(JobExecution jobExecution) {
		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
			logger.info("Job completed: " + jobExecution.getJobId());
		} else if (jobExecution.getStatus() == BatchStatus.FAILED) {
			logger.info("Job failed: " + jobExecution.getJobId());
		}
	}
}

5. ItemFailureLoggerListener.java

/**
 * 检查是读出错还是写出错
 */
@Component("itemFailureLoggerListener")
public class ItemFailureLoggerListener extends ItemListenerSupport<Object, Object> {
	private final static Logger LOG = Logger.getLogger(ItemFailureLoggerListener.class);

	public void onReadError(Exception ex) {
		LOG.error("Encountered error on read", ex);
	}

	public void onWriteError(Exception ex, Object item) {
		LOG.error("Encountered error on write", ex);
	}

}

6. Ledger.java

public class Ledger implements Serializable {
	private static final long serialVersionUID = 1L;
	private int id;
	private Date receiptDate;
	private String memberName;
	private String checkNumber;
	private Date checkDate;
	private String paymentType;
	private double depositAmount;
	private double paymentAmount;
	private String comments;

	// getter and setter
}

7. LedgerRowMapper.java

/**
 * ledger行的映射类
 */
@SuppressWarnings("rawtypes")
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper {
	public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
		Ledger ledger = new Ledger();
		ledger.setId(rs.getInt("ID"));
		ledger.setReceiptDate(rs.getDate("RECEIPT_DATE"));
		ledger.setMemberName(rs.getString("MEMBER_NAME"));
		ledger.setCheckNumber(rs.getString("MEMBER_NAME"));
		ledger.setCheckDate(rs.getDate("CHECK_DATE"));
		ledger.setPaymentType(rs.getString("PAYMENT_TYPE"));
		ledger.setDepositAmount(rs.getDouble("DEPOSIT_AMOUNT"));
		ledger.setPaymentAmount(rs.getDouble("PAYMENT_AMOUNT"));
		ledger.setComments(rs.getString("COMMENTS"));
		return ledger;
	}
}

8. LedgerDao.java

public interface LedgerDao {
	public void save(final Ledger item) ;
}

9. LedgerDaoImpl.java

/**
 * ledger数据操作类
 */
@Repository
public class LedgerDaoImpl implements LedgerDao {

	private static final String SAVE_SQL = "INSERT INTO LEDGER_TEMP (RECEIPT_DATE, MEMBER_NAME, CHECK_NUMBER, CHECK_DATE, PAYMENT_TYPE, DEPOSIT_AMOUNT, PAYMENT_AMOUNT, COMMENTS) VALUES(?,?,?,?,?,?,?,?)";

	@Autowired
	private JdbcTemplate jdbcTemplate;

	@Override
	public void save(final Ledger item) {
		jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {
			public void setValues(PreparedStatement stmt) throws SQLException {
				stmt.setDate(1, new java.sql.Date(item.getReceiptDate().getTime()));
				stmt.setString(2, item.getMemberName());
				stmt.setString(3, item.getCheckNumber());
				stmt.setDate(4, new java.sql.Date(item.getCheckDate().getTime()));
				stmt.setString(5, item.getPaymentType());
				stmt.setDouble(6, item.getDepositAmount());
				stmt.setDouble(7, item.getPaymentAmount());
				stmt.setString(8, item.getComments());
			}
		});
	}
}

10. LedgerWriter.java

/**
 * ledger写入数据
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

	@Autowired
	private LedgerDao ledgerDao;

	/**
	 * 写入数据
	 * @param ledgers
	 */
	public void write(List<? extends Ledger> ledgers) throws Exception {
		for (Ledger ledger : ledgers) {
			ledgerDao.save(ledger);
		}
	}

}

11. QuartzLedgerJob.java

/**
 * 定时调度类
 */
@Component("quartzLedgerJob")
public class QuartzLedgerJob {

	private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job ledgerJob;

	@Autowired
	JobParametersBuilder jobParameterBulider;

	private static long counter = 0l;

	/**
	 * 执行业务方法
	 * @throws Exception
	 */
	public void execute() throws Exception {
		/**
		 * Spring Batch Job同一个job instance,成功执行后是不允许重新执行的,
		 * 失败后是否允许重跑,可通过配置Job的restartable参数来控制,默认是true,如果需要重新执行,可以变通处理,
		 * 添加一个JobParameters构建类,以当前时间作为参数,保证其他参数相同的情况下是不同的job instance
		 */
		LOG.debug("start...");
		StopWatch stopWatch = new StopWatch();
		stopWatch.start();
		jobParameterBulider.addDate("date", new Date());
		jobLauncher.run(ledgerJob, jobParameterBulider.toJobParameters());
		stopWatch.stop();
		LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", stopWatch.prettyPrint(), ++counter);
	}
}

12. StartQuartz.java

/**
 * 启动定时调度
 * 需求描述: 定时从表ledger读取数据, 然后批量写入表ledger_temp
 */
public class StartQuartz {
	public static void main(String[] args) throws FileNotFoundException {
		new ClassPathXmlApplicationContext("/com/zdp/resources/springBatch.xml");
	}
}

13. sql:

create table ledger(
	ID int(10) not null AUTO_INCREMENT PRIMARY KEY,
	RECEIPT_DATE date,
	MEMBER_NAME varchar(10) ,
	CHECK_NUMBER varchar(10) ,
	CHECK_DATE date,
	PAYMENT_TYPE varchar(10) ,
	DEPOSIT_AMOUNT double(10,3),
	PAYMENT_AMOUNT double(10,3),
	COMMENTS varchar(100)
);

create table ledger_temp(
	ID int(10) not null AUTO_INCREMENT PRIMARY KEY,
	RECEIPT_DATE date,
	MEMBER_NAME varchar(10) ,
	CHECK_NUMBER varchar(10) ,
	CHECK_DATE date,
	PAYMENT_TYPE varchar(10) ,
	DEPOSIT_AMOUNT double(10,3),
	PAYMENT_AMOUNT double(10,3),
	COMMENTS varchar(100)
);

springbatch操作DB,布布扣,bubuko.com

时间: 2024-10-25 07:41:28

springbatch操作DB的相关文章

springbatch操作CSV文件

一.需求分析 使用Spring Batch对CSV文件进行读写操作: 读取一个含有四个字段的CSV文件(id, name, age, score), 对文件做简单的处理, 然后输出到另一个csv文件中. 二.代码实现 1. 代码结构图: JobLaunch: 启动Job CsvItemProcessor: 对Reader数据进行处理 Student: 实体对象 input.csv: 数据读取文件 output.csv: 数据输出文件 2. applicationContext.xml <?xml

springbatch操作XML文件

一.需求分析 使用Spring Batch对XML文件进行读写操作: 从一个xml文件中读取商品信息, 经过简单的处理, 写入另外一个xml文件中. 二.代码实现 1. 代码结构图: 2. applicationContext.xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"

[Android] Android 使用 Greendao 操作 db sqlite(2)-- 封装DaoUtils类1

[Android] Android 使用 Greendao 操作 db sqlite(2)-- 封装DaoUtils类 原文地址:https://www.cnblogs.com/wukong1688/p/10725092.html

Atitit.软件按钮与仪表盘(13)--全文索引操作--db数据库子系统mssql2008

全文索引操作 4.全文索引和like语句比较 1 5.倒排索引 inverted index 1 2.SQL Server 2008全文检索 2 3.Lucene全文检索 3 一般情况,使用SQL Server中的全文索引,经过大体4个步骤: 4 Mssql2008的全文索引操作(attilax验证) 5 查看全文index使用大小 5 查看表行数与体积大小 6 参考 6 4.全文索引和like语句比较 当然是全文索引的执行效率高. 一般全文索引使用的是倒排索引,能够支持多关键字的索引,而LIK

thinkphp 和 laravel使用sql语句操作db和源码浅析

前言 对于一个PHP应用,可能最多的就是操作数据,以致于初学者有时只把php当做数据库增删查改的工具(这也无可厚非).而基于框架的语言,在框架中自然不能少了对数据库操作的封装,总想打开源码,看看到底是怎么工作的,趁着有时间~~ thinkphp[tp框架] 首先是这个中国人用的最多的框架说起.ps:我是基于thinkphp3.2来说,tp5.x党见谅~ thinkphp支持对原生的sql语句执行,如: $db=M(); $condition="XXX"; $sql="sele

Atitit.软件button和仪表板(13)--全文索引操作--db数据库子系统mssql2008

全文索引操作 4.全文索引和like语句比較 1 5.倒排索引 inverted index 1 2.SQL Server 2008全文检索 2 3.Lucene全文检索 3 普通情况,使用SQL Server中的全文索引,经过大体4个步骤: 4 Mssql2008的全文索引操作(attilax验证) 5 查看全文index使用大小 5 查看表行数与体积大小 6 參考 6 4.全文索引和like语句比較 当然是全文索引的运行效率高. 一般全文索引使用的是倒排索引,可以支持多keyword的索引,

mongodb or操作与连接池

mongodb # 类似于sql中的in或者or操作 mulites field query: db.cool.find({$or:[{field1:'val'},{'field2':'val'}-]}) # 类似于sql中的like操作 db.coo.find('name': /m/) == sql like pymongo 使用 {'field':{$regex: keyword}} http://stackoverflow.com/questions/3305561/how-do-i-qu

SQLite数据操作

SQLite,是一款轻型的数据库,是遵守ACID的关联式数据库管理系统,SQLite引擎不是个程序与之通讯的独立进程,而是连接到程序中成为它的一个主要部分,其主要的通信协议是在编程语言内的直接API调用.这在消耗总量.延迟时间和整体简单性上有着积极的作用.1.Qt操作SQLite数据库Qt提供了与平台以及数据库种类无关的访问数据库接口,支持类型和描述分别有:Driver Type DescriptionQDB2 IBM DB2QIBASE Borland InterBase DriverQMYS

SQLiteDatabase数据库操作详解

今天花了点时间总结了一下数据的相关知识android中系统自带的数据库SQLiteDatabase数据库,这种数据库操作起来比ormLite数据库(第三方的)麻烦点,但是我对这种数据库操作比较熟悉所以我就采用了这种数据库,如有错误欢迎大家批评指正,谢谢 1.SQLiteDatabase SQLiteDatabase本身是一个数据库的操作类,但是如果想进行数据库的操作,还需要android.database.sqlite.SQLiteOpenHelper类的帮助,在执行SQL语句时execSQL(