RabbitMQ(三):消息持久化策略

原文:RabbitMQ(三):消息持久化策略

一、前言

  在正常的服务器运行过程中,时常会面临服务器宕机重启的情况,那么我们的消息此时会如何呢?很不幸的事情就是,我们的消息可能会消失,这肯定不是我们希望见到的结果。所以我们希望AMQP服务器崩溃了也可以将消息恢复,这称之为消息持久化。RabbitMQ自然存在这种策略可以帮助我们完成这件事情。

二、持久化的消息

  当RabbitMQ服务器重启后,原先的队列和交换器会随同里面的消息一同消失。原因在于每个队列和交换器都有durable属性,该属性默认是false,它决定了RabbitMQ是否需要在崩溃或者重启之后重新创建队列或者交换器。将它设置为true就代表了持久性,在服务器重启之后就会重新持久的创建队列和交换器。

  当然做到这点还不够,我们需要的是持久化的消息,所以在消息发布前,通过将消息的“投递模式”(delivery mode)属性设置为2将消息标记为持久化。到目前为止,消息还只是被表示为持久化,还需要被发布到持久化的交换器中并到达持久化的队列中才行。如果不是这样,包含持久化消息的队列或者交换器挥着Rabbit崩溃重启后不复存在,导致消息成为一个孤儿。因此,总结起来需要做到以下三点:

  (1)将消息的投递模式选项设置为2(持久);

  (2)将消息发送到持久化的交换器;

  (3)消息到达持久化的队列。

  注意,如果原先有非持久的交换器或者队列,需要删除后才可重新创建,否则就创建其他名称的交换器或者队列,代码如下:

//声明持久交换器
channel.ExchangeDeclare(
    "HelloExchange",    //交换器名称
    ExchangeType.Direct,//交换器类型
    true,              //是否持久话
    false,              //是否自动删除
    null                //关于交换器的详细设置,键值对形式
    );
//声明持久队列
channel.QueueDeclare(
    "HelloQueue",//队列名称
    true,       //是否持久化
    false,       //是否只对首次声明的队列可见
    false,       //是否自动删除
    null         ////关于队列和队列内消息的详细设置,键值对形式
    );
//发布持久消息
string msg_str = "这是生产者第一次发布的消息";
IBasicProperties msg_pro = channel.CreateBasicProperties();
msg_pro.ContentType = "text/plain";//发布的数据类型
msg_pro.DeliveryMode = 2;//标记持久化

三、事务

  目前为止,我们已经将消息、队列和交换器设置为持久化。但是事实上还存在着‘最后一英里‘的距离,就是在把消息写入磁盘前,消息由于服务器宕机而消失该如何?这时候就需要使用到事务,说到事务就会想到SQL中的事务,但是不能搞混了。AMQP中,在把信道设置为事务模式后,通过信道发送消息后还有多个其他的AMQP命令,这些命令是执行还是忽略,取决于消息的发送是否成功,消息发送成功信道会在事务中完成其他AMQP命令,就可以提交事务了,发送失败则其他AMQP命令将不会执行,我们也会知道发送失败,而采取相应的措施。事务保证了解决这最后的问题。

  代码如下:

using (IConnection conn = conn_factory.CreateConnection())
{
    //2.创建信道
    using (IModel channel = conn.CreateModel())
    {
        try
        {
            channel.TxSelect();//声明事务
            //3.发布消息
            string msg_str = "这是生产者发布的消息";
            IBasicProperties msg_pro = channel.CreateBasicProperties();
            msg_pro.ContentType = "text/plain";//发布的数据类型
            msg_pro.DeliveryMode = 2;
            channel.BasicPublish(
                "HelloExchange",                    //消息发送目标交换器名称
                "hola",                             //路由键
                msg_pro,                            //消息的发布属性
                Encoding.UTF8.GetBytes(msg_str)    //消息
            );
            channel.TxCommit();//提交事务
        }
        catch(Exception ex)
        {
            channel.TxRollback();//回滚事务
        }
    }
}

