RabbitMQ 核心概念及与 Spring Boot 2 的整合

RabbitMQ 简介

RabbitMQ 是什么

RabbitMQ 是一个用 Erlang 编写的开源的消息队列中间件,它实现了 AMQP 协议(其实还实现了 MTQQ 等消息协议)。和其他两个主流的消息队列中间件 Kafka 和 RocketMQ 相比,拥有更低的延迟、更高的稳定性、更完备的功能、更完善的文档支持以及较活跃的开源社区支持,但是在吞吐量上和分布式扩展能力上逊色一些。

AMQP 是什么

AMQP(Advanced Message Queuing Protocol),高级消息队列协议,是一个语言无关的面向消息中间件的开放标准,它定义了一套消息中间件的模型架构,即生产者将消息发送给交换机,交换机根据路由键将消息路由到队列,消费者通过订阅队列来获取消息。从更低的层面来看,AMQP是一套应用层的通信协议,它跟 HTTP 这样的协议一样提供了 TCP 之上的报文封装定义,定义了协议命令的交互规则。RabbitMQ 就用 Erlang 实现了 AMQP。

Exchange(交换机) 是什么

通常消息队列只会有生产者、队列和消费者三个概念,而 AMQP 多引入了一个概念 Exchange 交换机(也译交换器),生产者会将消息发送到交换机,交换机再根据自身的路由策略和 routing key(路由键)将消息转发到合适的队列上。这里 Exchange 其实更应该叫 Router 也就是路由器而不是交换机,因为它的作用更像网络路由器根据 IP 将数据路由到指定的设备。

Routing key(路由键)是什么

RabbitMQ 的 routing key 标识了消息的目标队列。它有两个含义,在发送消息的时候要指定一个 routing key,下文所说的 routing key 都是指发送消息指定的 routing key;在绑定交换机和路由器的时候也要指定一个 routing key,这时候这个 routing key 又叫 binding key,binding key 是可以用通配符表示的。下文描述“绑定时指定的 routing key”的时候都会用 binding key 替代。

binding(绑定)是什么

绑定是随着交换机这个概念一起出现的概念,指的是将交换机和队列关联起来的动作或者说是关联关系。沿用上述的网络路由器的类比,绑定就类似于将路由器用网线和终端设备连接起来(上述的 binding key 相当于网线的命名),只有建立了绑定关系交换机才能把消息路由到匹配 routing key 的队列。

交换机类型

交换机将消息路由到哪些队列由两个因素共同决定,分别是 routing key 和交换机类型。RabbitMQ 的交换机有4种类型:

  • direct

direct 类型的交换器会把消息路由到 binding key 和发送消息指定的 routing key 完全相同的队列上。如下图生产者P向交换机X发送了一个routing key为 error 的消息就会转发到两个队列上,如果 routing key 为 info就只会转发到下面的队列。

  • topic

只有用到 topic 这种路由类型,binding 和 binding key 才真正有其产生的意义。topic 和 direct 的区别在于,direct 要求消息的 routing key 和 绑定的 binding key 完全相同,而 topic 支持 routing key 和 binding key 模糊匹配。如下图生产者P向交换机X发送了一个 routing key 为 a.orange.drink 的消息的话,交换机只会把消息路由到 Q1 队列,如果消息的 routing key 为 a.orange.rabbit 的话,交换机会把消息路由到 Q1 和 Q2 两个队列。这里"*"代表1个单词,"#"代表0到n个单词,这里说的“单词”指使用"."进行分隔的字符串单位。具体的匹配规则见 官方文档

  • fanout

所有发送到 fanout 交换机的消息都会被路由到所有与该交换机绑定的队列上,routing key 和 binding key 不会被 fanout 交换机理会。fanout 可以实现广播消息的目的,不过 RabbitMQ 不支持单个队列层面的广播消费,Kafka 的消息存储在 topic 这个逻辑层面,广播消息只需要为每个消费者分别维护 topic 上的消息位移;而 RabbitMQ 的消息直接存储在队列这个逻辑层面,要实现广播消费就要为每个消费者创建一个队列,如果多个消费者订阅同一个队列,RabbitMQ 默认会采用负载均衡的策略将消息分摊给多个消费者,而不是所有消费者都收到所有消息。

  • headers

这种交换机类型比较冷门不太实用,不说了。

