spring boot整合activeMQ

spring boot整合activeMQ


spring boot整合MQ以后,对于消息的发送和接收操作更加便捷。本文将通过四个案例,分别讲解spring boot整合MQ:

  1. spring boot整合MQ发送queue消息
  2. spring boot整合MQ发送topic消息
  3. spring boot整合MQ以后如何让queue和topic消息共存
  4. spring boot整合MQ以后topic消息如何持久化

下面分别进行讲解:


一、 spring boot 整合MQ发送queue消息

搭建测试工程,pom.xml配置如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>  <groupId>com.itheima.demo</groupId>  <artifactId>activemq-topic</artifactId>  <version>0.0.1-SNAPSHOT</version>  <!-- 变更JDK版本【固定写法】 -->  <properties>    <java.version>1.7</java.version>    </properties>  <parent>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-parent</artifactId>    <version>1.5.3.RELEASE</version>  </parent>  <dependencies>  <!-- web工程启动器 -->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-web</artifactId>    </dependency>    <!-- 热部署依赖 -->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-devtools</artifactId>    </dependency>    <!-- 整合MQ起步依赖 -->    <dependency>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-activemq</artifactId>    </dependency>  </dependencies></project>

application.properties文件

#set tomcat portserver.port=8083?#set MQ urlspring.activemq.broker-url=tcp://192.168.25.135:61616?

ApplicationBoot.java

@SpringBootApplicationpublic class ApplicationBoot {

    public static void main(String[] args) {        SpringApplication.run(ApplicationBoot.class, args);    }}?

消息发送端ProviderController

@RestControllerpublic class ProviderController {    @Autowired    private JmsMessagingTemplate template;        @RequestMapping("/sendQueue")    public void sendQueueString(){        //第一个参数:消息目的地名称    第二个参数:消息内容        template.convertAndSend("a_queueMQ_1",                 "发送了一个queue消息,发送时间:"+new Date().toLocaleString());    }}?

消息接收端QueueConsumer

@Componentpublic class QueueConsumer {

    /**     * @JmsListener 接收目的地要和发送端保持一致     * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型     *      */    @JmsListener(destination="a_queueMQ_1")    public void consumerQueue(String msg){        System.out.println("consumerQueue--接收到消息:"+msg);        System.out.println("接收时间:"+new Date().toLocaleString());    }

}

测试发送queue消息

在浏览器中执行发送queue消息的URL

http://localhost:8083/sendQueue

查看消费端控制台输出信息:

consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 9:37:33接收时间:2018-3-29 9:37:33

访问MQ管理页面

http://192.168.25.135:8161

查看queue消息


二、 spring boot 整合MQ发送topic消息

在spring boot整合MQ以后,默认发送的就是queue消息。如果要发送topic消息,则需要在application.properties中添加如下配置:

#set topic message   默认值:false   当值为true:发送topic消息     当值为false:发送queue消息spring.jms.pub-sub-domain=true

在上述工程中添加发送topic消息的测试代码

消息发送端ProviderController.java

@RestControllerpublic class ProviderController {    @Autowired    private JmsMessagingTemplate template;        @RequestMapping("/sendQueue")    public void sendQueueString(){        //第一个参数:消息目的地名称    第二个参数:消息内容        template.convertAndSend("a_queueMQ_1",                 "发送了一个queue消息,发送时间:"+new Date().toLocaleString());    }        @RequestMapping("/sendTopic")    public void sendTopicString() throws JMSException{        template.convertAndSend("a_MQtopic_1",                 "发送了一个topic消息,发送时间:"+new Date().toLocaleString());    }?}

消息接收端TopicConsumer1

@Componentpublic class TopicConsumer1 {?    @JmsListener(destination="a_MQtopic_1")    public void consumerTopic(String msg){        System.out.println("topicConsumer1--接收到消息:"+msg);    }?}

消息接收端TopicConsumer2

@Componentpublic class TopicConsumer2 {?    @JmsListener(destination="a_MQtopic_1")    public void consumerTopic(String msg){        System.out.println("topicConsumer2--接收到消息:"+msg);    }

}

测试发送topic消息

在浏览器中执行发送topic消息的URL

http://localhost:8083/sendTopic

查看消费端控制台输出信息:

topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 10:20:35

访问MQ管理页面

http://192.168.25.135:8161

查看topic消息

可以看到,我们已经可以发送topic消息了。

但这里有一个问题:当我们重新执行发送queue消息的url时,会发现实际发送的还是topic消息,如图:

也就是说:

在默认情况下,发送的是queue消息。当我们在application.properties中修改spring.jms.pub-sub-domain=true之后,发送的是topic消息。 queue消息和topic消息这两种消息模式在默认情况下无法共存!

而在实际开发中,queue消息和topic消息都是常见的需求。所以我们有必要让这两种消息能够共存,接下来就来看一看如何解决这个问题。


三、 spring boot整合MQ以后如何让queue和topic消息共存

spring boot整合MQ以后接收消息的基本原理:

接收消息使用的是监听器,由于在项目中可能有多个监听器,如果为每一个监听器都创建一个MQ的连接就太耗资源,所以这里提供了一个监听器的连接工厂JmsListenerContainerFactory,由它来完成消息的接收和分发。

基于上述原理,为了解决queue和topic消息共存的问题,我们可以进行如下处理:
  1. 为queue接收端和topic接收端分别创建监听器连接工厂
  2. 在topic的连接工厂中设置pub-sub-domain=true
  3. 在接收端的@JmsListener注解中,通过containerFactory指定使用的连接工厂

下面就来解决这个问题,测试代码如下

application.properties中取消spring.jms.pub-sub-domain=true配置

#set tomcat portserver.port=8083?#set MQ urlspring.activemq.broker-url=tcp://192.168.25.135:61616?#set topic message#spring.jms.pub-sub-domain=true

编写MQConfig类,配监听工厂

/**
 * MQ配置类
 * @author leejin
 *
 */
@Configuration
public class MQConfig {

