RabbitMQ 发布订阅持久化

RabbitMQ是一种重要的消息队列中间件,在生产环境中,稳定是第一考虑。RabbitMQ厂家也深知开发者的声音,稳定、可靠是第一考虑,为了消息传输的可靠性传输,RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化、Queue持久化及Message的持久化。以保证RabbitMQ在退出或Crash等异常情况下,消息不会丢失。RabbitMQ提供了简单的参数配置来实现持久化操作。

简单说明一下各种持久化方式:(描述代码采用的是Rabbit.Client  SDK,  C#代码)

Queue持久化:队列是我们使用RabbitMQ进行数据传输的最多使用的方式,是进行点对点消息传递使用最多的方式。队列的持久化是通过durable=true 来实现。

var connFactory = new ConnectionFactory();
Conn = connFactory.CreateConnection();
Model = Conn.CreateModel();
Model.QueueDeclare(q, false, false, false, null);  

其中,QueueDeclare的定义:

/// <summary>(Spec method) Declare a queue.</summary>
        [AmqpMethodDoNotImplement(null)]
        QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive,
            bool autoDelete, IDictionary<string, object> arguments);  

参数说明:queue:队列名称。durable:设置是否执行持久化。如果设置为true,即durable=true,持久化实现的重要参数

exclusive:指示队列是否是排他性。如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。需要注意:1. 排他队列是基于连接可见的,同一连接的不同信道Channel是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:是否自动删除。如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于发布订阅方式创建的临时队列。

消息的持久化:如果将一个队列设置为持久化,那么会创建一个持久化的队列,但并不意味着队列中的消息也会持久化存储。因此如果要保证消息在RabbitMQ出现异常时不会丢失,需要设定消息的持久化。

简要说明一下消息持久化和队列持久化的联系:

队列设置为持久化,那么在RabbitMQ重启之后,持久化的队列也会存在,并会保持和重启前一致的队列参数。

消息设置为持久化,在RabbitMQ重启之后,持久化的消息也会存在。

那么就会出现一些矛盾的地方:

1、因为消息必须依附于队列存在才有意义,那么如果队列设置为非持久化,而消息设置为持久化。在RabbitMQ重启之后,持久化的消息是否还存在呢?因为非持久化的队列可能并不存在。

2、如果设置消息持久化为true,但队列设置成排他性队列,那么在RabbitMQ重启之后,消息是否仍然存在。请自行查找分析,下次分析该问题。

 1              var sf = new ConnectionFactory();
 2             using (IConnection conn = cf.CreateConnection())
 3             {
 4                 IModel ch = conn.CreateModel();
                   Model = Conn.CreateModel();
                   Model.QueueDeclare(queueName, true, false, false, null); 
string message = "Hello C# SSL Client World"; 11 byte[] msgBytes = System.Text.Encoding.UTF8.GetBytes(message);                   //发送消息
12                 ch.BasicPublish("", queueName, null, msgBytes);
13
14                 bool noAck = false;
15                 BasicGetResult result = ch.BasicGet(qName, noAck);
16                 byte[] body = result.Body;
17                 string resultMessage = System.Text.Encoding.UTF8.GetString(body);
18
19                 Assert.AreEqual(message, resultMessage);
20             }

通过RabbitMQ SDK发送消息至MQ非常简单,通过BasicPublish即可。

BasicPublish 的定义:

1   /// <summary>
2         /// (Spec method) Convenience overload of BasicPublish.
3         /// </summary>
4         /// <remarks>
5         /// The publication occurs with mandatory=false
6         /// </remarks>
7         [AmqpMethodDoNotImplement(null)]
8         void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);

设置消息持久化,需要设置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).

设置了队列和消息持久化后,当服务重启之后,消息仍然存在。只设置队列持久化,不设置消息持久化,重启之后消息会丢失;只设置消息持久化,不设置队列持久化,在服务重启后,队列会消失,从而依附于队列的消息也会丢失。只设置消息持久化而不设置队列的持久化,毫无意义。

Exchange持久化:

为了实现一对多的消息发送,我们一般会采用发布订阅模式,通过一个发送端、多个订阅端来实现消息的分发。

发布订阅模式存在一些问题:

1、如果消费者由于网络或其他原因,与RabbitMQ的连接断开,那么RabbitMQ会自动将与其对应的队列删除,当消息程序重新连接以后,无法获取断开前未来得及消费的消息。