整合 Spring Boot 实战

快速部署

这里使用 Docker 快速部署,并开启自带的web管理界面。

docker run -d -p 5672:5672 -p 15672:15672 --name my-rabbit rabbitmq:3-management

Demo上手

1、添加依赖,这里使用 Maven

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置文件,这里使用 yml

spring:
  rabbitmq:
    username: guest         # RabbitMQ默认用户名和密码,生产环境记得改
    password: guest
    addresses: 127.0.0.1:5672

3、声明队列

@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue() {
        //单参构造传入队列名称
        return new Queue("myQueue");
    }

}

4、生产者

@Component
public class DemoSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String context = "hello " + new Date();
        this.rabbitTemplate.convertAndSend("hello", context);
    }
}

5、消费者

@Component
public class DemoReceiver {
    //注解填写要消费的队列名称
    @RabbitListener(queues = "myQueue")
        public void receive(String message) {
            System.out.println("Received " + message);
        }
    }
}

6、测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitDemoTest {

    @Autowired
    private DemoSender demoSender;

    @Test
    public void testSend() throws Exception {
        demoSender.send();
    }
}

配置无误的话运行上面的单元测试消费者会把消费到的消息打印出来,这里我们只是用一个 hello world级别的demo验证了 RabbitMQ 的正常运行。

配置Direct模式

@Configuration
public class RabbitConfig {

    /**
     * 声明一个名为exchange.d的direct交换机
     */
    @Bean
    DirectExchange exchangeD() {
        return new DirectExchange("exchange.d");
    }

    /**
     * 声明一个名为queue-d的队列
     */
    @Bean
    public Queue queueD() {
        return new Queue("queue.d");
    }

    /**
     * 绑定队列queue.d和交换机exchange.d,使用队列名称作为binding key
     */
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(queueD()).to(exchangeD()).withQueueName();
    }
}

测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitDemoTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //订阅queue.d的消费者可以消费到这条消息
    @Test
    public void sendCorrect() {
        String context = "hello " + new Date();
        rabbitTemplate.convertAndSend("exchange.d", "queue.d", context);
    }

    //订阅queue.d的消费者无法消费到到这条消息
    @Test
    public void sendIncorrect() {
        String context = "hello " + new Date();
        rabbitTemplate.convertAndSend("exchange.d", "queue.d.2", context);
    }
}

配置Topic模式

@Configuration
public class RabbitConfig {

    /**
     * 声明一个名为exchange.t的topic交换机
     */
    @Bean
    TopicExchange exchangeT() {
        return new TopicExchange("exchange.t");
    }

    @Bean
    public Queue queueT1() {
        return new Queue("queue.t.1");
    }

    @Bean
    public Queue queueT2() {
        return new Queue("queue.t.2");
    }

    /**
     * 绑定队列queue.t.1和交换机exchange.t,使用queue.#作为binding key
     */
    @Bean
    public Binding bindingT1() {
        return BindingBuilder.bind(queueT1()).to(exchangeT()).with("queue.#");
    }

    /**
     * 绑定队列queue.t.2和交换机exchange.t,使用queue.t.2作为binding key
     */
    @Bean
    public Binding bindingT2() {
        return BindingBuilder.bind(queueT2()).to(exchangeT()).with("queue.t.2");
    }
}

测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitDemoTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //两个队列的消费者都能消费到这条消息
    @Test
    public void send1() {
        String context = "hello " + new Date();
        rabbitTemplate.convertAndSend("exchange.t", "queue.t.2", context);
    }

    //只有队列queue.t.1的消费者能消费到这条消息,因为它绑定的路由键queue.#能匹配queue.t.3
    @Test
    public void send1() {
        String context = "hello " + new Date();
        rabbitTemplate.convertAndSend("exchange.t", "queue.t.3", context);
    }
}

配置Fanout模式

@Configuration
public class RabbitConfig {

    /**
     * 声明一个名为exchange.f的fanout交换机
     */
    @Bean
    FanoutExchange exchangeF() {
        return new TopicExchange("exchange.f");
    }

    @Bean
    public Queue queueF1() {
        return new Queue("queue.f.1");
    }

    @Bean
    public Queue queueF2() {
        return new Queue("queue.f.2");
    }

