Spring Boot 与消息

一、消息概述

  • 在大多数应用中,可以通过消息服务中间件来提升系统的异步通信扩展解耦流量削峰等能力。

  • 当消息发送者发送消息后,将由消息代理接管,消息代理保证消息传递到指定目的地
  • 消息队列主要有两种形式的目的地:
    • 队列(queue):点对点消息通信(point-to-point):消息发送者发送消息,消息代理将其送入一个队列中,消息接收者从队列中获取消息,消息被读取后被移出队列。注意:每一个消息只能从一个发送者发送到达唯一一个接收者。
    • 主题(topic):发布(publish)/订阅(subscribe)消息通信:发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么接收者(订阅者)就会在消息到达时同时收到消息。
  • JMS(Java Message Service):JAVA 消息服务--基于 JVM 消息代理的规范。ActiveMQ、HornetMQ 是 JMS 的实现。
  • AMQP(Advanced Message Queuing Protocol):高级消息队列协议,也是一个消息代理的规范,兼容 JMS,RabbitMQ是 AMQP 的实现。

二、RabbitMQ

  • 简介:RabbitMQ 是一个由 erlang 开发的 AMQP 的开源实现。
  • 核心概念:
    • Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
    • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
    • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
    • Queue:消息队列,用来保存消息直到发送给消费者一个消息可投入一个或多个队列。
    • Binding:绑定,用于关联消息队列和交换器。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 之间的绑定可以多对多的。
    • Connection:网络连接,比如一个 TCP 连接。
    • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内的虚拟连接,AMQP 的命令都是通过信道发送出去的。因为建立和销毁一条 TCP 连接开销太大,所以使用信道来复用 TCP 连接。
    • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
    • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是/。
    • Broker:消息队列服务器实体。

Exchange 类型:Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:directfanouttopic、headers(性能较差,几乎不用)。

1、docker 安装 RabbitMQ

拉取镜像(阿里云服务器上):

[[email protected] ~]# docker pull rabbitmq:3-management

镜像实例化为容器:

