SpringBoot入门 (九) MQ使用

本文记录学习在Spring Boot中使用MQ。

一 什么是MQ

  MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。它的作用类似于邮局,发信人(生产者)只需要将信(消息)交给邮局,然后由邮局再将信(消息)发送给具体的接收者(消费者),具体发送过程与时间发信人可以不关注,也不会影响发信人做其它事情。目前常见的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。

  使用MQ的优点主要有:

  1 方法的异步执行 使用MQ可以将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了由于同步而等待的时间;

  2 程序之间松耦合 使用MQ可以减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,只要约定好消息的内容格式就行;

  JMS(Java Message Service)即java消息服务,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS的消息机制有2种模型,一种是1对1(Point to Point)的队列的消息,这种消息,只能被一个消费者消费;另一种是一对多的发布/订阅(Topic)消息,一条消息可以被多个消费者消费。ActiveMq是对JMS的一个实现。

二 SpringBoot集成Active MQ

  官网下载一个服务程序,解压后直接启动服务就可以了,下载地址:http://activemq.apache.org/activemq-5158-release.html

  SpringBoot也对Active MQ提供了支持,我们使用时引入具体的依赖即可,修改pom.xml文件,添加依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

  在application.properties文件中配置Active MQ服务器的连接信息

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#消息模式 true:广播(Topic),false:队列(Queue),默认时false
#spring.jms.pub-sub-domain=true

  完成以上配置信息后,当我们在启动SpringBoot项目时,会自动帮我们完成初始化操作,并提供一个JmsMessagingTemplate,提提供了我们常用发送消息的各种方法供我们使用。我们只需要在使用的地方注入JmsMessagingTemplate即可使用。

  发送队列消息

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Test
    public void testQueueMsg(){
        //创建名称为zyQueue的队列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向队列发送消息
        jmsMessagingTemplate.convertAndSend(queue,"这是一个队列消息!");
    }
}

  消息的接收方,监听消息队列,当队列中有消息时就可以获取到消息

@Component
public class Consumer {

    private static DateFormat df =  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");

    /**
     * destination 目标地址即队列
     */
    @JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }
}

  执行测试方法发送消息可以看到,控制台输出的消费者接受到消息

队列消息只能有一个消费者,如果有多个消费者同时监听一个队列时,只能有一个拿到消息,我们测试,修改发送方法,循环发送10调消息

@Test
    public void testQueueMsg(){
        //创建名称为zyQueue的队列
        Queue queue = new ActiveMQQueue("zyQueue");
        //向队列发送消息
        for (int i=0;i<10;i++) {
            jmsMessagingTemplate.convertAndSend(queue,"这是第"+i+"个队列消息!");
        }
    }

  在Consumer 类中再添加一个消费者,监听队列zyQueue