    /**
     * 绑定队列queue.f.1和交换机exchange.f,不需要指定binding key
     */
    @Bean
    public Binding bindingF1() {
        return BindingBuilder.bind(queueF1()).to(exchangeF());
    }

    /**
     * 绑定队列queue.f.2和交换机exchange.f,不需要指定binding key
     */
    @Bean
    public Binding bindingF2() {
        return BindingBuilder.bind(queueF2()).to(exchangeF());
    }
}

测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitDemoTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //两个队列的消费者都能消费到这条消息
    @Test
    public void send() {
        String context = "hello " + new Date();
        //发送消息时不需要指定routing key,指定了也会被交换机忽略
        rabbitTemplate.convertAndSend("exchange.f", "", context);
    }
}

消费确认

对于比较重要不能丢失的消息,消费确认机制非常重要。RabbitMQ 提供了消息确认机制,当开启自动确认时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存或磁盘中删除,不管消费者是否真正消费到了消息;当关闭自动确认机制,RabbitMQ 会等待消费者主动回复确认信号才会移除消息。spring boot 可以在配置文件关闭自动确认,使用手动确认:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

消费端手动确认消息:

@Component
public class DemoReceiver {

    @RabbitListener(queues = "myQueue")
        public void receive(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
            System.out.println("Received " + message);
            channel.basicAck(deliveryTag, false);
    }
}

消息持久化

对于比较重要不能丢失的消息,保证消息能够持久化是必要的,消息持久化能防止服务器宕机导致队列中的消息丢失。RabbitMQ 的持久化要求将交换机、队列和消息都配置为持久化。在 Spring/Spring Boot 中只需要配置交换机和队列为持久化就行。其实上面的配置中队列和交换机都是默认持久化的,以第一个demo为例,显式的配置如下:

@Configuration
public class RabbitConfig {
    @Bean
    DirectExchange exchangeD() {
        //第二个参数为true表示持久化该交换机
        return new DirectExchange("exchange.d", true, false);
    }

    @Bean
    public Queue queueD() {
        //第二个参数为true表示持久化该队列
        return new Queue("queue.d", true);
    }

    ...
}

配置持久化之后,先不启动消费者,生产者将消息发送到RabbitMQ 后,关闭 RabbitMQ 服务并重启,再启动消费者,消费者仍能收到消息,证明 RabbitMQ 宕机后消息仍然存在,持久化是生效的。并不是所有消息都应该开启持久化,由于持久化需要 RabbitMQ 将消息写入磁盘来实现,过多的持久化必定影响 RabbitMQ 的性能,类似日志这样的非关键数据就不建议持久化,当然这个都要看业务。

过期时间(TTL)

为防止消息长期积压在队列中导致队列甚至是磁盘被占满,应该给消息设置过期时间。

@Bean
public Queue queue() {
    Map<String, Object> args = new HashMap<>(1);
    //设置该队列的消息的过期时间为1小时
    args.put("x-message-ttl", 3600 * 1000);
    return new Queue("queue.d", true, false, false, args);
}

死信队列

当消息过了过期时间还没有被消费,消息就会变成死信(Dead message),除了消息过期这种情况,当消息要入队的队列已达到最大长度,或者消息被消费者拒绝且不允许重发,消息也会变成死信。为了死信不被丢弃删除,防止没有被成功消费的消息无迹可寻,RabbitMQ 允许设置一种特殊的交换机————DLX 死信交换机(Dead-Letter-Exchange),使变成死信的消息重新发送到死信交换机上。我们可以另外声明一个队列并绑定该死信交换机,这种绑定死信交换机的队列就叫死信队列。通过订阅死信队列,我们就可以收集到消费失败的消息,进行问题的排查分析。配置死信队列的方式也很简单:

@Bean
DirectExchange exchangeNormal() {
    //声明一个交换机用于常规消息的分发
    return new DirectExchange("normal_exchange", true, false);
}
@Bean
DirectExchange exchangeDLX() {
    //声明一个交换机作为死信交换机
    return new DirectExchange("dlx_exchange", true, false);
}
@Bean
public Queue queue() {
    Map<String, Object> args = new HashMap<>(2);
    args.put("x-message-ttl", 3600 * 1000);
    //将dlx_exchange设置为该队列的死信交换机
    args.put("x-dead-letter-exchange", "dlx_exchange");
    return new Queue("queue.normal", true, false, false, args);
}
@Bean
public Binding bindingNormal() {
    //常规业务队列和业务交换机绑定
    return BindingBuilder.bind(queue()).to(exchangeNormal()).withQueueName();
}
@Bean
public Queue queueDLX() {
    //声明一个死信队列专门消费死信交换机转发的死信
    return new Queue("queue.dlx", false);
}
@Bean
public Binding bindingDLX() {
    //绑定死信队列和死信交换机
    return BindingBuilder.bind(queueDLX()).to(exchangeDLX()).withQueueName();
}

高可用

生产环境下没人用单点的 RabbitMQ ,其实任何基础服务和核心服务都不应该是单点的,否则单点故障会影响整个系统。RabbitMQ 提供两种集群模式以实现高可用性。

普通集群

普通的 RabbitMQ 集群允许消费者和生产者在 RabbitMQ 出现单点故障的时候仍然能够进行生产和消费,通过添加集群节点可以线性地提高消息的吞吐量。这里演示搭建2个节点的集群:

1、启动2个 RabbitMQ 节点:

docker run -d -p 5672:5672 -p 15672:15672 --name rabbit-node1 --hostname host1 -e RABBITMQ_ERLANG_COOKIE='myrabbitcookie' rabbitmq:3-management
docker run -d -p 5673:5672 -p 15673:15672 --name rabbit-node2 --hostname host2 -e RABBITMQ_ERLANG_COOKIE='myrabbitcookie' --link rabbit-node1:host1 rabbitmq:3-management

这里需要注意的是设置了环境变量 RABBITMQ_ERLANG_COOKIE,相当于写 /var/lib/rabbitmq/.erlang.cookie 文件,所有节点的这个值需要保证相同,才能够协商加入集群。

2、节点2加入节点1组成集群:

docker exec -it rabbit-node2 bash
rabbitmqctl join_cluster --ram [email protected]

这里需要注意的是 --ram 这个设置,它将节点2指定为内存节点,内存节点将队列、交换机、用户等元信息放在内存里;如果指定--disc 就是磁盘节点,顾名思义它将这些元信息存在磁盘。配置内存节点有助于提高性能,但是集群中至少需要一个磁盘节点。

普通集群其实并不够高可用,虽然集群的所有节点都会同步所有元数据,但是消息数据只保存在于一个节点上。一旦该节点宕机,其他节点可以重新声明队列继续进行生产消费,但是宕机节点的数据在节点恢复前无法恢复。如果宕机节点刚好保存了某个持久化队列的消息数据,我们称这个节点是主节点,那么这个队列甚至无法在其他节点上被重新使用,必须等到主节点恢复。因此,对于重要的业务消息,在将其设置为持久化的同时,我们还要使用另一种真正高可用的集群模式:镜像队列集群。

镜像队列

RabbitMQ 的镜像队列机制,可以将队列镜像到集群的所有节点中,如果一个节点失效了,消息通常不会丢失还能切换到其他节点消费到。可以这样配置镜像队列:

rabbitmqctl set_policy ha-all "hello" '{"ha-mode":"all"}'

这里指定了完整队列名"hello"将其设为镜像队列,只有设置了镜像模式的队列才会被镜像到其他节点。也可以使用通配符,"^ha." 表示将所有ha.开头的队列设置为镜像队列,或者使用"^" 指定所有队列。ha-all 和 "ha-mode":"all" 指定镜像到所有节点,也可以镜像到特定节点:

rabbitmqctl set_policy ha-nodes "^ha\." '{"ha-mode":"nodes","ha-params":["[email protected]", "[email protected]"]}

配置了集群和镜像队列之后,在应用的配置文件指定集群的节点:

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672,127.0.0.1:5673

关闭任意节点进行验证即可,消费者和生产者都会自动切换到其他节点。

跟设置持久化的原则一样,并不是所有队列都设置镜像最好,在集群的每个节点镜像备份相同的数据必然对集群性能有一定的负担。如果是不需要持久化的非重要消息,可以考虑不设置镜像。

总结

本文根据个人经验介绍了 RabbitMQ 从概念到应用的相关知识,尽可能把关键的点都写出来,但是由于能力有限追求全面的同时就很难兼顾详细的配置说明和截图验证,实际上生产环境中的配置当然比这里更多也更严谨,真正技术落地还是要参考官方文档才是!

原文地址:https://www.cnblogs.com/qingkongxing/p/12009038.html

