Spring整合JMS-基于activeMQ实现(二)

Spring整合JMS-基于activeMQ实现(二)

1、消息监听器

在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是MessageListener、SessionAwareMessageListener、MessageListenerAdapter

1.1 MessageListener

MessageListener是最原始的消息监听器(javax.jms.MessageListener),它是JMS规范中定义的一个接口。定义了一个omMessage方法,只接收一个message参数

public class ConsumerMessageListener implements MessageListener{

public void onMessage(Message
message) {

//这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage

TextMessage textMessage = (TextMessage)message;

System. out.println( "接收到一个纯文本消息" );

try {

System. out.println( "消息内容是:" +
textMessage.getText());

catch (JMSException
e) {

e.printStackTrace();

}

}

}

1.2 SessionAwareMessageListener

sessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS消息监听器,MessageListener处理接收到的消息时候如果需要返回消息给对方,此时就需要重新获取connection和session,SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,onMessage方法中接收两个参数,一个Message,一个发送消息的Session。(看红色新增部分)

<beans xmlns= "http://www.springframework.org/schema/beans" xmlns:aop= "http://www.springframework.org/schema/aop"

xmlns:tx= "http://www.springframework.org/schema/tx" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"

xmlns:context= "http://www.springframework.org/schema/context" xmlns:jms= "http://www.springframework.org/schema/jms"

xsi:schemaLocation= "

    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd

    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd

    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd

    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsd

    http://www.springframework.org/schema/tx  http://www.springframework.org/schema/tx/spring-tx-3.1.xsd ">

<bean id ="connectionFactory" class= "org.springframework.jms.connection.CachingConnectionFactory" >

<property name ="targetConnectionFactory">

<bean class= "org.apache.activemq.ActiveMQConnectionFactory" >

<property name ="brokerURL">

<value >tcp://localhost:61616 </value >

</property >

</bean >

</property >

<property name ="sessionCacheSize" value= "1" />

</bean >

<!-- Spring jmsTemplate queue -->

<bean id ="jmsTemplate" class= "org.springframework.jms.core.JmsTemplate" >

<property name ="connectionFactory" ref= "connectionFactory"></property >

<property name ="defaultDestinationName" value= "subject"></property >

<property name ="deliveryPersistent" value= "true"></property >

<property name ="pubSubDomain" value="false"></ property> <!--
false p2p,true topic -->

<property name ="sessionAcknowledgeMode" value= "1"></property >

<property name ="explicitQosEnabled" value= "true"></property >

<property name ="timeToLive" value="604800000"></ property>

</bean >

<!-- 配置Queue,其中value为Queue名称->start -->

<bean id = "testQueue" class = "org.apache.activemq.command.ActiveMQQueue" >

<constructor-arg index = "0" value ="${pur.test.add}" />

</bean >

<bean id = "sessionAwareQueue" class = "org.apache.activemq.command.ActiveMQQueue" >

<constructor-arg index = "0" value= "queue.liupeng.sessionaware" />

</bean >

<!-- 配置Queue,其中value为Queue名称->end -->

<!-- 注入AMQ的实现类属性(JmsTemplate和Destination) -->

<bean id = "amqQueueSender" class = "com.tuniu.scc.purchase.plan.manage.core.amq.AMQQueueSender" >

<property name = "jmsTemplate" ref="jmsTemplate" ></property >

<property name = "testQueue" ref="testQueue" ></property >

<property name = "sessionAwareQueue" ref= "sessionAwareQueue"></property >

</bean >

<!-- 消息发送必用的发送类 -->

<bean id = "multiThreadAMQSender" class ="com.tuniu.scc.purchase.plan.manage.core.amq.MultiThreadAMQSender"

init-method= "init">

<property name = "jmsTemplate" ref="jmsTemplate" ></property >

<property name = "multiThreadAMQExecutor" ref= "multiThreadAMQExecutor" ></property >

</bean >

<!-- 消息监听器->start -->

<bean id = "consumerMessageListener" class= "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerMessageListener" />

<!-- 消息监听容器 -->

<bean id = "jmsContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" >

<property name = "connectionFactory" ref= "connectionFactory" />

<property name = "destination" ref= "testQueue" /> <!--
消费者队列名称,修改 -->

<property name = "messageListener" ref= "consumerMessageListener" />

</bean >

<bean id = "consumerSessionAwareMessageListener" class ="com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerSessionAwareMessageListener" >

<property name ="testQueue" ref="testQueue"/> <!--
接收消息后返回给testQueue队列 -->

</bean >

< bean id= "sessionAwareListenerContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" >

<property name ="connectionFactory" ref= "connectionFactory" />

<property name ="destination" ref="sessionAwareQueue" />

<property name ="messageListener" ref= "consumerSessionAwareMessageListener" />

</bean >

<!-- 消息监听器->end -->

</beans>

发送消息:

@Resource

private AMQQueueSender amqQueueSender;

private static final Logger LOG =
LoggerFactory.getLogger(AMQController. class);

@UvConfig(method
= "testQueue", description = "测试AMQ")

@RequestMapping(value
= "/testQueue", method = RequestMethod. POST)

@TSPServiceInfo(name
= "PUR.NM.AMQController.testQueue" ,
description = "测试AMQ")

public void testQueue(HttpServletRequest
request, HttpServletResponse response) {

try {

long beginTime
= System. currentTimeMillis();

LOG.info( "发送开始");

//amqQueueSender.sendMessage("test", StaticProperty.TEST_QUEUE);

amqQueueSender.sendMessage( "test",
StaticProperty.TEST_SESSIONAWARE_QUEUE );

LOG.info( "发送结束,耗时:" +(System.currentTimeMillis()-beginTime)+ "ms");

catch (InterruptedException
e) {

LOG.error( "测试失败",
e);

}

}

接收消息:

public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<TextMessage>{

private Destination testQueue; //返回消息目的队列

@Override

public void onMessage(TextMessage
message, Session session) throws JMSException {

System. out.println( "收到一条消息" );

System. out.println( "消息内容是:" +message.getText());

MessageProducer producer = session.createProducer( testQueue);

Message txtMessage = session.createTextMessage("consumerSessionAwareMessageListener..." );

producer.send(txtMessage);

}

public Destination
getTestQueue() {

return testQueue;

}

public void setTestQueue(Destination
sessionAwareQueue) {

this.testQueue =
sessionAwareQueue;

}

}

 打印结果:

收到一条消息

消息内容是:test

接收到一个纯文本消息

消息内容是:consumerSessionAwareMessageListener...

1.3 MessageListenerAdapter

MessageListenerAdapter实现了MessageListener接口和SessionAwareMessageListener接口,它的作用主要是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。

MessageListenerAdapter会把接收到的消息做如下转换:

TextMessage转换为String对象

BytesMessage转换为byte数组

MapMessage转换为Map对象

ObjectMessage转换为对应的Serializable对象

既然前面说到MessageListenerAdapter会把接收到的消息做类型转换再通过反射交给Java类处理,如果真正目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为参数调用它们的onMessage方法,而不会利用反射去调用。下面定义的时候为它指定一个目标类

<!-- 消息监听适配器 -->

<bean id ="messageListenerAdapter" class= "org.springframework.jms.listener.adapter.MessageListenerAdapter" >

<property name ="delegate">

<bean class= "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener" />

</property >

<property name ="defaultListenerMethod" value= "receiveMessage"/>

</bean >

<bean id ="messageListenerAdapterContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" >

<property name ="connectionFactory" ref= "connectionFactory" />

<property name ="destination" ref="adapterQueue" />

<property name ="messageListener" ref= "messageListenerAdapter" />

</bean >

上面说到,目标处理器是一个普通Java类的时候,Spring将进行类型转换之后的对象通过反射去调用真正的方法,那么Spring是如何知道该调用哪个方法的呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的,当没有指定该属性的时候,会默认调用目标处理器的handleMessage方法

编写新队列、发送消息同上

 接收消息:

public class ConsumerListener{

public void handleMessage(String
message) {

System. out.println( "ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" +
message);

}

public void receiveMessage(String
message) {

System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" +
message);

}

}

MessageListenerAdapter的另外一个主要功能就是可以自动的发送回复的消息

