RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)

  在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者。本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式。

  为了阐述这个模式,我们将会搭建一个简单的日志系统,它包含两种程序:一种发送日志消息,另一种接收并打印日志消息。在这个日志系统里,每一个运行的消费者都可以获取到消息,在这种情况下,我们可以实现这种需求:一个消费者接收消息并写入磁盘,另一个消费者接收消息并打印在电脑屏幕上。简单来说,生产者发布的消息将会以广播的形式转发到所有的消费者。

1、交换器(Exchange)

  在前两章节我们,我们往队列中发布消息或获取消息,然而,前面的讲解其实并不完整,接下来,是时候介绍完整的RabbitMq消息模型了。

  回忆一下我们前两章指南中包含的内容:

    • 一个生产者用以发送消息;
    • 一个队列缓存消息;
    • 一个消费者用以消费队列中的消息。

  RabbitMq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

  实际上,生产者只能把消息发送给一个exchange,exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息。

  

  有四种类型的交换器,分别是:direct、topic、headers、fanout。本章主要讲解最后一种:fanous(广播模式)。下面创建一个fanout类型的交换器,我们称之为:logs:

1 channel.exchangeDeclare("logs", "fanout");

  广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

  如果想查看当前系统中有多少个exchange,可以使用以下命令:

sudo rabbitmqctl list_exchanges

  

  或者通过控制台查看:

  

  可以看到有很多以amq.*开头的交换器,以及(AMQP default)默认交换器,这些是默认创建的交换器。

  在前面两章的指南中,我们并不知道交换器的存在,但是依然可以将消息发送到队列中,那其实并不是因为我们可以不使用交换器,实际上是我们使用了默认的交换器(我们通过指定交换器为字字符串:""),回顾一下我们之前是如何发送消息的:

1 channel.basicPublish("", "hello", null, message.getBytes());

  第一个参数是交换器的名字,空字符串表示它是一个默认或无命名的交换器,消息将会由指定的路由键(第二个参数,routingKey,后面会讲)转发到队列。

  你可能会有疑问:既然exchange可以指定为空字符串(""),那么可否指定为null?

  答案是:不能!

  通过跟踪发布消息的代码,在AMQImpl类中的Publish()方面中,可以看到,不光是exchange不能为null,同时routingKey路由键也不能为null,否则会抛出异常:

  接着上面的讲解,我们创建一个命名的交换器:

1 channel.basicPublish( "logs", "", null, message.getBytes());

2、临时队列

  在前两章的例子中,我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

  但是,本章讲解的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,而不是部分。并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

  首先,无论何时我们的消费者连接到RabbitMq,我们都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列,或者让RabbitMq生成任意的队列名字。

  其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

  通过JAVA客户端的无参方法:queueDeclare()来创建一个非持久化、专有的、自动删除的、名字随机生成的队列。

1 String queueName = channel.queueDeclare().getQueue();

3、绑定(Binding)

  

  前面广播模式的交换器和队列已经创建好了,接下来就是要告诉交换器向队列里发送消息。交换器与队列之间的关系称之为绑定关系。

1 channel.queueBind(queueName, "logs", "");

  至此,交换器已经可以往队列中发送消息了。

  可以通过下列命令来查看队列的绑定关系:

4、完整的代码

  EmitLog.java

 1 import com.rabbitmq.client.BuiltinExchangeType;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5
 6 public class EmitLog {
 7
 8     private static final String EXCHANGE_NAME = "logs";
 9
10     public static void main(String[] args) throws Exception {
11
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setHost("192.168.92.130");
14
15         try (Connection connection = factory.newConnection();
16              Channel channel = connection.createChannel();) {
17
18             channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
19
20             String message = "RabbitMq fanout。。。。。。";
21             channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));
22
23             System.out.println(" [x] Sent ‘" + message + "‘");
24         }
25     }
26 }

  正好你所看到的,Connection创建完成之后,定义了exchange,这一步是必要的,因为如果没有交换器将无法发送消息。

  如此没有队列绑定到该交换器上,那么,交换器收到的消息将会丢失,但是对我们本章的日志系统来说没问题的,当没有消费者时,我们可以安全地放弃掉数据,我们只接收最新的日志消息。

  ReceiveLogs.java

 1 public class ReceiveLogs {
 2
 3     private static final String EXCHANGE_NAME = "logs";
 4
 5     public static void main(String[] args) throws Exception {
 6
 7         ConnectionFactory factory = new ConnectionFactory();
 8         factory.setHost("192.168.92.130");
 9
10         Connection connection = factory.newConnection();
11         Channel channel = connection.createChannel();
12
13         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
14
15         final String queue = channel.queueDeclare().getQueue();
16         channel.queueBind(queue,EXCHANGE_NAME,"");
17
18         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
19
20         DeliverCallback deliverCallback = (consumerTa,delivery) -> {
21
22             String message = new String(delivery.getBody(), "UTF-8");
23             System.out.println(" [x] Received ‘" + message + "‘");
24
25         };
26
27         channel.basicConsume(queue,true,deliverCallback,consumerTag -> {});
28     }
29 }

  这里的autoAck设置为true,因为我们这里是广播模式,每个消费者都会收到一样的消息,并且这里给消费者生产的随机名称的队列相当于是独有的,所以在接收到消息之后立即发送确认回执是OK的。

  但是这里先提出一个疑问:在这种模式下,每个队列收到的消息是否也会有Ready和Unacked状态?