@JmsListener(destination = "zyQueue")
    public void receiveMessage(String text){
        System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

    @JmsListener(destination = "zyQueue")
    public void receiveMessage1(String text){
        System.out.println("1接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

  执行发送消息,看到控制台输出的结果,2个消费者平分了这10条消息

  如果希望监听同一个队列的多个消费者都能接收到所有消息,我们就只能发送Topic消息了,我们修改application.properties中的

#消息模式 true:广播(Topic),false:队列(Queue),默认时false
spring.jms.pub-sub-domain=true

  表示要发送发布/订阅消息,发送消息的队列改用Topic发送消息,如下

@Test
    public void testTopicMsg(){
        Topic topic = new ActiveMQTopic("zyTopic");
        for (int i=0;i<5;i++){
            jmsMessagingTemplate.convertAndSend(topic,"这是第"+i+"个Topic消息!");
        }
    }

  我们在Consumer 类中添加两个消费者来监听zyTopic队列,接受消息

@JmsListener(destination = "zyTopic")
    public void receiveTopicMessage1(String text){
        System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

    @JmsListener(destination = "zyTopic")
    public void receiveTopicMessage2(String text){
        System.out.println("消费者2接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

  执行发消息方法,可以看到控制台输出的内容,2个消费者都完整的接收到了5条消息

  我们在测试发送消息时修改了属性文件中的配置信息,才可以发送对应的类型的消息,这是由于SpringBoot中默认的是队列消息(查看源码可以知道,监听器默认使用的DefaultJmsListenerContainerFactory),如果我们想在不修改配置信息的情况下可以同时发送Queue和Topic消息怎么办呢,我们需要手动的更改初始的配置类,分别针对Queue和Topic消息提供JmsListenerContainerFactory

  新建一个配置类,如下

@SpringBootConfiguration
public class ActiveMqConfig {

    @Bean("queueListenerFactory")
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //设置消息模型为队列
        factory.setPubSubDomain(false);
        return factory;
    }

    @Bean("topicListenerFactory")
    public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //设置消息模型为队列
        factory.setPubSubDomain(true);
        return factory;
    }
}

  在容器启动时会针对两种消息类型,初始化得到两个不同的JmsListenerContainerFactory。下来再修改消费者类,在 @JmsListener 注解中指定 containerFactory,如

@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
    public void receiveMessage(String text){
        System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

@JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
    public void receiveTopicMessage1(String text){
        System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
    }

  Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,然后注释掉属性文件中的消息模式配置就可以了。

原文地址:https://www.cnblogs.com/love-wzy/p/10343097.html

时间: 2024-10-09 10:21:09

SpringBoot入门 (九) MQ使用的相关文章

SpringBoot入门九,添加shiro支持

项目基本配置参考SpringBoot入门一,使用myEclipse新建一个SpringBoot项目,使用myEclipse新建一个SpringBoot项目即可.现在来给项目添加shiro支持,数据暂时硬编码,不连接数据库,具体内容如下: 1. pom.xml添加以下配置信息 <!-- 开启shiro依赖 --> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-s

SpringBoot入门基础

SpringBoot入门 (一) HelloWorld 一 什么是springboot springboot是一个全新的框架,它设计的目的简化spring项目的初始环境的搭建和开发,主要有以下几个特点: 1.简化初始配置 ,可与主流框架集成: 2.内置Servlet容器,无需在打War包: 3.使用了Starter(启动器)管理依赖并版本控制: 4.大量的自动配置,简化开发,方便集成第三方: 5.提供准生产环境运行时的监控,如指标,健康,外部配置等: 6.无需XML配置,减少冗余代码 . 未使用

SpringBoot入门十九,简单文件上传

项目基本配置参考SpringBoot入门一,使用myEclipse新建一个SpringBoot项目,使用myEclipse新建一个SpringBoot项目即可.现在来给项目添加一个MyBatis支持,添加方式非常简单,仅需两步即可,具体内容如下: 1. pom.xml添加以下配置信息 <!-- 文件上传配置开始 --> <!-- 9.引入commons-io依赖 --> <dependency> <groupId>commons-io</groupId

[WebGL入门]九,顶点缓存的基础

注:文章译自http://wgld.org/,原作者杉本雅広(doxas),文章中如果有我的额外说明,我会加上[lufy:],另外,鄙人webgl研究还不够深入,一些专业词语,如果翻译有误,欢迎大家指正. 局部坐标 使用WebGL可以绘制各种各样的3D模型,而且,还可以绘制点和线,决定绘制什么肯定需要顶点.没有顶点的话,也就没有多边形了,因为没有办法进行点和线的绘制了.所以,WebGL的编程中一定要处理顶点情报.而且,顶点中有必须要包含的情报,那就是顶点的位置坐标.既然顶点的位置坐标是必须的,那

C#基础入门 九

C#基础入门 九 集合 对于很多应用程序,需要创建和管理相关对象组,有两种方式可以将对象分组,一是创建对象数组,如 object[] obj=new object[3]{1,2.33,"string"}; foreach(object o in obj) { Console.WriteLine(o.Tostring()); //output:1 2.33 string } 但是这样实现有一个缺点,数组的初始化必须要固定数量,即数组的长度一定是常量.所以需要集合. 创建和管理相关对象组的

SpringBoot入门示例

SpringBoot入门Demo SpringBoot可以说是Spring的简化版.配置简单.使用方便.主要有以下几种特点: 创建独立的Spring应用程序 嵌入的Tomcat,无需部署WAR文件 简化Maven配置 自动配置Spring 提供生产就绪型功能,如指标,健康检查和外部配置 绝对没有代码生成和对XML没有要求配置 (1)创建maven项目,构建java项目(注意:这里为java项目,也能通过内嵌的tomcat启动服务,访问controller指定路径返回的数据) 项目目录结构如下:

SpringBoot 入门 Demo

SpringBoot   入门 Demo Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者. 特点 1. 创建独立的Spring应用程序 2. 嵌入的Tomcat,无需部署WAR文件 3. 简化Maven配置

SpringBoot入门(一)——开箱即用

本文来自网易云社区 Spring Boot是什么 从根本上来讲Spring Boot就是一些库的集合,是一个基于"约定优于配置"的原则,快速搭建应用的框架.本质上依然Spring,在这之上帮我们省去了很多样板化的配置,使得我们能够更专注于应用程序功能的开发. Spring Boot精要 SpringBoot将很多魔法带入了Spring应用程序的开发之中,其中最重要的是以下四个核心 自动配置:针对常见的应用功能,SpringBoot自动提供相关的配置,减少用于样板化配置的时间 起步依赖:

SpringBoot入门二,添加JdbcTemplate数据源

项目基本配置参考上一篇文章SpringBoot入门一,使用myEclipse新建一个SpringBoot项目即可.现在来给项目添加一个JdbcTemplate数据源,添加方式非常简单,仅需两步即可,具体内容如下: 1. pom.xml添加以下配置信息 <!-- jdbcTemplate依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring