springbatch apache-activemq 整合(往mq中put数据,从mq中take数据)

简单测试如下:

1:收下下载apache-activemq-5.14.4 解压apache-activemq-5.14.4\bin\win64,运行activemq.bat

启动本地MQ服务器。

通过浏览器可以查看本地MQ服务器的信息。

http://127.0.0.1:8161/admin/index.jsp

2: 先往mq中put数据

配置如下:

<job id="jmsReadJob">
        <step id="jmsReadStep">
            <tasklet transaction-manager="transactionManager">
                <chunk reader="jmsItemReader" processor="creditBillProcessor" writer="creditItemWriter" commit-interval="2"></chunk>
            </tasklet>
        </step>
 </job>
      <!-- 从数据库读取 -->
     <bean:bean id="jmsItemReader" scope="step"
        class="org.springframework.batch.item.database.JdbcCursorItemReader" >
        <bean:property name="dataSource" ref="dataSource"/>
<bean:property name="sql" 
        value="
        select BIZ_UUID,BIZ_TYPE_CODE,BIZ_TYPE_PROP_CODE,BIZ_TYPE_PROP_NAME from IMS_BIZ_ARGS
        "/>
        <bean:property name="rowMapper">
            <bean:bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
                <bean:property name="mappedClass" value="com.citiccard.channel.sts.mqbatch.BizArgs"/>
            </bean:bean>
        </bean:property>
    </bean:bean>
 <!-- 写到MQ中 -->
<bean:bean id="creditItemWriter" class="org.springframework.batch.item.jms.JmsItemWriter">
<bean:property name="jmsTemplate" ref="jmsTemplate" />
</bean:bean>
 <!-- MQ对象配置 -->
<bean:bean id="jmsTemplate"  class="org.springframework.jms.core.JmsTemplate">
<bean:property name="connectionFactory" ref="mqfactory" />
<bean:property name="messageConverter">
<bean:bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</bean:property> 
<bean:property name="defaultDestination" ref="destination" />
<bean:property name="receiveTimeout" value="500" />
</bean:bean>
<bean:bean id="mqfactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
<bean:property name="brokerURL" value="tcp://localhost:61616" />
<bean:property name="trustAllPackages" value="true" />
<bean:property name="useAsyncSend" value="true" />
<bean:property name="redeliveryPolicy">
<bean:bean class="org.apache.activemq.RedeliveryPolicy">
<bean:property name="maximumRedeliveries" value="0" />
</bean:bean>
</bean:property>
</bean:bean>
<bean:bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
 <!-- MQ队列名称(MQ服务器中可以有多个队列)-->
<bean:constructor-arg value="enrollMq444" />
</bean:bean>
 <!-- 处理器(意思就是你从源中取到数据后,这些数据是否要加工一下(比如我根据这个数据发一个socket请求或者我把某个字段的值乘以2等等))-->
<bean:bean id="creditBillProcessor" scope="step"
class="com.citiccard.channel.sts.mqbatch.CreditBillProcessor">
</bean:bean>
<!-- 我这个测试的例子的意思是:我从数据库中把对象取出来啥没干直接把对象发送给springbatch的creditItemWriter对象
也就是说MQ中存放的是BizArgs对象。这也意味着我从MQ中取数据时数据是个BizArgs对象而不是String
什么的其他的对象。
当然我也可以toString一下玩玩,测试么。
-->
public class CreditBillProcessor implements ItemProcessor<BizArgs, BizArgs> {
private JdbcTemplate jdbcTemplate = null;
public JdbcTemplate getJdbcTemplate() {
return jdbcTemplate;
}
public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public BizArgs process(BizArgs bizArgs) throws Exception {
System.out.println(bizArgs);
return bizArgs;
}
}
<!--
跑一把
-->
public static void main(String[] args) {
//ApplicationContext context = getContext("springJob/mq/job-db-jdbc-month01-putMq.xml");
ApplicationContext context = getContext("springJob/mq/job-db-jdbc-month01-getMq.xml");
//JmsTemplate jmsTemplate = getJmsTemplate(context);
//sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",100.00,"2013-2-2 12:00:08","Lu Jia Zui road"));
//sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",320,"2013-2-3 10:35:21","Lu Jia Zui road"));
//sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",360.00,"2013-2-11 11:12:38","Longyang road"));
executeJob(context, "jmsReadJob",
new JobParametersBuilder().addDate("date", new Date()));
}