5、测试结果

  一、首先启动生产者,再启动两个消费者

  

 

  可以看到,生产者启动后发送的消息丢失了,两个消费者并没有消费到,此时再看控制台:

  

  可见RabbitMq为我们创建了两个随机命名的队列,其Exclusive是Owner,表示是专有的,Parameters为AD(auto delete),拥有该队列的消费者一占断开连接,队列将会被自动删除。

  二、其次启动生产者发送一次消息

   

  两个消费都都收到了消息。

  三、关闭所有消费者,观察控制台变化

  

  两个专有随机队列自动删除了。

6、SpringBoot的实现

  工程结构图:

 一、配置文件application.properties:

  生产者:

#RabbitMq
spring.rabbitmq.host=192.168.92.130
spring.rabbitmq.exchange=logs

  消费者:

#RabbitMq
spring.rabbitmq.host=192.168.92.130
spring.rabbitmq.exchange=logs

##队列--我们可以自己指定队列名称,也可以由RabbitMq自动生成,这里为了方便,我们自己命名(如果需要,我也可以写一个自动生成名称的方法)
rqbbitmq.log.fanout.info=info
rqbbitmq.log.fanout.error=error
server.port=8090

二、生产者代码

  这里为了让系统生产者启动时就自动发送一条消息,我加了一个EmitLogRunner类。

  EmitLog.java

 1 import org.springframework.amqp.core.AmqpTemplate;
 2 import org.springframework.beans.factory.annotation.Autowired;
 3 import org.springframework.beans.factory.annotation.Value;
 4 import org.springframework.stereotype.Component;
 5
 6 @Component
 7 public class EmitLog {
 8
 9     @Value("${spring.rabbitmq.exchange}")
10     private String exchange;
11
12     @Autowired
13     private AmqpTemplate amqpTemplate;
14
15     public void send(String msg) {
16         amqpTemplate.convertAndSend(exchange,"",msg);
17     }
18 }

  EmitLogRunner.java

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.boot.ApplicationArguments;
 3 import org.springframework.boot.ApplicationRunner;
 4 import org.springframework.stereotype.Component;
 5
 6 @Component
 7 public class EmitLogRunner implements ApplicationRunner {
 8
 9     @Autowired
10     private EmitLog emitLog;
11
12     @Override
13     public void run(ApplicationArguments args) throws Exception {
14         System.out.println("生产者发布消息:" + msg);
15         emitLog.send("RabbitMq fanout test message");
16     }
17 }

二、消费者代码  

  ReceiveInfoLogs.java

 1 @Component
 2 @RabbitListener(
 3         bindings = @QueueBinding(
 4                 value = @Queue(value = "${rqbbitmq.log.fanout.info}",autoDelete = "true"),
 5                 exchange = @Exchange(value = "${spring.rabbitmq.exchange}",type = ExchangeTypes.FANOUT)
 6         )
 7 )
 8 public class ReceiveInfoLogs {
 9
10     @Autowired
11     private AmqpTemplate amqpTemplate;
12
13     @RabbitHandler
14     public void receiveInfoLog (Object message) {
15
16         System.out.println("接收到info级别的日志:" + message);
17     }
18 }

  ReceiveErrorLogs.java

 1 import org.springframework.amqp.core.AmqpTemplate;
 2 import org.springframework.amqp.core.ExchangeTypes;
 3 import org.springframework.amqp.rabbit.annotation.*;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.stereotype.Component;
 6
 7 @Component
 8 @RabbitListener(
 9         bindings = @QueueBinding(
10                 value = @Queue(value = "${rqbbitmq.log.fanout.error}",autoDelete = "true"),
11                 exchange = @Exchange(value = "${spring.rabbitmq.exchange}",type = ExchangeTypes.FANOUT)
12         )
13 )
14 public class ReceiveErrorLogs {
15
16     @Autowired
17     private AmqpTemplate amqpTemplate;
18
19     @RabbitHandler
20     public void receiveErrorLog(Object message) {
21         System.out.println("接收到的error级别日志:" + message);
22     }
23 }

  注意看一下注解方式bindings里面都是以@开头并加上对应的要绑定的项,琢磨一下应该都能理解。