    /**
     * 配置topic目的地
     * bean的名字默认就是方法名
     * @return
     */
    @Bean
    public Topic topic(){
    	return new ActiveMQTopic("a1_MQtopic_3");
    }

    /**
     * 配置queue目的地
     * bean的名字默认就是方法名
     * @return
     */
    @Bean
    public Queue queue(){
    	return new ActiveMQQueue("a1_queueMQ_3");
    }

    /**
     * 此bean的名称就是方法名
     * 配置topic消息监听容器工厂
     * @param connectionFactory   与MQ创建连接的工厂
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
    	DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * 此bean的名称就是方法名
     * 配置queue消息监听容器工厂
     * @param connectionFactory   与MQ创建连接的工厂
     * @return
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory){
    	DefaultJmsListenerContainerFactory bean=new DefaultJmsListenerContainerFactory();
    	bean.setConnectionFactory(connectionFactory);
    	return bean;
    }

}

修改之前的发送端和消费端代码

发送端ProviderController

@RestControllerpublic class ProviderController {

    @Autowired    private Topic topic;        @Autowired    private Queue queue;        @Autowired    private JmsMessagingTemplate template;        @RequestMapping("/sendQueue")    public void sendQueueString(){        //第一个参数:消息目的地名称    第二个参数:消息内容        template.convertAndSend(queue,                 "发送了一个queue消息,发送时间:"+new Date().toLocaleString());    }        @RequestMapping("/sendTopic")    public void sendTopicString() throws JMSException{        template.convertAndSend(topic,                 "发送了一个topic消息,发送时间:"+new Date().toLocaleString());    }?}

queue消费端QueueConsumer

@Componentpublic class QueueConsumer {

    /**     * @JmsListener 接收目的地要和发送端保持一致     * destination  指定目的地     * containerFactory  指定监听器连接工厂     * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型     */    @JmsListener(destination="a1_queueMQ_3",containerFactory="jmsListenerContainerQueue")    public void consumerQueue(String msg){        System.out.println("consumerQueue--接收到消息:"+msg);    }    }

topic消费端TopicConsumer1

@Componentpublic class TopicConsumer1 {