完事后,apache-activemq服务器中会多一个叫enrollMq444(配置文件中配的)的Queue,并且数据和数据库中的条数一致。

3: 从mq中take数据,不详细说了,直接贴配置

<bean:import resource="classpath:springJob/job-context.xml" />
<job id="jmsReadJob">
        <step id="jmsReadStep">
            <tasklet transaction-manager="transactionManager">
                <chunk reader="jmsItemReader" processor="creditBillProcessor" writer="creditItemWriter" commit-interval="2"></chunk>
            </tasklet>
        </step>
    </job>
 <bean:bean id="jmsItemReader"
class="org.springframework.batch.item.jms.JmsItemReader">
<bean:property name="itemType" value="com.citiccard.channel.sts.mqbatch.BizArgs"/>
<bean:property name="jmsTemplate" ref="jmsTemplate"/>
</bean:bean>
    <bean:bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" >
<bean:property name="connectionFactory" ref="mqfactory" />
<bean:property name="messageConverter">
<bean:bean
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</bean:property> 
<bean:property name="defaultDestination" ref="destination" />
<bean:property name="receiveTimeout" value="500" />
</bean:bean>
<bean:bean class="org.apache.activemq.ActiveMQConnectionFactory" id="mqfactory">
<bean:property name="brokerURL" value="tcp://localhost:61616" />
<bean:property name="trustAllPackages" value="true" />
</bean:bean>
<bean:bean id="destination2" class="org.apache.activemq.command.ActiveMQQueue">
<bean:constructor-arg value="enroll" />
</bean:bean>
<bean:bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<bean:constructor-arg value="enrollMq444" />
</bean:bean>
<bean:bean id="creditBillProcessor" scope="step"
class="com.citiccard.channel.sts.mqbatch.GetCreditBillProcessor">
</bean:bean>
<!-- 写数据库 -->
<bean:bean id="creditItemWriter"
class="org.springframework.batch.item.database.JdbcBatchItemWriter"
scope="step">
<bean:property name="dataSource" ref="w" />
<bean:property name="sql"
value="update  IMS_BIZ_ARGS t  set t.BIZ_TYPE_PROP_NAME = ? where   t.BIZ_UUID  = ? " />
<bean:property name="itemPreparedStatementSetter">
<bean:bean
class="com.citiccard.channel.sts.mqbatch.UpdateSqlSetter" />
</bean:property>
</bean:bean>
</bean:beans>
<!-- 我把name值加了一个haha,哈哈哈 -->
public class UpdateSqlSetter implements
ItemPreparedStatementSetter<BizArgs> {
@Override
public void setValues(BizArgs bigargs, PreparedStatement ps) throws SQLException {
ps.setString(1, bigargs.getBizTypePropName()+"haha");
ps.setString(2, bigargs.getBizUuid());
}
}

4:完了,小试一把。

时间: 2024-10-18 17:34:44

springbatch apache-activemq 整合(往mq中put数据,从mq中take数据)的相关文章

消息队列MQ - Apache ActiveMQ

Apache ActiveMQ是Apache软件基金会所研发的开放源码消息中间件:由于ActiveMQ是一个纯Jave程式,因此只需要操作系统支援Java虚拟机,ActiveMQ便可执行. 1    queue与topic的技术特点对比   Topic Queue 概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point 点对点 有无状态 topic数据默认不落地,是无状态的. Queue数据默认会在mq服务器上以文件形式保存,比如Active M

深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例

基于spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务. 环境准备 工具 JDK1.6或1.7 Spring4.1.0 ActiveMQ5.11.1 Tomcat7.x 目录结构 所需jar包 项目的配置 配置ConnectionFactory connectionFactory是Spring用于创建到JMS服务器链接

