spring boot整合activeMQ
spring boot整合MQ以后,对于消息的发送和接收操作更加便捷。本文将通过四个案例,分别讲解spring boot整合MQ:
- spring boot整合MQ发送queue消息
- spring boot整合MQ发送topic消息
- spring boot整合MQ以后如何让queue和topic消息共存
- spring boot整合MQ以后topic消息如何持久化
下面分别进行讲解:
一、 spring boot 整合MQ发送queue消息
搭建测试工程,pom.xml配置如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itheima.demo</groupId> <artifactId>activemq-topic</artifactId> <version>0.0.1-SNAPSHOT</version> <!-- 变更JDK版本【固定写法】 --> <properties> <java.version>1.7</java.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.3.RELEASE</version> </parent> <dependencies> <!-- web工程启动器 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 热部署依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> </dependency> <!-- 整合MQ起步依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> </dependencies></project>
application.properties文件
#set tomcat portserver.port=8083?#set MQ urlspring.activemq.broker-url=tcp://192.168.25.135:61616?
ApplicationBoot.java
@SpringBootApplicationpublic class ApplicationBoot { public static void main(String[] args) { SpringApplication.run(ApplicationBoot.class, args); }}?
消息发送端ProviderController
@RestControllerpublic class ProviderController { @Autowired private JmsMessagingTemplate template; @RequestMapping("/sendQueue") public void sendQueueString(){ //第一个参数:消息目的地名称 第二个参数:消息内容 template.convertAndSend("a_queueMQ_1", "发送了一个queue消息,发送时间:"+new Date().toLocaleString()); }}?
消息接收端QueueConsumer
@Componentpublic class QueueConsumer { /** * @JmsListener 接收目的地要和发送端保持一致 * @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型 * */ @JmsListener(destination="a_queueMQ_1") public void consumerQueue(String msg){ System.out.println("consumerQueue--接收到消息:"+msg); System.out.println("接收时间:"+new Date().toLocaleString()); } }
测试发送queue消息
在浏览器中执行发送queue消息的URL
http://localhost:8083/sendQueue
查看消费端控制台输出信息:
consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 9:37:33接收时间:2018-3-29 9:37:33
访问MQ管理页面
http://192.168.25.135:8161
查看queue消息
二、 spring boot 整合MQ发送topic消息
在spring boot整合MQ以后,默认发送的就是queue消息。如果要发送topic消息,则需要在application.properties中添加如下配置:
#set topic message 默认值:false 当值为true:发送topic消息 当值为false:发送queue消息spring.jms.pub-sub-domain=true
在上述工程中添加发送topic消息的测试代码
消息发送端ProviderController.java
@RestControllerpublic class ProviderController { @Autowired private JmsMessagingTemplate template; @RequestMapping("/sendQueue") public void sendQueueString(){ //第一个参数:消息目的地名称 第二个参数:消息内容 template.convertAndSend("a_queueMQ_1", "发送了一个queue消息,发送时间:"+new Date().toLocaleString()); } @RequestMapping("/sendTopic") public void sendTopicString() throws JMSException{ template.convertAndSend("a_MQtopic_1", "发送了一个topic消息,发送时间:"+new Date().toLocaleString()); }?}
消息接收端TopicConsumer1
@Componentpublic class TopicConsumer1 {? @JmsListener(destination="a_MQtopic_1") public void consumerTopic(String msg){ System.out.println("topicConsumer1--接收到消息:"+msg); }?}
消息接收端TopicConsumer2
@Componentpublic class TopicConsumer2 {? @JmsListener(destination="a_MQtopic_1") public void consumerTopic(String msg){ System.out.println("topicConsumer2--接收到消息:"+msg); } }
测试发送topic消息
在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息:
topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35
访问MQ管理页面
http://192.168.25.135:8161
查看topic消息
可以看到,我们已经可以发送topic消息了。
但这里有一个问题:当我们重新执行发送queue消息的url时,会发现实际发送的还是topic消息,如图:
也就是说:
在默认情况下,发送的是queue消息。当我们在application.properties中修改spring.jms.pub-sub-domain=true之后,发送的是topic消息。 queue消息和topic消息这两种消息模式在默认情况下无法共存!
而在实际开发中,queue消息和topic消息都是常见的需求。所以我们有必要让这两种消息能够共存,接下来就来看一看如何解决这个问题。
三、 spring boot整合MQ以后如何让queue和topic消息共存
spring boot整合MQ以后接收消息的基本原理:
接收消息使用的是监听器,由于在项目中可能有多个监听器,如果为每一个监听器都创建一个MQ的连接就太耗资源,所以这里提供了一个监听器的连接工厂JmsListenerContainerFactory,由它来完成消息的接收和分发。
基于上述原理,为了解决queue和topic消息共存的问题,我们可以进行如下处理:
- 为queue接收端和topic接收端分别创建监听器连接工厂
- 在topic的连接工厂中设置pub-sub-domain=true
- 在接收端的@JmsListener注解中,通过containerFactory指定使用的连接工厂
下面就来解决这个问题,测试代码如下
application.properties中取消spring.jms.pub-sub-domain=true配置
#set tomcat portserver.port=8083?#set MQ urlspring.activemq.broker-url=tcp://192.168.25.135:61616?#set topic message#spring.jms.pub-sub-domain=true
编写MQConfig类,配监听工厂
/** * MQ配置类 * @author leejin * */ @Configuration public class MQConfig { /** * 配置topic目的地 * bean的名字默认就是方法名 * @return */ @Bean public Topic topic(){ return new ActiveMQTopic("a1_MQtopic_3"); } /** * 配置queue目的地 * bean的名字默认就是方法名 * @return */ @Bean public Queue queue(){ return new ActiveMQQueue("a1_queueMQ_3"); } /** * 此bean的名称就是方法名 * 配置topic消息监听容器工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean; } /** * 此bean的名称就是方法名 * 配置queue消息监听容器工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean=new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean; } }
修改之前的发送端和消费端代码
发送端ProviderController
@RestControllerpublic class ProviderController { @Autowired private Topic topic; @Autowired private Queue queue; @Autowired private JmsMessagingTemplate template; @RequestMapping("/sendQueue") public void sendQueueString(){ //第一个参数:消息目的地名称 第二个参数:消息内容 template.convertAndSend(queue, "发送了一个queue消息,发送时间:"+new Date().toLocaleString()); } @RequestMapping("/sendTopic") public void sendTopicString() throws JMSException{ template.convertAndSend(topic, "发送了一个topic消息,发送时间:"+new Date().toLocaleString()); }?}
queue消费端QueueConsumer
@Componentpublic class QueueConsumer { /** * @JmsListener 接收目的地要和发送端保持一致 * destination 指定目的地 * containerFactory 指定监听器连接工厂 * @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型 */ @JmsListener(destination="a1_queueMQ_3",containerFactory="jmsListenerContainerQueue") public void consumerQueue(String msg){ System.out.println("consumerQueue--接收到消息:"+msg); } }
topic消费端TopicConsumer1
@Componentpublic class TopicConsumer1 { /** * @JmsListener 接收目的地要和发送端保持一致 * destination 指定目的地 * containerFactory 指定监听器连接工厂 * @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型 */ @JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic") public void consumerTopic(String msg){ System.out.println("topicConsumer1--接收到消息:"+msg); } }
topic消费端TopicConsumer2
@Componentpublic class TopicConsumer2 {? /** * @JmsListener 接收目的地要和发送端保持一致 * destination 指定目的地 * containerFactory 指定监听器连接工厂 * @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型 */ @JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic") public void consumerTopic(String msg){ System.out.println("topicConsumer2--接收到消息:"+msg); } }
测试queue和topic消息共存
- 测试发送queue消息
在浏览器中执行发送queue消息的URL
http://localhost:8083/sendQueue
查看消费端控制台输出信息:
consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 14:39:55
访问MQ管理页面
http://192.168.25.135:8161
查看queue消息
- 测试发送topic消息在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息:
topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10
访问MQ管理页面
http://192.168.25.135:8161
查看topic消息
这样我们就解决了queue和topic消息的共存问题,主要的实现思路就是为queue消息和topic消息分别指定监听容器工厂。
接下来还有最后一个问题,就是topic消息的持久化问题。众所周知,topic消息默认情况下是不持久化的,也就是说发送端发送消息的时候,如果消费端不在线,这个消息就丢了,消费端再上线时是无法收到这个消息的。而很多时候我们是需要去接收到这个消息的,那么要解决这个问题,就需要对topic消息做持久化处理。
接下来我们就来看看怎么实现topic消息的持久化。
四、 spring boot整合MQ以后topic消息如何持久化
要实现topic消息的持久化,需要在消息发送端和消费端以及监听容器(JmsListenerContainerFactory)都做处理。在之前案例的基础上,我们做一些修改,加入topic消息持久化设置。测试代码如下:
监听容器(JmsListenerContainerFactory)配置
/** * MQ配置类 * @author leejin * */@Configurationpublic class MQConfig { /** * 配置topic目的地 * bean的名字默认就是方法名 * @return */ @Bean public Topic topic(){ return new ActiveMQTopic("a11_1_MQtopic_5"); } /** * 配置queue目的地 * bean的名字默认就是方法名 * @return */ @Bean public Queue queue(){ return new ActiveMQQueue("a1_queueMQ_3"); } /** * 此bean的名称就是方法名 * 配置topic消息监听容器工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); return bean; } /** * 此bean的名称就是方法名 * 配置queue消息监听容器工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(connectionFactory); return bean; } //上面配置的那个topic监听工厂,对于不需要持久化的topic消息是可以用的 //下面配置的监听工厂,是给需要持久化的topic消息使用的 /** * 第一个topic消费端的监听工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> topicFactory1(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory); //用来标识第一个topic客户端(clientId值可以自己指定) bean.setClientId("10001"); return bean; } /** * 第二个topic消费端的监听工厂 * @param connectionFactory 与MQ创建连接的工厂 * @return */ @Bean public JmsListenerContainerFactory<?> topicFactory2(ConnectionFactory connectionFactory){ DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory); //用来标识第二个topic客户端(clientId值可以自己指定) bean.setClientId("10002"); return bean; } /** * 对于一些相同配置,提取成方法 * @param connectionFactory * @return */ private DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true bean.setPubSubDomain(true); bean.setConnectionFactory(connectionFactory); //开启持久化订阅 bean.setSubscriptionDurable(true); return bean; } }
消息发送端ProviderController
@RestControllerpublic class ProviderController { @Autowired private Topic topic; @Autowired private Queue queue; @Autowired private JmsMessagingTemplate template; /** * 发送queue消息 */ @RequestMapping("/sendQueue") public void sendQueueString(){ //第一个参数:消息目的地名称 第二个参数:消息内容 template.convertAndSend(queue, "发送了一个queue消息,发送时间:"+new Date().toLocaleString()); } /** * 发送topic消息 * @throws JMSException */ @RequestMapping("/sendTopic") public void sendTopicString() throws JMSException{ /**设置发送持久化的topic消息 * 注意:DeliveryMode是javax.jms的一个接口 * 不要导入了org.springframework.boot.autoconfigure.jms.JmsProperties.DeliveryMode的这个枚举类 **/ template.getJmsTemplate().setDeliveryMode(DeliveryMode.PERSISTENT); template.convertAndSend(topic, "发送了一个topic消息,发送时间:"+new Date().toLocaleString()); } }
消费端TopicConsumer1
@Componentpublic class TopicConsumer1 { /** * @JmsListener 接收目的地要和发送端保持一致 * destination 指定目的地 * containerFactory 指定监听器连接工厂 * @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型 */ @JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory1") public void consumerTopic(String msg){ System.out.println("topicConsumer1--接收到消息:"+msg); } }
消费端TopicConsumer2
@Component public class TopicConsumer2 { /** * @JmsListener 接收目的地要和发送端保持一致 * destination 指定目的地 * containerFactory 指定监听器连接工厂 * @param msg msg即消息内容,msg是什么类型取决于发送端发送的消息类型 */ @JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory2") public void consumerTopic(String msg){ System.out.println("topicConsumer2--接收到消息:"+msg); } }
测试topic消息的持久化【注意:在测试的时候需要让两个消费端先完成clientId注册(也就是要先在线接收一次消息)】
这里的测试分两步进行:
- TopicConsumer1和TopicConsumer2同时在线(目的是为了完成clientId注册)
在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息:
topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54 topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54
访问MQ管理页面
http://192.168.25.135:8161
查看topic消息
- TopicConsumer1不在线(这里是注释掉了TopicConsumer1接收消息的代码)
在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic
查看消费端控制台输出信息(只有TopicConsumer2接收到了消息):
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:07:44
这时打开注释的代码(TopicConsumer1上线),查看控制台,发现TopicConsumer1在上线后,接收到了刚才的消息
这样,我们就完成了topic消息的持久化。
原文地址:https://www.cnblogs.com/leejin/p/11254251.html