2、如果RabbitMQ出现故障或Crash,那么在RabbitMQ  服务重启之后,消费端未及时消费的消息也会丢失,并且如果Exchange 不设置成持久化,那么在MQ服务重启之后,Exchange也不会存在。

1   /// <summary>(Spec method) Declare an exchange.</summary>
2         /// <remarks>
3         /// The exchange is declared non-passive and non-internal.
4         /// The "nowait" option is not exercised.
5         /// </remarks>
6         [AmqpMethodDoNotImplement(null)]
7         void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete,
8             IDictionary<string, object> arguments);

参数说明:exchange:RabbitMQ中定义的Exchange名称,type:类型,包含fanout、topic、direct、headers,durable:持久化设置。设置成true,就可以设定exchange持久化存储,autodelete:是否自动删除。

exchange是实现发布订阅的基础,其类型包含fanout、headers、direct、、topic。我们本次仅讨论类型为topic。

发布订阅模式执行消息发送的流程:

原文地址:https://www.cnblogs.com/jiagoushi/p/8678871.html

时间: 2024-08-01 09:49:48

RabbitMQ 发布订阅持久化的相关文章

RabbitMQ 发布/订阅

我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式). 为了验证这种模式,我们准备构建一个简单的日志系统.这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志. 在我们的日志系统中,每一个运行的接收者程序都会收到日志.然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上.本质上来说,就是发布的日志消息会转发给所有的接收者. 1.转发器(Exchanges) RabbitMQ消息模型的核心理念是生产者永

.Net下RabbitMQ发布订阅模式实践

一.概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScri

RabbitMQ发布订阅模式

这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展.功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者. 思路解读(重点理解): (1)一个生产者,多个消费者(2)每一个消费者都有自己的一个队列(3)生产者没有直接发消息到队列中,而是发送到交换机(4)每个消费者的队列都绑定到交换机上(5)消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列以用户发

Linux 安装redis,redis发布订阅,持久化

安装redis 1.安装redis的方式 -yum (删除这个yum安装的redis,我们只用源码编译安装的) -rpm -源码编译 2.删除原本的redis yum remove redis -y 3.下载redis源码 wget http://download.redis.io/releases/redis-4.0.10.tar.gz 4.解压缩 tar -zxf redis-4.0.10.tar.gz 5.切换redis源码目录 cd redis-4.0.10.tar.gz 6.编译源文件

RabbitMQ - 发布订阅

这次我们试试publish / subscribe模式,也就是将一个消息发送给多个consumer. 这里用一个简单的小程序来说明publish / subscribe.由一个provider提供消息,这个消息会被多个consumer接收.consumer对同一个消息做出不同的反应,比如打印.保存到文件.数据库什么的. 之前的例子可能会给人这种感觉:producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息. 但这并不正确.在rabbit中,producer从来不会直接

【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者.在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式.为了阐述这种模式,我们将构建一个简单的日志系统.该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息.这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面.也就是说,发布的日志消息会被广播

RabbitMQ实例教程:发布/订阅者消息队列

消息交换机(Exchange) RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列. 相反的,生产者只能发送消息给交换机(Exchange).交换机的作用非常简单,一边接收从生产者发来的消息,另一边把消息推送到队列中.交换机必须清楚的知道消息如何处理它收到的每一条消息.是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过交换机的类型进行定义. 交换机的类型有:direct,topic,head

RabbitMQ/JAVA (发布/订阅模式)

发布/订阅模式即生产者将消息发送给多个消费者. 下面介绍几个在发布/订阅模式中的关键概念-- 1. Exchanges (转发器) 可能原来我们都是基于一个队列发送和接收消息.现在介绍一下完整的消息传递模式. Rabbitmq消息模式的核心理念是:生产者没有直接发送任何消息到队列.实际上,生产者都不知道这个消息是发送给哪个队列的.相反,生产者只能发送消息给转发器. 转发器一方面接收生产者的消息,另一方面向队列推送消息. 转发器必须清楚的指导如何处理接收到的消息,需要附加队列吗?附加几个?或者是否

rabbitMq及安装、fanout交换机-分发(发布/订阅)

<dependency>            <groupId>com.rabbitmq</groupId>            <artifactId>amqp-client</artifactId>            <version>3.6.5</version> </dependency> 最常见的几种消息通信模式主要有发布-订阅.点对点这两种http://blog.csdn.net/wooge