Spring和ActiveMQ整合的完整实例

Spring和ActiveMQ整合的完整实例 前言 这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务. 环境准备 工具 JDK1.6或1.7 Spring4.1.0 ActiveMQ5.11.1 Tomcat7.x 目录结构 所需jar包 项目的配置 配置ConnectionFactory conn

深入浅出JMS之Spring和ActiveMQ整合的完整实例

第一篇博文深入浅出JMS(一)–JMS基本概念,我们介绍了JMS的两种消息模型:点对点和发布订阅模型,以及消息被消费的两个方式:同步和异步,JMS编程模型的对象,最后说了JMS的优点. 第二篇博文深入浅出JMS(二)–ActiveMQ简单介绍以及安装,我们介绍了消息中间件ActiveMQ,安装,启动,以及优缺点. 第三篇博文深入浅出JMS(三)–ActiveMQ简单的HelloWorld实例,我们实现了一种点对点的同步消息模型,并没有给大家呈现发布订阅模型. 前言 这篇博文,我们基于spring

Apache ActiveMQ实战(2)-集群

ActiveMQ的集群 内嵌代理所引发的问题: 消息过载 管理混乱 如何解决这些问题--集群的两种方式: Master slave Broker clusters ActiveMQ的集群有两种方式: MASTER/SLAVE模式 Cluster模式 Pure Master Slave Pure master slave的工作方式: 当master broker失效的时候.Slave broker 做出了两种不同的相应方式 启动network connectors和transport connec

Apache ActiveMQ实战(1)-基本安装配置与消息类型

ActiveMQ简介 ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的.可扩展的.稳定的和安全的企业级消息通信.ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改. ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件.ActiveMQ实现了JMS标准并提供了很多附加的特性.这些附加的特性包括,JMX管理(java Management Extensions,即java

Sping+ActiveMQ整合

通过前一篇<ActiveMQ简述>大概对ActiveMQ有了一个大概的认识,本篇所阐述的是如何通过Spring继承ActiveMQ进而更有效.更灵活的运用ActiveMQ. Spring和ActiveMQ整合需要在项目中包含以下这几个jar包(缺一不可):activeio-core-3.1.4.jar,activemq-all-5.13.2.jar,activemq-pool-5.13.2.jar,commons-pool2-2.4.2.jar,这些jar可以在ActiveMQ的安装包中的/l

JAVAEE——宜立方商城09:Activemq整合spring的应用场景、添加商品同步索引库、商品详情页面动态展示与使用缓存

1. 学习计划 1.Activemq整合spring的应用场景 2.添加商品同步索引库 3.商品详情页面动态展示 4.展示详情页面使用缓存 2. Activemq整合spring 2.1. 使用方法 第一步:引用相关的jar包. <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> &l

学习ActiveMQ(四):spring与ActiveMQ整合

在上一篇中已经怎么使用activemq的api来实现消息的发送接收了,但是在实际的开发过程中,我们很少使用activemq直接上去使用,因为我们每次都要创建连接工厂,创建连接,创建session...有些繁琐,那么利用spring的话简单多了,强大的spring 提供了对了jms的支持,我们可以使用JmsTemplate来实现,JmsTemplate隔离了像打开.关闭Session和Producer的繁琐操作,因此应用开发人员仅仅需要关注实际的业务逻辑,接下来就一起来看看具体怎么做吧. 首先我们

实战Spring4+ActiveMQ整合实现消息队列(生产者+消费者)

引言: 最近公司做了一个以信息安全为主的项目,其中有一个业务需求就是,项目定时监控操作用户的行为,对于一些违规操作严重的行为,以发送邮件(FoxMail)的形式进行邮件告警,可能是多人,也可能是一个人,第一次是以单人的形式,,直接在业务层需要告警的地方发送邮件即可,可是后边需求变更了,对于某些告警邮件可能会发送多人,这其中可能就会有阻塞发邮件的可能,直到把所有邮件发送完毕后再继续做下边的业务,领导说这样会影响用户体验,发邮件的时候用户一直处于等待状态,不能干别的事情.最后研究说用消息队列,当有需