[[email protected] ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq a7f2574d507f

# 第一个端口映射为 rabbitmq 客户端服务,第二个端口映射为 web 管理界面

测试访问(IP + 端口):

输入账号:guest 密码:guest

2、测试 RabbitMQ

以上图中的 3 种交换器、4 个队列为例,测试 RabbitMQ:

  1. 添加一个名为 exchange.direct ,类型为 direct 交换器;添加一个名为 exchange.fanout,类型为 fanout 的交换器;添加一个名为 exchange.topic,类型为 topic 的交换器。

  2. 添加 4 个队列,分别名为 atguigu、atguigu.news、atguigu.emps 和 gulixueyuan.news。

  1. 添加交换器 和 队列之间的绑定(Routing Key):①为 exchange.direct 与 4 个队列之间添加 Bindings,Routing Key 分别为队列名 atguigu、atguigu.news、atguigu.emps 和 gulixueyuan.news。(direct 类型交换器精确匹配路由键)

    ②为 exchange.fanout 与 4 个队列之间添加 Bindings,Routing Key 分别为队列名 atguigu、atguigu.news、atguigu.emps 和 gulixueyuan.news。(fanout 类型交换器与路由键无关,它是广播的)

    ③为 exchange.topic 与 4 个队列之间添加如示意图所示的 Bindings。绑定结果如下:

4.发布消息:①向 exchange.direct 发送 4 条消息,消息的 Routing key 分别为 atguigu、atguigu.news、atguigu.emps 和 gulixueyuan.news。

预测 4 个队列各自有一条消息,消息发送到队列根据路由键来的。

依次读取消息并从队列中移除:

? ②向 exchange.fanout 中 发送一条消息,路由键随意,由于它是广播的,所以 4 个队列都会得到该消息。

?

?

? 移除所有消息。

? ③向 exchan.topic 中 发送一条消息,路由键为 atguigu.news,由于队列 atguigu 绑定的路由键为 atguigu.#,匹配成功,收到消息,队列 gulixueyuan.news 绑定的路由键为*.news,匹配成功收到消息。同理另外 2 个队列也会收到消息。若发送的消息键值为 abc.news, 则只有队列 gulixueyuan.news 和 guigu.news 收到消息。

3、Spring Boot 使用 RabbitMQ

  • RabbitAutoConfiguration 自动配置了连接工厂 rabbitConnectionFactory

    @Configuration
    @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
    @EnableConfigurationProperties(RabbitProperties.class)
    @Import(RabbitAnnotationDrivenConfiguration.class)
    public class RabbitAutoConfiguration {
    
       @Configuration
       @ConditionalOnMissingBean(ConnectionFactory.class)
       protected static class RabbitConnectionFactoryCreator {
    
          @Bean
          public CachingConnectionFactory rabbitConnectionFactory(
                RabbitProperties properties,
                ObjectProvider<ConnectionNameStrategy> connectionNameStrategy)
                throws Exception {
             PropertyMapper map = PropertyMapper.get();
             CachingConnectionFactory factory = new CachingConnectionFactory(
                   getRabbitConnectionFactoryBean(properties).getObject());
             map.from(properties::determineAddresses).to(factory::setAddresses);
             map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
  • RabbitProperties 封装了 RabbitMQ 的配置
    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public class RabbitProperties {
    
       /**
        * RabbitMQ host.
        */
       private String host = "localhost";
    
       /**
        * RabbitMQ port.
        */
       private int port = 5672;
    
       /**
        * Login user to authenticate to the broker.
        */
       private String username = "guest";
  • AmqpAdmin : RabbitMQ 系统管理功能组件,创建和删除 Queue,Exchange,Binding
        @Bean
       @ConditionalOnSingleCandidate(ConnectionFactory.class)
       @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
       @ConditionalOnMissingBean
       public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
          return new RabbitAdmin(connectionFactory);
       }
    
    }
    @ManagedResource(description = "Admin Tasks")
    public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
          BeanNameAware, InitializingBean {
              ...
    
      @Override
      public void declareExchange(final Exchange exchange) {
          try {
              this.rabbitTemplate.execute(channel -> {
                  declareExchanges(channel, exchange);
                  return null;
              });
          }
          catch (AmqpException e) {
              logOrRethrowDeclarationException(exchange, "exchange", e);
          }
      }
    
      @Override
      @ManagedOperation(description = "Delete an exchange from the broker")
      public boolean deleteExchange(final String exchangeName) {
          return this.rabbitTemplate.execute(channel -> { // NOSONAR never returns null
  • RabbitTemplate :给 RabbitMQ 发送和接受消息
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
       PropertyMapper map = PropertyMapper.get();
       RabbitTemplate template = new RabbitTemplate(connectionFactory);
       MessageConverter messageConverter = this.messageConverter.getIfUnique();
       if (messageConverter != null) {
          template.setMessageConverter(messageConverter);
       }
    public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count/comment density
          implements BeanFactoryAware, RabbitOperations, MessageListener,
              ListenerContainerAware, PublisherCallbackChannel.Listener, Lifecycle, BeanNameAware {
    ...
    @Override
    public void convertAndSend(Object object) throws AmqpException {
       convertAndSend(this.exchange, this.routingKey, object, (CorrelationData) null);
    }

1.使用 IDEA Spring Initializer 创建 Spring Boot 项目并选中 RabbitMQ 模块。

2.配置 RabbitMQ

application.properties:

spring.rabbitmq.host=xx.xx.xx.xx
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3.测试

  • 使用 AmqpAdmin 创建 Exchange、队列和相应的 Bindings。

    package com.yunche;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringRabbitmqApplicationTests {
    
        @Autowired
        AmqpAdmin amqpAdmin;
        @Test
        public void contextLoads() {
            amqpAdmin.declareExchange(new DirectExchange("test-direct-exchange"));
            System.out.println("创建了一个 direct 类型的 exchange");
            amqpAdmin.declareQueue(new Queue("test-queue"));
            System.out.println("创建了一个队列");
            amqpAdmin.declareBinding(new Binding("test-queue", Binding.DestinationType.QUEUE, "test-direct-exchange", "test-routingkey", null));
            System.out.println("添加了绑定,路由键为 test-routingkey");
        }
    
    }
  • 使用 RabbitTemplate 发送和接收消息:
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    public void sendMsg() {
        rabbitTemplate.convertAndSend("test-direct-exchange","test-routingkey", "hello world");
    }
    @Test
    public void receiveMsg() {
        String msg = (String)rabbitTemplate.receiveAndConvert("test-queue");
        System.out.println(msg);
    } /*Output:hello world*/
    @Test
    public void sendObject() {
    
        rabbitTemplate.convertAndSend("test-direct-exchange","test-routingkey", new Book("红楼梦", "曹雪芹"));
    }
    
    private static class Book implements Serializable {
        private String name;
        private String author;
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Book(String name, String author) {
            this.name = name;
            this.author = author;
        }
    
        public String getAuthor() {
    
            return author;
        }
    
        public void setAuthor(String author) {
            this.author = author;
        }
    }
    
    @Test
    public void receiveObject() {
        Book b = (Book)rabbitTemplate.receiveAndConvert("test-queue");
        System.out.println(b.name);
    } /*Output: 红楼梦*/

4.监听消息

@RabbitListener 监听消息队列的内容;首先用@EnableRabbit 开启基于注解的 RabbitMQ 模式

package com.yunche;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@EnableRabbit
@SpringBootApplication
public class SpringRabbitmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringRabbitmqApplication.class, args);
    }

}

首先运行单元测试中的 sendObject() 方法发送消息,然后控制台立即输出消息。

@RabbitListener(queues = "test-queue")
public void msgListener(Book book) {
    System.out.println(book.getAuthor());
} /*Output:曹雪芹*/

三、参考资料

尚硅谷.Spring Boot 高级

原文地址:https://www.cnblogs.com/yunche/p/10351207.html

时间: 2024-11-02 09:24:56

Spring Boot 与消息的相关文章

Spring Boot与消息

一.概述 大多应用中,可通过消息服务中间件来提升系统异步通信.扩展解耦能力 消息服务中两个重要概念: 消息代理(message broker)和目的地(destination)当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地. 消息队列主要有两种形式的目的地 主题(topic):发布(publish)/订阅(subscribe)消息通信 队列(queue):点对点消息通信(point-to-point) 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列

Spring Boot 持续更新中...

内容 Spring Boot入门 Spring Boot配置 Spring Boot与日志 Spring Boot与Web开发 Spring Boot与Docker Spring Boot与数据访问 Spring Boot启动配置原理 Spring Boot自定义starters Spring Boot与缓存 Spring Boot与消息 Spring Boot与检索 Spring Boot与任务 Spring Boot与安全 Spring Boot与分布式 Spring Boot与开发热部署

spring boot Rabbitmq集成,延时消息队列实现

本篇主要记录Spring boot 集成Rabbitmq,分为两部分, 第一部分为创建普通消息队列, 第二部分为延时消息队列实现: spring boot提供对mq消息队列支持amqp相关包,引入即可: [html] view plain copy <!-- rabbit mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-

RabbitMq 集成 spring boot 消息队列 入门Demo

spring boot 集成 RabbitMq还是很方便的.现在来一个简单的例子来集成rabbitmq.入门demo. 主要概念: 其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定. 虚拟主机:一个虚拟主机持有一组交换机.队列和绑定.为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制. 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机.每一个RabbitMQ服务器都有一个默认的虚拟主机"/"

在Spring Boot框架下使用WebSocket实现消息推送

Spring Boot的学习持续进行中.前面两篇博客我们介绍了如何使用Spring Boot容器搭建Web项目(使用Spring Boot开发Web项目)以及怎样为我们的Project添加HTTPS的支持(使用Spring Boot开发Web项目(二)之添加HTTPS支持),在这两篇文章的基础上,我们今天来看看如何在Spring Boot中使用WebSocket. 什么是WebSocket WebSocket为浏览器和服务器之间提供了双工异步通信功能,也就是说我们可以利用浏览器给服务器发送消息,

【redis】spring boot利用redis的Keyspace Notifications实现消息通知

前言 需求:当redis中的某个key失效的时候,把失效时的value写入数据库. github: https://github.com/vergilyn/RedisSamples 1.修改redis.conf 安装的redis服务默认是: notify-keyspace-events "",修改成 notify-keyspace-events Ex; 位置:redis安装目下的redis.windows-service.conf 或 redis.windows.conf.(具体看re

Spring Boot 揭秘与实战(六) 消息队列篇 - RabbitMQ

文章目录 1. 什么是 RabitMQ 2. Spring Boot 整合 RabbitMQ 3. 实战演练4. 源代码 3.1. 一个简单的实战开始 3.1.1. Configuration 3.1.2. 消息生产者 3.1.3. 消息消费者 3.1.4. 运行 3.1.5. 单元测试 3.2. 路由的实战演练 3.2.1. Configuration 3.2.2. 消息生产者 3.2.3. 消息消费者 3.2.4. 运行 3.2.5. 单元测试 本文,讲解 Spring Boot 如何集成

RabbitMq+Spring boot 消息生产者向队列发送消息 (一)

本人学习新框架方法. 一.先学习框架基本知识,也就是看这本书的前三章,了解基本概念.比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机. 二.然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些. 三.然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果. 四.实际项目积累经验. RabbitMq 消息生产者向队列发送消息 (一) MQ分为消息生产者和消息消费者,这次做的主要是消息的生产

spring boot下WebSocket消息推送

WebSocket协议 WebSocket是一种在单个TCP连接上进行全双工通讯的协议.WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范.WebSocket API也被W3C定为标准. WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据.在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输 STOMP协议 STOMP是面向文本的消息传