一、pom.xml与mq.properties
Spring提供了对JMS的支持,需要添加Spring支持jms的包,如下:
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.1.7.RELEASE</version> </dependency>
添加ActiveMQ的pool包,如下:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.11.1</version> </dependency>
添加xbean的标签配置,如下:
<dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency>
pom.xml完整配置如下:
<properties> <activemq.version>5.9.0</activemq.version> <activemq-pool.version>5.11.1</activemq-pool.version> <spring.version>4.1.7.RELEASE</spring.version> <xbean.version>3.16</xbean.version> <commons-lang3.version>3.3.2</commons-lang3.version> <commons-io.version>2.4</commons-io.version> <commons-fileupload.version>1.3.1</commons-fileupload.version> <fasterxml.jackson.version>2.8.4</fasterxml.jackson.version> <codehaus.jackson.version>1.9.13</codehaus.jackson.version> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <!-- Apache工具组件 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>${commons-io.version}</version> </dependency> <dependency> <groupId>commons-fileupload</groupId> <artifactId>commons-fileupload</artifactId> <version>${commons-fileupload.version}</version> </dependency> <!-- jackson --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>${fasterxml.jackson.version}</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>${codehaus.jackson.version}</version> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>${codehaus.jackson.version}</version> </dependency> <!-- activemq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>${activemq.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>${activemq-pool.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>${xbean.version}</version> </dependency> <!-- spring --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> </dependencies>
二、mq.xml配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- ActiveMQ 连接工厂 --> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码--> <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.brokerURL}" userName="${activemq.userName}" password="${activemq.password}" /> <!-- Spring Caching连接工厂 --> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <!-- 同上,同理 --> <!-- <constructor-arg ref="amqConnectionFactory" /> --> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100" /> </bean> <!-- =======Spring JmsTemplate 的消息生产者【开始】======== --> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false" /> </bean> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="connectionFactory" /> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true" /> </bean> <!-- =======Spring JmsTemplate 的消息生产者【结束】======== --> <!-- =======消息消费者=======【开始】 --> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.queue" ref="queueReceiver1"/> <jms:listener destination="test.queue" ref="queueReceiver2"/> </jms:listener-container> <!-- 定义Topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.topic" ref="topicReceiver1"/> <jms:listener destination="test.topic" ref="topicReceiver2"/> </jms:listener-container> <!-- =======消息消费者=======【结束】 --> </beans>
三、java类
3.1 消费者监听器
3.1.1 队列消息监听器
package com.liuy.mq.consumer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; /** * 队列消息监听器1 * @description 队列消息监听器1 * @author liuy * @version V1.00 * @date:2017年4月12日上午10:15:19 */ @Component public class QueueReceiver1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("QueueReceiver1接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
package com.liuy.mq.consumer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; /** * 队列消息监听器2 * @description 队列消息监听器2 * @author liuy * @version V1.00 * @date:2017年4月12日上午10:15:19 */ @Component public class QueueReceiver2 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("QueueReceiver2接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
3.1.2 Topic消息监听器
package com.liuy.mq.consumer.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; /** * Topic消息监听器1 * @description Topic消息监听器1 * @author liuy * @version V1.00 * @date:2017年4月12日上午10:17:11 */ @Component public class TopicReceiver1 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("TopicReceiver1接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
package com.liuy.mq.consumer.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; /** * Topic消息监听器2 * @description Topic消息监听器2 * @author liuy * @version V1.00 * @date:2017年4月12日上午10:17:11 */ @Component public class TopicReceiver2 implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.println("TopicReceiver2接收到消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
3.2 消息生产者
package com.liuy.mq.producer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; /** * 队列消息生产者,发送消息到队列 * @description 队列消息生产者,发送消息到队列 * @author liuy * @version V1.00 * @date:2017年4月12日上午10:20:46 */ @Component("queueSender") public class QueueSender { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean /** * 发送一条消息到指定的队列(目标) * @param queueName 队列名称 * @param message 消息内容 */ public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
package com.liuy.mq.producer.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; /** * Topic生产者发送消息到Topic * @description Topic生产者发送消息到Topic * @author liuy * @version V1.00 * @date:2017年4月12日上午10:20:46 */ @Component("topicSender") public class TopicSender { @Autowired @Qualifier("jmsTopicTemplate") private JmsTemplate jmsTemplate; /** * 发送一条消息到指定的队列(目标) * @param queueName 队列名称 * @param message 消息内容 */ public void send(String topicName,final String message){ jmsTemplate.send(topicName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
四、测试
package com.liuy.test.common; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * 测试共公类 * @description 测试共公类 * @author liuy * @version V1.00 * @date:2016年4月24日下午5:20:54 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:application-context.xml") public class SpringJunitTest { }
package com.liuy.test.core; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import com.liuy.mq.producer.queue.QueueSender; import com.liuy.mq.producer.topic.TopicSender; import com.liuy.test.common.SpringJunitTest; /** * @description 描述 * @author liuy * @version 1.0 * @date:2017年4月11日下午9:00:18 */ public class SpringQueueTest extends SpringJunitTest { @Autowired private QueueSender queueSender; @Autowired private TopicSender topicSender; /** * 发送消息到队列 * Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中 * @param message * @return String */ @Test public void testQueueSend() throws Exception { queueSender.send("test.queue", "测试"); } /** * 发送消息到主题 * Topic主题 :放入一个消息,所有订阅者都会收到 * 这个是主题目的地是一对多的 * @param message * @return String */ @Test public void testTopicSend() throws Exception { topicSender.send("test.topic", "测试222"); } }
效果:
列队:
主题:
时间: 2024-12-31 16:34:50