    /**     * @JmsListener 接收目的地要和发送端保持一致     * destination  指定目的地     * containerFactory  指定监听器连接工厂     * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型     */    @JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic")    public void consumerTopic(String msg){        System.out.println("topicConsumer1--接收到消息:"+msg);    }

}

topic消费端TopicConsumer2

@Componentpublic class TopicConsumer2 {?    /**     * @JmsListener 接收目的地要和发送端保持一致     * destination  指定目的地     * containerFactory  指定监听器连接工厂     * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型     */    @JmsListener(destination="a1_MQtopic_3",containerFactory="jmsListenerContainerTopic")    public void consumerTopic(String msg){        System.out.println("topicConsumer2--接收到消息:"+msg);    }

}

测试queue和topic消息共存

  1. 测试发送queue消息

在浏览器中执行发送queue消息的URL

http://localhost:8083/sendQueue

查看消费端控制台输出信息:

consumerQueue--接收到消息:发送了一个queue消息,发送时间:2018-3-29 14:39:55

访问MQ管理页面

http://192.168.25.135:8161

查看queue消息

  1. 测试发送topic消息在浏览器中执行发送topic消息的URL
http://localhost:8083/sendTopic

查看消费端控制台输出信息:

topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 14:41:10

访问MQ管理页面

http://192.168.25.135:8161

查看topic消息

这样我们就解决了queue和topic消息的共存问题,主要的实现思路就是为queue消息和topic消息分别指定监听容器工厂。

接下来还有最后一个问题,就是topic消息的持久化问题。众所周知,topic消息默认情况下是不持久化的,也就是说发送端发送消息的时候,如果消费端不在线,这个消息就丢了,消费端再上线时是无法收到这个消息的。而很多时候我们是需要去接收到这个消息的,那么要解决这个问题,就需要对topic消息做持久化处理。

接下来我们就来看看怎么实现topic消息的持久化。


四、 spring boot整合MQ以后topic消息如何持久化

要实现topic消息的持久化,需要在消息发送端和消费端以及监听容器(JmsListenerContainerFactory)都做处理。在之前案例的基础上,我们做一些修改,加入topic消息持久化设置。测试代码如下:

监听容器(JmsListenerContainerFactory)配置

/** * MQ配置类 * @author leejin * */@Configurationpublic class MQConfig {

    /**     * 配置topic目的地     * bean的名字默认就是方法名     * @return     */    @Bean    public Topic topic(){        return new ActiveMQTopic("a11_1_MQtopic_5");    }        /**     * 配置queue目的地     * bean的名字默认就是方法名     * @return     */    @Bean    public Queue queue(){        return new ActiveMQQueue("a1_queueMQ_3");    }        /**     * 此bean的名称就是方法名     * 配置topic消息监听容器工厂     * @param connectionFactory   与MQ创建连接的工厂     * @return     */    @Bean    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();        //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true        bean.setPubSubDomain(true);        bean.setConnectionFactory(connectionFactory);        return bean;    }        /**     * 此bean的名称就是方法名     * 配置queue消息监听容器工厂     * @param connectionFactory   与MQ创建连接的工厂     * @return     */    @Bean    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory){        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();        bean.setConnectionFactory(connectionFactory);        return bean;    }        //上面配置的那个topic监听工厂,对于不需要持久化的topic消息是可以用的    //下面配置的监听工厂,是给需要持久化的topic消息使用的    /**     * 第一个topic消费端的监听工厂     * @param connectionFactory   与MQ创建连接的工厂     * @return     */    @Bean    public JmsListenerContainerFactory<?> topicFactory1(ConnectionFactory connectionFactory){         DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory);         //用来标识第一个topic客户端(clientId值可以自己指定)         bean.setClientId("10001");         return bean;    }        /**     * 第二个topic消费端的监听工厂     * @param connectionFactory   与MQ创建连接的工厂     * @return     */    @Bean    public JmsListenerContainerFactory<?> topicFactory2(ConnectionFactory connectionFactory){        DefaultJmsListenerContainerFactory bean = defaultJmsListenerContainerFactoryTopic(connectionFactory);        //用来标识第二个topic客户端(clientId值可以自己指定)        bean.setClientId("10002");        return bean;    }        /**     * 对于一些相同配置,提取成方法     * @param connectionFactory     * @return     */    private DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactoryTopic(ConnectionFactory connectionFactory) {        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();        //指定接收topic消息。相当于在applicatioin.properties中配置了spring.jms.pub-sub-domain=true        bean.setPubSubDomain(true);        bean.setConnectionFactory(connectionFactory);        //开启持久化订阅        bean.setSubscriptionDurable(true);        return bean;    }

}

消息发送端ProviderController

@RestControllerpublic class ProviderController {