  1. 方法一:
  2. 、public void sendMessage(Destination destination, final String message) {
  3. System.out.println("---------------生产者发送消息-----------------");
  4. System.out.println("---------------生产者发了一个消息:" + message);
  5. jmsTemplate.send(destination, new MessageCreator() {
  6. public Message createMessage(Session session) throws JMSException {
  7. TextMessage textMessage = session.createTextMessage(message);
  8.  textMessage.setJMSReplyTo(responseDestination);  //(省略编写其对应的监听器代码)
  9. return textMessage;
  10. }
  11. });
  12. }

方法二:

<!-- 消息监听适配器 -->

<bean id ="messageListenerAdapter" class= "org.springframework.jms.listener.adapter.MessageListenerAdapter" >

<property name ="delegate">

<bean class= "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener" />

</property >

<property name ="defaultListenerMethod" value= "receiveMessage"/>

</bean >

<bean id ="messageListenerAdapterContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" >

<property name ="connectionFactory" ref= "connectionFactory" />

<property name ="destination" ref="adapterQueue" />

<property name ="messageListener" ref= "messageListenerAdapter" />

 <property name="defaultResponseDestination" ref="defaultResponseQueue"/>   

</bean >

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-17 13:11:10

Spring整合JMS-基于activeMQ实现(二)的相关文章

Spring整合JMS——基于ActiveMQ实现

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2  

spring整合JMS - 基于ActiveMQ实现

一. 开篇语 继上一篇apache ActiveMQ之初体验后, 由于近期一直在复习spring的东西, 所以本文就使用spring整合下JMS. 二. 环境准备 1. ActiveMQ5.2.0 (activemq-all-5.2.0.jar) 2. spring2.5 (spring.jar) 3. JavaEE5 4. JDK1.6 注意: 測试前请先启动ActiveMQserver 三. 代码測试(P2P) 1. MsgSender: 消息生产者 /** * message sender

Spring整合JMS(一)——基于ActiveMQ实现

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2  

Spring整合JMS(二)——三种消息监听器

一.消息监听器MessageListener 在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是MessageListener.SessionAwareMessageListener和MessageListenerAdapter.下面就分别来介绍一下这几种类型的区别. 1).MessageListener MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口.其中定义了一个用于处理接收到的消息的onMessage方法,

Spring整合JMS(二)——消息监听器

消息监听器 在Spring整合JMS的应用中我们在定义消息监听器的时候一共能够定义三种类型的消息监听器,各自是MessageListener.SessionAwareMessageListener和MessageListenerAdapter. 以下就分别来介绍一下这几种类型的差别. MessageListener MessageListener是最原始的消息监听器.它是JMS规范中定义的一个接口.当中定义了一个用于处理接收到的消息的onMessage方法,该方法仅仅接收一个Message參数.

Spring整合JMS——事务管理

Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理.这将允许JMS应用利用Spring的事务管理特性.JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中.JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作. 在Java EE环境中,ConnectionFactory会池化C

一步一步Spring整合JMS

1.1 JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2 Sprin

Spring整合JMS-基于activeMQ实现(一)

Spring整合JMS-基于activeMQ实现(一) 1.1 JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进

Spring整合JMS(四)——事务管理

原文链接:http://haohaoxuexi.iteye.com/blog/1983532 Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理.这将允许JMS应用利用Spring的事务管理特性.JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中.JmsTemplate会自动检测这样的事务资