四、发送方确认模式

  虽然通过事务和持久化的消息、队列和交换器可以确保消息不会丢失,但是对消息的吞吐量有着非常严重的影响,而且使用消息通信就是为了避免同步,可是事务却会导致生产者程序产生同步。所以,有一个更好的方法保证消息投递:发送方确认模式。和事务类似,我们需要将信道channel设置为confirm模式,而且只能通过重新创建信道来关闭该设置。一旦信道进入confirm模式,所有的信道上发布的消息都会被指派一个唯一的ID。当消息被投递到队列后,信道就会发送一个发送方确认模式给生产者程序,使得生产者知道消息安全到达队列了。

  发送发确认模式最大的好处是它们是异步的,没有回滚的概念,更加轻量级,对性能的影响也几乎忽略不计。

  代码如下:

channel.ConfirmSelect();//开启发送确认模式
//3.发布消息
IBasicProperties msg_pro = channel.CreateBasicProperties();
msg_pro.ContentType = "text/plain";//发布的数据类型
msg_pro.DeliveryMode = 2;
for(int i = 0; i < 5; i++)
{
    string msg_str = string.Format("这是生产者发布的消息{0}", i);
    channel.BasicPublish(
        "HelloExchange",                    //消息发送目标交换器名称
        "hola",                             //路由键
        msg_pro,                            //消息的发布属性
        Encoding.UTF8.GetBytes(msg_str)    //消息
        );
    if (channel.WaitForConfirms())
        Console.WriteLine(i);
    else
        Console.WriteLine("发送失败");
}

  可以看到channel.WaitForConfirms()方法是同步的,这样的话效率会低一点,我们可以发送完所有的消息,然后用channel.WaitForConfirmsOrDie()一次性提交,如果中途有一个消息提交失败或者超时,就会报错Exception,需要全部重新提交。

五、小结

  消息持久化的策略大致就是以上几种,我们可以根据自己的实际需求来选择相应的策略。如果有问题欢迎指出!

原文地址:https://www.cnblogs.com/lonelyxmas/p/10693394.html

时间: 2024-10-07 22:36:13

RabbitMQ(三):消息持久化策略的相关文章

RabbitMQ原理与相关操作(三)消息持久化

现在聊一下RabbitMQ消息持久化: 问题及方案描述 1.当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间.在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了. 这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情. 2.在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的.当RabbitMQ死掉了或者重启了,上次创建的队列.消息都不会保存. 这种情况可以使用RabbitMQ提供的消

rabbitmq 消息持久化

rabbitmq 消息持久化 2016-02-18 11:19 224人阅读 评论(0) 收藏 举报  分类: 综合(15)  版权声明:本文为博主原创文章,未经博主允许不得转载. 二: 任务分发 &消息持久化 启用多个接收端的时候如果某一个receive 关闭要保证消息有反馈是否收到 send端 #-*- coding: UTF-8 -*-import pikacred = pika.PlainCredentials('zxl','pwd') #账号密码params = pika.Connec

rabbitMQ学习笔记(三) 消息确认与公平调度消费者

从本节开始称Sender为生产者 , Recv为消费者   一.消息确认 为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者 默认是开启的,在消费者端通过下面的方式开启消息确认,  首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认,类似JDBC的autocommit一样 QueueingConsumer consumer

Rabbitmq消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢--消息持久化.?为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化. queue的持久化 queue的持久化是通过durable=true来实现的.?一般程序中这么使用: /** * amqp_queue_declare * * @param [in] state connection state – TCP连接 * @param

rabbitmq 消息安全接收与消息持久化

可插拔式:一个插件,安装和写在不影响主程序运行 durable=True 持久,持续地 | 队列持久化 delivery_mode=2 消息持久化 import pika import time credentials = pika.PlainCredentials('alex', 'alex123') connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.14.52',credentials=crede

RabbitMQ(消息队列)集群配置与使用篇

介绍 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等. MQ特点 MQ是消费-生产者模型的一个典型

千万PV网站架构之RabbitMQ(消息队列)安装、集群

RabbitMQ介绍: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求. RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统.他遵循Mozill

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

学习RabbitMQ(三):AMQP事务机制

本文转自:http://m.blog.csdn.net/article/details?id=54315940 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经