    @Autowired    private Topic topic;        @Autowired    private Queue queue;        @Autowired    private JmsMessagingTemplate template;        /**     * 发送queue消息     */    @RequestMapping("/sendQueue")    public void sendQueueString(){        //第一个参数:消息目的地名称    第二个参数:消息内容        template.convertAndSend(queue,                 "发送了一个queue消息,发送时间:"+new Date().toLocaleString());    }        /**     * 发送topic消息     * @throws JMSException     */    @RequestMapping("/sendTopic")    public void sendTopicString() throws JMSException{        /**设置发送持久化的topic消息         * 注意:DeliveryMode是javax.jms的一个接口         * 不要导入了org.springframework.boot.autoconfigure.jms.JmsProperties.DeliveryMode的这个枚举类        **/        template.getJmsTemplate().setDeliveryMode(DeliveryMode.PERSISTENT);        template.convertAndSend(topic,                 "发送了一个topic消息,发送时间:"+new Date().toLocaleString());    }

}

消费端TopicConsumer1

@Componentpublic class TopicConsumer1 {

    /**     * @JmsListener 接收目的地要和发送端保持一致     * destination  指定目的地     * containerFactory  指定监听器连接工厂     * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型     */    @JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory1")    public void consumerTopic(String msg){        System.out.println("topicConsumer1--接收到消息:"+msg);    }

}

消费端TopicConsumer2

@Component
public class TopicConsumer2 {

    /**
     * @JmsListener 接收目的地要和发送端保持一致
     * destination  指定目的地
     * containerFactory  指定监听器连接工厂
     * @param msg   msg即消息内容,msg是什么类型取决于发送端发送的消息类型
     */
    @JmsListener(destination="a11_1_MQtopic_5",containerFactory="topicFactory2")
    public void consumerTopic(String msg){
    	System.out.println("topicConsumer2--接收到消息:"+msg);
    }

}
测试topic消息的持久化【注意:在测试的时候需要让两个消费端先完成clientId注册(也就是要先在线接收一次消息)】

这里的测试分两步进行:

  1. TopicConsumer1和TopicConsumer2同时在线(目的是为了完成clientId注册)

在浏览器中执行发送topic消息的URL

http://localhost:8083/sendTopic

查看消费端控制台输出信息:

topicConsumer1--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54
topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:03:54

访问MQ管理页面

http://192.168.25.135:8161

查看topic消息

