简单测试如下:
1: 首先下载 apache-activemq-5.14.4 并解压,进入 apache-activemq-5.14.4\bin\win64 目录,运行 activemq.bat,
即可启动本地MQ服务器。
通过浏览器可以查看本地MQ服务器的信息。
http://127.0.0.1:8161/admin/index.jsp
2: 先往MQ中写入(put)数据:从数据库读取记录,再发送到MQ队列
配置如下:
<!-- Job "jmsReadJob": one chunk-oriented step. Items come from jmsItemReader, pass through
     creditBillProcessor and are handed to creditItemWriter; the chunk is committed every 2 items. -->
<job id="jmsReadJob">
    <step id="jmsReadStep">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="jmsItemReader"
                   processor="creditBillProcessor"
                   writer="creditItemWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>
<!-- Reads from the database. Despite its id, this reader is a JDBC cursor reader, not a JMS reader. -->
<bean:bean id="jmsItemReader"
           class="org.springframework.batch.item.database.JdbcCursorItemReader"
           scope="step">
    <bean:property name="dataSource" ref="dataSource"/>
    <bean:property name="sql" value=" select BIZ_UUID,BIZ_TYPE_CODE,BIZ_TYPE_PROP_CODE,BIZ_TYPE_PROP_NAME from IMS_BIZ_ARGS "/>
    <!-- Each selected row is mapped onto a BizArgs instance. -->
    <bean:property name="rowMapper">
        <bean:bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
            <bean:property name="mappedClass" value="com.citiccard.channel.sts.mqbatch.BizArgs"/>
        </bean:bean>
    </bean:property>
</bean:bean>
<!-- Writes to MQ: every item of the chunk is sent through jmsTemplate. -->
<bean:bean id="creditItemWriter"
           class="org.springframework.batch.item.jms.JmsItemWriter">
    <bean:property name="jmsTemplate" ref="jmsTemplate"/>
</bean:bean>
<!-- MQ object configuration -->
<!-- JmsTemplate shared by the JMS beans: it connects through mqfactory, converts items with
     SimpleMessageConverter and uses "destination" when no destination is given explicitly.
     receiveTimeout is 500 (JmsTemplate interprets this value as milliseconds). -->
<bean:bean id="jmsTemplate"
           class="org.springframework.jms.core.JmsTemplate">
    <bean:property name="connectionFactory" ref="mqfactory"/>
    <bean:property name="messageConverter">
        <bean:bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </bean:property>
    <bean:property name="defaultDestination" ref="destination"/>
    <bean:property name="receiveTimeout" value="500"/>
</bean:bean>
<!-- Connection factory for the ActiveMQ broker listening on localhost:61616. -->
<bean:bean id="mqfactory"
           class="org.apache.activemq.ActiveMQConnectionFactory">
    <bean:property name="brokerURL" value="tcp://localhost:61616"/>
    <!-- NOTE(review): trustAllPackages=true switches off ActiveMQ's package check for
         ObjectMessage payloads. Acceptable for a local test; for anything else prefer an
         explicit trustedPackages list. -->
    <bean:property name="trustAllPackages" value="true"/>
    <bean:property name="useAsyncSend" value="true"/>
    <!-- Redelivery is disabled: maximumRedeliveries is 0. -->
    <bean:property name="redeliveryPolicy">
        <bean:bean class="org.apache.activemq.RedeliveryPolicy">
            <bean:property name="maximumRedeliveries" value="0"/>
        </bean:bean>
    </bean:property>
</bean:bean>
<!-- Target queue of the job. -->
<bean:bean id="destination"
           class="org.apache.activemq.command.ActiveMQQueue">
    <!-- MQ queue name (a single MQ server can host several queues) -->
    <bean:constructor-arg value="enrollMq444"/>
</bean:bean>
<!-- Processor: the place to transform the data after it has been read from the source and
     before it is written (for example, issue a socket request based on the item, or multiply
     one of its fields by 2, and so on). -->
<bean:bean id="creditBillProcessor"
           class="com.citiccard.channel.sts.mqbatch.CreditBillProcessor"
           scope="step"/>
