spring 提供spring-jms模块,用以集成JMS到spring 容器中,并提供jmsTemplate模版类来操作jms,类似集成jdbc数据库操作一样。
首先,我们新建一个applicationContext-qpid.xml来做qpid的集成操作
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
">
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jQpidConnectionFactory"/>
</bean>
<!-- 链接到java broker服务器 -->
<bean id="jQpidConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
<constructor-arg index="0" value="amqp://guest:[email protected]/?brokerlist=‘tcp://localhost:5672‘" />
</bean>
<!-- 缓存会话,保证单链接 -->
<bean id="jCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jQpidConnectionFactory" />
<property name="sessionCacheSize" value="1" />
<property name="reconnectOnException" value="true" />
</bean>
<!-- 订阅路径 -->
<bean id="jDestination" class="org.apache.qpid.client.AMQAnyDestination">
<constructor-arg index="0" value="ADDR:message_queue; {create: always}" />
</bean>
<!-- 接收事件处理器 -->
<bean id="jReceiver" class="com.liuxg.qpid.receiver.MsgReceiver" />
<!-- 事务处理 -->
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
<!-- 默认消息监听器 -->
<bean id="deviceStatusContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jCachingConnectionFactory" />
<property name="exceptionListener" ref="jCachingConnectionFactory" />
<property name="messageListener" ref="jReceiver" />
<property name="destination" ref="jDestination" />
<property name="transactionManager" ref="transactionManager"/>
</bean>
</beans>
然后需要把applicationContext-qpid.xml添加到web.xml的contextConfigLocation中
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:META-INF/applicationContext*.xml</param-value>
</context-param>
然后,在发送端,可以利用模版类发送消息
@Service
public class SenderService {
@Autowired JmsTemplate jmsTemplate;
@Autowired AMQAnyDestination deviceStatusDestination;
/**
* 发送消息
*/
public void sendMsg(){
jmsTemplate.send(deviceStatusDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("hello world !!");
}
});
}
}
这里发送的时候,可以利用spring的消息转化器,它支持String 和TextMessage, byte[] 和BytesMesssage还有java.util.Map 和 MapMessage的相互转化,例如下面
public void sendWithConversion() {
Map map = new HashMap();
map.put("Name", "Mark");
map.put("Age", new Integer(47));
jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws JMSException {
message.setIntProperty("AccountID", 1234);
message.setJMSCorrelationID("123-00001");
return message;
}
});
}
最后在接收端,可以定义一个类实现SessionAwareMessageListener或者DeviceStatusReceiver,这里我们实现SessionAwareMessageListener
public class MsgReceiver implements SessionAwareMessageListener<Message> {
@Override
public void onMessage(Message message, Session session) throws JMSException {
JMSTextMessage xx = (JMSTextMessage) message;
String text = xx.getText();
System.out.println(text);
}
}
时间: 2024-10-12 12:27:43