  1. TopicConsumer1不在线(这里是注释掉了TopicConsumer1接收消息的代码)

在浏览器中执行发送topic消息的URL

http://localhost:8083/sendTopic

查看消费端控制台输出信息(只有TopicConsumer2接收到了消息):

topicConsumer2--接收到消息:发送了一个topic消息,发送时间:2018-3-29 16:07:44

这时打开注释的代码(TopicConsumer1上线),查看控制台,发现TopicConsumer1在上线后,接收到了刚才的消息

这样,我们就完成了topic消息的持久化。

原文地址:https://www.cnblogs.com/leejin/p/11254251.html

时间: 2024-11-06 03:38:18

spring boot整合activeMQ的相关文章

activeMQ入门+spring boot整合activeMQ

最近想要学习MOM(消息中间件:Message Oriented Middleware),就从比较基础的activeMQ学起,rabbitMQ.zeroMQ.rocketMQ.Kafka等后续再去学习. 上面说activeMQ是一种消息中间件,可是为什么要使用activeMQ? 在没有使用JMS的时候,很多应用会出现同步通信(客户端发起请求后需要等待服务端返回结果才能继续执行).客户端服务端耦合.单一点对点(P2P)通信的问题,JMS可以通过面向消息中间件的方式很好的解决了上面的问题. JMS规

Spring Boot 整合 ActiveMQ

依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!--消息队列连接池--> <dependency> <groupId>org.apache.activemq</groupId>

Spring Boot入门 and Spring Boot与ActiveMQ整合

1.Spring Boot入门 1.1什么是Spring Boot Spring 诞生时是 Java 企业版(Java Enterprise Edition,JEE,也称 J2EE)的轻量级代替品.无需开发重量级的 Enterprise JavaBean(EJB),Spring 为企业级Java 开发提供了一种相对简单的方法,通过依赖注入和面向切面编程,用简单的Java 对象(Plain Old Java Object,POJO)实现了 EJB 的功能. 虽然 Spring 的组件代码是轻量级的

spring boot 整合 quartz 集群环境 实现 动态定时任务配置【原】

最近做了一个spring boot 整合 quartz  实现 动态定时任务配置,在集群环境下运行的 任务.能够对定时任务,动态的进行增删改查,界面效果图如下: 1. 在项目中引入jar 2. 将需要的表导入数据库 官网上有不同数据库的脚本,找到对应的,导入即可 3. java 代码 将quartz 的相关配置文件,配置为暴露bean,方便后期引用. 有一处关键的地方,就是注入spring 上下文,也可以算是一个坑.如果,不注入spring 上下文,那么新添加的定时任务job,是新new 的一个

spring boot 整合spring Data JPA+Spring Security+Thymeleaf框架(上)

最近上班太忙所以耽搁了给大家分享实战springboot 框架的使用. 下面是spring boot 整合多个框架的使用. 首先是准备工作要做好. 第一  导入框架所需的包,我们用的事maven 进行对包的管理. 以上的举例是本人的H5DS的真实的后台管理项目,这个项目正在盛情融资中,各位多多捧点人场.关注一下软件发展的动态,说不定以后就是您的生活不可或缺的软件哟. 点击打开链接.闲话少说.现在切入正题. 第二,写点配置文件 第三,spring data -设计一个简单的po关系,这里需要下载一

spring boot整合jsp的那些坑(spring boot 学习笔记之三)

Spring Boot 整合 Jsp 步骤: 1.新建一个spring boot项目 2.修改pom文件 <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <depend

Spring boot整合jsp

这几天在集中学习Spring boot+Shiro框架,因为之前view层用jsp比较多,所以想在spring boot中配置jsp,但是spring boot官方不推荐使用jsp,因为jsp相对于一些模板引擎,性能都比较低,官方推荐使用thymeleaf,但是Spring boot整合jsp的过程已经完成,在这里记录一下. 这篇博文是在LZ上篇文章spring boot+mybatis整合基础上写的,开发工具仍然是Intellij idea.这篇文章的重点是Intellij idea的设置,否

企业分布式微服务云SpringCloud SpringBoot mybatis (十三)Spring Boot整合MyBatis

Spring中整合MyBatis就不多说了,最近大量使用Spring Boot,因此整理一下Spring Boot中整合MyBatis的步骤.搜了一下Spring Boot整合MyBatis的文章,方法都比较老,比较繁琐.查了一下文档,实际已经支持较为简单的整合与使用.下面就来详细介绍如何在Spring Boot中整合MyBatis,并通过注解方式实现映射. 整合MyBatis 新建Spring Boot项目,或以Chapter1为基础来操作 pom.xml中引入依赖 这里用到spring-bo

Spring Kafka和Spring Boot整合实现消息发送与消费简单案例

本文主要分享下Spring Boot和Spring Kafka如何配置整合,实现发送和接收来自Spring Kafka的消息. 先前我已经分享了Kafka的基本介绍与集群环境搭建方法.关于Kafka的介绍请阅读Apache Kafka简介与安装(一),关于Kafka安装请阅读Apache Kafka安装,关于Kafka集群环境搭建请阅读Apache Kafka集群环境搭建 .这里关于服务器环境搭建不在赘述. Spring Kafka整合Spring Boot创建生产者客户端案例 创建一个kafk