时间: 2024-08-26 22:54:49

RabbitMQ 核心概念及与 Spring Boot 2 的整合的相关文章

spring boot 2.0 整合 elasticsearch NoNodeAvailableException

原文地址:spring boot 2.0 整合 elasticsearch NoNodeAvailableException 原文说的有点问题,下面贴出我的配置: 码云项目地址:https://gitee.com/11230595/springboot-elasticsearch elasticsearch.yml cluster.name: my-applicationnetwork.host: 0.0.0.0 http.port: 9200transport.tcp.port: 9300tr

Spring Boot入门 and Spring Boot与ActiveMQ整合

1.Spring Boot入门 1.1什么是Spring Boot Spring 诞生时是 Java 企业版(Java Enterprise Edition,JEE,也称 J2EE)的轻量级代替品.无需开发重量级的 Enterprise JavaBean(EJB),Spring 为企业级Java 开发提供了一种相对简单的方法,通过依赖注入和面向切面编程,用简单的Java 对象(Plain Old Java Object,POJO)实现了 EJB 的功能. 虽然 Spring 的组件代码是轻量级的

Spring Boot 2.X整合Spring-cache,让你的网站速度飞起来

计算机领域有人说过一句名言:“计算机科学领域的任何问题都可以通过增加一个中间层来解决”,今天我们就用Spring-cache给网站添加一层缓存,让你的网站速度飞起来. 本文目录 一.Spring Cache介绍二.缓存注解介绍三.Spring Boot+Cache实战1.pom.xml引入jar包2.启动类添加@EnableCaching注解3.配置数据库和redis连接4.配置CacheManager5.使用缓存注解6.查看缓存效果7.注意事项 一.Spring Cache介绍 Spring

Spring Boot 2.0 整合 ES 5 文章内容搜索实战

本章内容 文章内容搜索思路 搜索内容分词 搜索查询语句 筛选条件 分页.排序条件 小结 一.文章内容搜索思路 上一篇讲了在怎么在 Spring Boot 2.0 上整合ES5 ,这一篇聊聊具体实战.简单讲下如何实现文章.问答这些内容搜索的具体实现.实现思路很简单: 基于「短语匹配」并设置最小匹配权重值 哪来的短语,利用 IK 分词器分词 基于 Fiter 实现筛选 基于 Pageable 实现分页排序 这里直接调用搜索的话,容易搜出不尽人意的东西.因为内容搜索关注内容的连接性.所以这里处理方法比

Spring Boot 学习笔记--整合Thymeleaf

1.新建Spring Boot项目 添加spring-boot-starter-thymeleaf依赖 1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-thymeleaf</artifactId> 4 </dependency> 2.添加静态文件 根据spring boot默认规则,脚本和样式默

spring boot 与 Mybatis整合(*)

在pom.xml文件中加入数据库.spring-mybatis整合 <!-- spring boot 整合mybatis --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.0</version> </dep

Activiti学习之spring boot 与activiti整合

声明:本文是springboot2.0的多项目构建,springboot2.0和spingboot1.5的配置是有出入的,构建项目之前请规范您的springboot版本,选择2.0以上. 一.在IDEA中使用工具创建SpringBoot + Gradle的父工程 new -> project ->gradle 二.在父工程下新建叁个模块 dao service web 右键单击父工程 new -> module -> Spring Initializr -> type选项选中

Spring Boot认证:整合Jwt

背景 Jwt全称是:json web token.它将用户信息加密到token里,服务器不保存任何用户信息.服务器通过使用保存的密钥验证token的正确性,只要正确即通过验证. 优点 简洁: 可以通过URL.POST参数或者在HTTP header发送,因为数据量小,传输速度也很快: 自包含:负载中可以包含用户所需要的信息,避免了多次查询数据库: 因为Token是以JSON加密的形式保存在客户端的,所以JWT是跨语言的,原则上任何web形式都支持: 不需要在服务端保存会话信息,特别适用于分布式微

Spring Boot和Dubbo整合

创建3个项目 dubbo-api 新建一个maven项目,这个项目只有接口(或实体类) dubbo-service和dubbo-web这两个依赖于dubbo-api <groupId>com.zhang</groupId> <artifactId>dubbo-api</artifactId> <version>1.0</version> package com.zhang.service; public interface UserS