三、验证

  启动消费者和生产者,查看控制台:

    

  至此,发布订阅模式讲解完了,在下一章中将会讲解Routing(路由)的概念。

原文地址:https://www.cnblogs.com/wuhenzhidu/p/10800239.html

时间: 2024-08-05 10:54:17

RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)的相关文章

Mina、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

消息传递有很多种方式,请求/响应(Request/Reply)是最常用的.在前面的博文的例子中,很多都是采用请求/响应的方式,当服务器接收到消息后,会立即write回写一条消息到客户端.HTTP协议也是基于请求/响应的方式. 但是请求/响应并不能满足所有的消息传递的需求,有些需求可能需要服务端主动推送消息到客户端,而不是被动的等待请求后再给出响应. 发布/订阅(Publish/Subscribe)是一种服务器主动发送消息到客户端的消息传递方式.订阅者Subscriber连接到服务器客户端后,相当

NATS学习 -- 概念学习之消息(Message)与发布订阅(Publish Subscribe)

1 理论篇 1.1 来自官方的介绍 NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always o

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播

RabbitMQ/JAVA (发布/订阅模式)

发布/订阅模式即生产者将消息发送给多个消费者. 下面介绍几个在发布/订阅模式中的关键概念-- 1. Exchanges (转发器) 可能原来我们都是基于一个队列发送和接收消息.现在介绍一下完整的消息传递模式. Rabbitmq消息模式的核心理念是:生产者没有直接发送任何消息到队列.实际上,生产者都不知道这个消息是发送给哪个队列的.相反,生产者只能发送消息给转发器. 转发器一方面接收生产者的消息,另一方面向队列推送消息. 转发器必须清楚的指导如何处理接收到的消息,需要附加队列吗?附加几个?或者是否

RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

(本教程是使用Net客户端,也就是针对微软技术平台的) 在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅". 为了说明这种模式,我们将构建一个简单的日志系统.它将包括两个程序,第一个将发出日志消息,第二个将接收并打印它们. 在我们的日志系统中每个接收程序的运行副本都会得到消息.这样我们就可以运行一个接收者程序,将日志记录到磁盘:同时我们可以运行另

rabbitMQ交换机的发布订阅模式

生产者: # !/usr/bin/env python # -*- coding: utf-8 -*- import pika # 创建连接对象 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 创建交换机 channel.exchange_declare(exchange='logs', exchange_type=

C/S模式,发布/订阅模式和PUSH/PULL模式(上)

CS模式(客户端/服务器模式) 最场景的信息传递模式,也称为Request/Response模式,或者调用模式.http/https协议即此模式.因为最常用所以大家一般都比较熟悉,这里不重点讲了,大家请看图下图: 发布/订阅模式(Publish/Subscribe) 发布订阅模式相对于BS模式稍微难点,我们不妨先看一个生活中的小例子: 如果没有邮局会怎么样?毫无疑问出版社既要发行杂志又要把杂志投递给用户,不仅累而且极其低效!因为大部分时间都将耽误在投递上,发行杂志的事情还有肯能被耽误!此例子可以

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

Redis研究(十六)—发布/订阅模式

在上一篇中我们写了Redis的任务队列. 除了实现任务队列外,Redis还提供了一组命令可以让开发者实现"发布/订阅"(publish/subscribe)模式."发布/订阅"模式同样可以实现进程间的消息传递,其原理是这样的: "发布/订阅"模式中包含两种角色,分别是发布者和订阅者.订阅者可以订阅一个或若干个频道(channel),而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到此消息. 发布者发布消息的命令是PUBLISH,用法