综合概述
消息队列
消息队列就是一个消息的链表,可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向消息队列中按照一定的规则添加新消息,对消息队列有读权限的进程则可以从消息队列中读走消息,而消息队列就是在消息的传输过程中保存消息的容器,你可以简单的把消息队列理解为类似快递柜,快递员(消息发布者)往快递柜(消息队列)投递物件(消息),接受者(消息订阅者)从快递柜(消息队列)接收物件(消息),当然消息队列往往还包含一些特定的消息传递和接收机制。
消息队列作为分布式系统中重要的组件,可以有效解决应用耦合,异步消息,流量削锋等系列问题,有利于实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,各种消息队列也都各有特点,比如Kafka提供高性能、高吞吐量,但可靠性有所欠缺,所以比较适合像日志处理这类对性能要求高但对可靠性要求没那么严格的业务,再比如RabbitMQ支持了各种协议,实现较为臃肿,性能和吞吐量都一般,但却提供了很好的可靠性,比较适合像银行金融一类对可靠性要求较高的业务。
应用场景
以下简单介绍几个消息队列在实际应用中的使用场景(以下场景资料引用自网络)。
1 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍
2 应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图
传统模式的缺点:
- 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
- 订单系统与库存系统耦合
如何解决以上问题呢?引入应用消息队列后的方案,如下图:
- 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
- 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
- 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
3 流量削锋
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
- 可以控制活动的人数
- 可以缓解短时间内高流量压垮应用
- 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
- 秒杀业务根据消息队列中的请求信息,再做后续处理
4 日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
- 日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
- Kafka消息队列,负责日志数据的接收,存储和转发
- 日志处理应用:订阅并消费kafka队列中的日志数据
以下是新浪kafka日志处理应用案例:
(1)Kafka:接收用户日志的消息队列
(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因
5 消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。
聊天室通讯:
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。
Rabbit MQ
AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦和通讯。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性和安全。
RabbitMQ是一个开源的AMQP实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Java、Python、Ruby、.NET、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,具有很高的易用性和可用性。
接下来,我们先来了解几个相关概念(以下相关介绍资料引用自网络)。
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。 Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue
Queue(队列)是RabbitMQ的内部对象,用于存储消息。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操作。这里只是一个消费正对应一个队列Queue,也可以多个消费者订阅同一个队列Queue,当然这里就会将Queue里面的消息平分给其他的消费者,但是会存在一个一个问题就是如果每个消息的处理时间不同,就会导致某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,因为前面已经提及到了Queue会平分这些消息给相应的消费者。这里我们就可以使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
这里的prefetchCount=1是指每次从Queue中发送一条消息来。等消费者处理完这条消息后Queue会再发送一条消息给消费者。
Message acknowledgment
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…
另外pub message是没有ack的。
Message durability
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
Exchange
首先明确一点就是生产者产生的消息并不是直接发送给消息队列Queue的,而是要经过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,当然这里还会对不符合路由规则的消息进行丢弃掉,这里指的是后续要谈到的Exchange Type。那么Exchange是怎样将消息准确的推送到对应的Queue的呢?那么这里的功劳最大的当属Binding,RabbitMQ是通过Binding将Exchange和Queue链接在一起,这样Exchange就知道如何将消息准确的推送到Queue中去。简单示意图如下所示:
在绑定(Binding)Exchange和Queue的同时,一般会指定一个Binding Key,生产者将消息发送给Exchange的时候,一般会产生一个Routing Key,当Routing Key和Binding Key对应上的时候,消息就会发送到对应的Queue中去。那么Exchange有四种类型,不同的类型有着不同的策略。也就是表明不同的类型将决定绑定的Queue不同,换言之就是说生产者发送了一个消息,Routing Key的规则是A,那么生产者会将Routing Key=A的消息推送到Exchange中,这时候Exchange中会有自己的规则,对应的规则去筛选生产者发来的消息,如果能够对应上Exchange的内部规则就将消息推送到对应的Queue中去。那么接下来就来详细讲解下Exchange里面类型。
Exchange Types
- fanout
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
上图所示,生产者(P)生产消息1将消息1推送到Exchange,由于Exchange Type=fanout这时候会遵循fanout的规则将消息推送到所有与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。
- direct
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中
当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,如果我们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其他Routing Key的消息将会被丢弃。
- topic
前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:
- routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
- binding key与routing key一样也是句点号“. ”分隔的字符串
- binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
当生产者发送消息Routing Key=F.C.E的时候,这时候只满足Queue1,所以会被路由到Queue中,如果Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,如果Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。
- headers
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
实现案例
首先,需要安装Rabbit MQ,可以直接安装,也可以用Docker安装,这个网上教程很多,这里就不再赘述了。
生成项目模板
为方便我们初始化项目,Spring Boot给我们提供一个项目模板生成网站。
1. 打开浏览器,访问:https://start.spring.io/
2. 根据页面提示,选择构建工具,开发语言,项目信息等。
3. 点击 Generate the project,生成项目模板,生成之后会将压缩包下载到本地。
4. 使用IDE导入项目,我这里使用Eclipse,通过导入Maven项目的方式导入。
添加相关依赖
清理掉不需要的测试类及测试依赖,添加 rabbitmq相关依赖。
<!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
下面给出完整的POM文件。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.louis.springboot</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- swagger --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
添加相关配置
添加一个swagger 配置类,在工程下新建 config 包并添加一个 SwaggerConfig 配置类。
SwaggerConfig.java
package com.louis.springboot.demo.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket createRestApi(){ return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()) .select() .apis(RequestHandlerSelectors.any()) .paths(PathSelectors.any()).build(); } private ApiInfo apiInfo(){ return new ApiInfoBuilder() .title("Swagger API Doc") .description("This is a restful api document of Swagger.") .version("1.0") .build(); } }
修改application.properties文件名为application.yml,在其中添加RabbitMQ配置信息,根据自己安装的RabbitMQ配置。
application.yml
# rabbitmq配置 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
普通队列模式
新建一个RabbitMQ配置类,并添加一个demoQueue队列。
RabbitConfig.java
package com.louis.springboot.demo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { /** * 定义demoQueue队列 * @return */ @Bean public Queue demoString() { return new Queue("demoQueue"); } }
编写一个消息发布者,并编写一个发送方法,通过AmqpTemplate往"demoQueue"发送消息。
RabbitProducer.java
package com.louis.springboot.demo.mq; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RabbitProducer { @Autowired private AmqpTemplate rabbitTemplate; public void sendDemoQueue() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); System.out.println("[demoQueue] send msg: " + dateString); // 第一个参数为刚刚定义的队列名称 this.rabbitTemplate.convertAndSend("demoQueue", dateString); } }
编写一个消息消费者,通过@RabbitListener(queues = "demoQueue")注解监听"demoQueue"队列,并用@RabbitHandler注解相关方法,这样在在队列收到消息之后,交友@RabbitHandler注解的方法进行处理。
DemoQueueConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "demoQueue") public class DemoQueueConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[demoQueue] recieved message: " + msg); } }
编写一个控制器,注入RabbitProducer调用相关消息发送方法,方便通过接口触发消息发送。
RabbitMqController.java
package com.louis.springboot.demo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.louis.springboot.demo.mq.RabbitProducer; @RestController public class RabbitMqController { @Autowired private RabbitProducer rabbitProducer; @GetMapping("/sendDemoQueue") public Object sendDemoQueue() { rabbitProducer.sendDemoQueue(); return "success"; } }
编译并启动应用,打开浏览器,访问:http://localhost:8080/swagger-ui.html,进入swagger接口文档界面。
调用两次sendDemoQueue接口,在控制台可以看到我们输出的信息,说明消息已经成功发送并被消费。
[demoQueue] send msg: 2019-58-183 04:07:38 [demoQueue] recieved message: 2019-58-183 04:07:38 [demoQueue] send msg: 2019-01-183 05:07:05 [demoQueue] recieved message: 2019-01-183 05:07:05
Fanout广播模式
Fanout其实就是广播模式,只要跟它绑定的队列都会通知并且接受到消息。修改配置类,在RabbitConfig中添加如下fanout模式的队列跟交换机信息。在代码中我们配置了三个队列名、一个fanout交换机,并且将这三个队列绑定到了fanout交换器上。只要我们往这个交换机生产新的消息,那么这三个队列都会收到。
RabbitConfig.java
//=================== fanout广播模式 ==================== @Bean public Queue fanoutA() { return new Queue("fanout.a"); } @Bean public Queue fanoutB() { return new Queue("fanout.b"); } @Bean public Queue fanoutC() { return new Queue("fanout.c"); } /** * 定义个fanout交换器 * @return */ @Bean FanoutExchange fanoutExchange() { // 定义一个名为fanoutExchange的fanout交换器 return new FanoutExchange("fanoutExchange"); } /** * 将定义的fanoutA队列与fanoutExchange交换机绑定 * @return */ @Bean public Binding bindingExchangeWithA() { return BindingBuilder.bind(fanoutA()).to(fanoutExchange()); } /** * 将定义的fanoutB队列与fanoutExchange交换机绑定 * @return */ @Bean public Binding bindingExchangeWithB() { return BindingBuilder.bind(fanoutB()).to(fanoutExchange()); } /** * 将定义的fanoutC队列与fanoutExchange交换机绑定 * @return */ @Bean public Binding bindingExchangeWithC() { return BindingBuilder.bind(fanoutC()).to(fanoutExchange()); }
然后我们在RabbitProducer中添加一个sendFanout方法,用来向fanout队列发送消息。
RabbitProducer.java
public void sendFanout() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); System.out.println("[fanout] send msg:" + dateString); // 注意 第一个参数是我们交换机的名称 ,第二个参数是routerKey 我们不用管空着就可以,第三个是你要发送的消息 this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString); }
同样的,在控制器里添加一个访问接口。
RabbitMqController.java
@GetMapping("/sendFanout") public Object sendFanout() { rabbitProducer.sendFanout(); return "success"; }
接着针对三个广播队列分别编写一个消息消费者,指定队列和处理函数。
FanoutAConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout.a") public class FanoutAConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[fanout.a] recieved message: " + msg); } }
FanoutBConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout.b") public class FanoutBConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[fanout.b] recieved message: " + msg); } }
FanoutCConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout.c") public class FanoutCConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[fanout.c] recieved message: " + msg); } }
重新启动应用,调用sendFanout接口,通过控制台可以看到消息发送之后,a, b, c三个队列都收到了消息。
[fanout] send msg:2019-47-183 05:07:12 [fanout.c] recieved message: 2019-47-183 05:07:12 [fanout.b] recieved message: 2019-47-183 05:07:12 [fanout.a] recieved message: 2019-47-183 05:07:12
Topic主题模式
利用topic模式可以实现模糊匹配,同样的,在RabbitConfig中配置topic队列跟交换器,注意的是这里需要多配置一个bindingKey。
RabbitConfig.java
//=================== topic主题模式 ==================== @Bean public Queue topiocA() { return new Queue("topic.a"); } @Bean public Queue topicB() { return new Queue("topic.b"); } @Bean public Queue topicC() { return new Queue("topic.c"); } /** * 定义个topic交换器 * @return */ @Bean TopicExchange topicExchange() { // 定义一个名为fanoutExchange的fanout交换器 return new TopicExchange("topicExchange"); } /** * 将定义的topicA队列与topicExchange交换机绑定 * @return */ @Bean public Binding bindingTopicExchangeWithA() { return BindingBuilder.bind(topiocA()).to(topicExchange()).with("topic.msg"); } /** * 将定义的topicB队列与topicExchange交换机绑定 * @return */ @Bean public Binding bindingTopicExchangeWithB() { return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#"); } /** * 将定义的topicC队列与topicExchange交换机绑定 * @return */ @Bean public Binding bindingTopicExchangeWithC() { return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.*.z"); }
上述配置中:
topicA的key为topic.msg 那么他只会接收包含topic.msg的消息
topicB的key为topic.#那么他只会接收topic开头的消息
topicC的key为topic.*.z那么他只会接收topic.x.z这样格式的消息
然后修改RabbitProducer,在其中添加如下三个方法,如方法名所示,分别根据匹配规则发送到A\B,B,B\C队列。
RabbitProducer.java
public void sendTopicTopicAB() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); dateString = "[topic.msg] send msg:" + dateString; System.out.println(dateString); // 注意 第一个参数是我们交换机的名称 ,第二个参数是routerKey topic.msg,第三个是你要发送的消息 // 这条信息将会被 topic.a topic.b接收 this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString); } public void sendTopicTopicB() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); dateString = "[topic.good.msg] send msg:" + dateString; System.out.println(dateString); // 注意 第一个参数是我们交换机的名称 ,第二个参数是routerKey ,第三个是你要发送的消息 // 这条信息将会被topic.b接收 this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString); } public void sendTopicTopicBC() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); dateString = "[topic.m.z] send msg:" + dateString; System.out.println(dateString); // 注意 第一个参数是我们交换机的名称 ,第二个参数是routerKey ,第三个是你要发送的消息 // 这条信息将会被topic.b、topic.c接收 this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString); }
同样的,在控制器里面添加发送服务对应的接口。
RabbitMqController.java
@GetMapping("/sendTopicTopicAB") public Object sendTopicTopicAB() { rabbitProducer.sendTopicTopicAB(); return "success"; } @GetMapping("/sendTopicTopicB") public Object sendTopicTopicB() { rabbitProducer.sendTopicTopicB(); return "success"; } @GetMapping("/sendTopicTopicBC") public Object sendTopicTopicBC() { rabbitProducer.sendTopicTopicBC(); return "success"; }
接着针对三个主题队列编写对应的消息消费者。
TopicAConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.a") public class TopicAConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[topic.a] recieved message:" + msg); } }
TopicBConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.b") public class TopicBConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[topic.b] recieved message:" + msg); } }
TopicCConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.c") public class TopicCConsumer { /** * 消息消费 * @RabbitHandler 代表此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[topic.c] recieved message:" + msg); } }
重启应用,调用sendTopicTopicAB接口,经过匹配,route key为“topic.msg”的消息被发送到了topic.a和topic.b。
[topic.msg] send msg:2019-12-183 06:07:22 [topic.b] recieved message:[topic.msg] send msg:2019-12-183 06:07:22 [topic.a] recieved message:[topic.msg] send msg:2019-12-183 06:07:22
调用sendTopicTopicB接口,经过匹配,route key为“topic.good.msg”的消息被发送到了topic.b。
[topic.good.msg] send msg:2019-15-183 06:07:23 [topic.b] recieved message:[topic.good.msg] send msg:2019-15-183 06:07:23
调用sendTopicTopicBC接口,经过匹配,route key为“topic.m.z”的消息被发送到了topic.b和topic.c。
[topic.m.z] send msg:2019-16-183 06:07:09 [topic.b] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09 [topic.c] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09
参考资料
官方网站:https://www.rabbitmq.com/
百度百科:https://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin
中文教程:http://rabbitmq.mr-ping.com/description.html
原文地址:https://www.cnblogs.com/7788IT/p/11143630.html