<!-- 我这个测试的例子的意思是:我从数据库中把对象取出来啥没干直接把对象发送给springbatch的creditItemWriter对象 也就是说MQ中存放的是BizArgs对象。这也意味着我从MQ中取数据时数据是个BizArgs对象而不是String 什么的其他的对象。 当然我也可以toString一下玩玩,测试么。 --> public class CreditBillProcessor implements ItemProcessor<BizArgs, BizArgs> { private JdbcTemplate jdbcTemplate = null; public JdbcTemplate getJdbcTemplate() { return jdbcTemplate; } public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; } @Override public BizArgs process(BizArgs bizArgs) throws Exception { System.out.println(bizArgs); return bizArgs; } }
<!-- 跑一把 --> public static void main(String[] args) { //ApplicationContext context = getContext("springJob/mq/job-db-jdbc-month01-putMq.xml"); ApplicationContext context = getContext("springJob/mq/job-db-jdbc-month01-getMq.xml"); //JmsTemplate jmsTemplate = getJmsTemplate(context); //sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",100.00,"2013-2-2 12:00:08","Lu Jia Zui road")); //sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",320,"2013-2-3 10:35:21","Lu Jia Zui road")); //sendMessage(jmsTemplate, new CreditBill("4047390012345678","tom",360.00,"2013-2-11 11:12:38","Longyang road")); executeJob(context, "jmsReadJob", new JobParametersBuilder().addDate("date", new Date())); }
运行完成后,apache-activemq服务器中会多一个名为enrollMq444(配置文件中配置的队列名)的Queue,并且该队列中的消息条数与数据库表IMS_BIZ_ARGS中的记录条数一致。
3: 从MQ中读取(take)数据并写回数据库,这里不再详细说明,直接贴配置:
<bean:import resource="classpath:springJob/job-context.xml" />
<!-- Job "jmsReadJob": one chunk-oriented step. Items come from jmsItemReader, pass through
     creditBillProcessor and are handed to creditItemWriter; the chunk is committed every 2 items. -->
<job id="jmsReadJob">
    <step id="jmsReadStep">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="jmsItemReader"
                   processor="creditBillProcessor"
                   writer="creditItemWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>
<!-- Reads from MQ: items are received through jmsTemplate and are expected to be BizArgs. -->
<bean:bean id="jmsItemReader"
           class="org.springframework.batch.item.jms.JmsItemReader">
    <bean:property name="itemType" value="com.citiccard.channel.sts.mqbatch.BizArgs"/>
    <bean:property name="jmsTemplate" ref="jmsTemplate"/>
</bean:bean>

<!-- JmsTemplate used by the reader: it connects through mqfactory, converts messages with
     SimpleMessageConverter and receives from "destination" by default.
     receiveTimeout is 500 (JmsTemplate interprets this value as milliseconds). -->
<bean:bean id="jmsTemplate"
           class="org.springframework.jms.core.JmsTemplate">
    <bean:property name="connectionFactory" ref="mqfactory"/>
    <bean:property name="messageConverter">
        <bean:bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    </bean:property>
    <bean:property name="defaultDestination" ref="destination"/>
    <bean:property name="receiveTimeout" value="500"/>
</bean:bean>
<!-- Connection factory for the ActiveMQ broker listening on localhost:61616. -->
<bean:bean id="mqfactory"
           class="org.apache.activemq.ActiveMQConnectionFactory">
    <bean:property name="brokerURL" value="tcp://localhost:61616"/>
    <!-- NOTE(review): trustAllPackages=true switches off ActiveMQ's package check for
         ObjectMessage payloads. Acceptable for a local test; for anything else prefer an
         explicit trustedPackages list. -->
    <bean:property name="trustAllPackages" value="true"/>
</bean:bean>

<!-- Queue "enroll". NOTE(review): no bean in this snippet refers to destination2 - confirm
     whether it is still needed. -->
<bean:bean id="destination2"
           class="org.apache.activemq.command.ActiveMQQueue">
    <bean:constructor-arg value="enroll"/>
</bean:bean>

<!-- Queue "enrollMq444": the default destination of jmsTemplate. -->
<bean:bean id="destination"
           class="org.apache.activemq.command.ActiveMQQueue">
    <bean:constructor-arg value="enrollMq444"/>
</bean:bean>
<!-- Processor applied to every item received from MQ (class not shown in this walkthrough). -->
<bean:bean id="creditBillProcessor"
           class="com.citiccard.channel.sts.mqbatch.GetCreditBillProcessor"
           scope="step"/>

<!-- Writes to the database: a batched update of IMS_BIZ_ARGS whose two "?" placeholders are
     filled in by UpdateSqlSetter. -->
<bean:bean id="creditItemWriter"
           class="org.springframework.batch.item.database.JdbcBatchItemWriter"
           scope="step">
    <!-- NOTE(review): the data source bean "w" is not defined in this snippet; presumably it
         comes from the imported job-context.xml - confirm. -->
    <bean:property name="dataSource" ref="w"/>
    <bean:property name="sql" value="update IMS_BIZ_ARGS t set t.BIZ_TYPE_PROP_NAME = ? where t.BIZ_UUID = ? "/>
    <bean:property name="itemPreparedStatementSetter">
        <bean:bean class="com.citiccard.channel.sts.mqbatch.UpdateSqlSetter"/>
    </bean:property>
</bean:bean>
</bean:beans>
<!-- 我把name值加了一个haha,哈哈哈 --> public class UpdateSqlSetter implements ItemPreparedStatementSetter<BizArgs> { @Override public void setValues(BizArgs bigargs, PreparedStatement ps) throws SQLException { ps.setString(1, bigargs.getBizTypePropName()+"haha"); ps.setString(2, bigargs.getBizUuid()); } }
4:完了,小试一把。
时间: 2024-10-18 17:34:44