RabbitMQ消息发布和消费的确认机制

前言

新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门。趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端。园子里解释RabbitMQ基础的很多了,这里就不对RabbitMQ的基础再做叙述了,来点实际工作中一定会碰到的问题和解决的方案。

RabbitMQ 消息发布确认机制

默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的。BasicPublish方法的返回值是void。假设我们想对消息进行监控,针对消息发送失败后进行补发则需要一个消息确认机制来帮我们实现。

  • 事务机制
  • Confirm确认机制

上面是已知可通过RabbitMQ自带的特性实现消息确认机制的两种方式。

事务机制

事务机制依赖三个RabbitMQ提供的方法

  • txSelect()
  • txCommit()
  • txRollback()
    看名字大概知道意思了,特别是Commit和Rollback,使用方式和数据库的事务使用几乎一样,txSelect()声明事务的开始,txCommit()提交事务,txRollBack()执行提交失败后的回滚。
    使用代码如下:
        // 采取RabbitMQ事务方式传输消息
        private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            channel.TxSelect();
                            channel.BasicPublish(exchangeName, routingKey, null, messagebuffer);
                            //if (1 == 1) throw new Exception("没错!我是故意抛出异常的!看看最终队列是否写入了消息~");
                            channel.TxCommit();
                        }
                        catch (Exception ex)
                        {
                            Rtx_Receive.Text = Rtx_Receive.Text + $"\r 异常产生时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},异常信息:{ex.Message}";
                            channel.TxRollback();
                            // TODO 进行补发OR其他逻辑处理
                        }
                        Rtx_Receive.Text = Rtx_Receive.Text + $"\r 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"发送消息失败!{ex.Message}");
            }
        }

  

需要注意的是 这里的事务其实也只能保证在执行BasicPublish方法后且TxCommit方法执行前但凡出现异常则回滚!
上面是什么意思呢?意思就是我只管消息发送到队列里,且在我定义的事务内没有出现异常,出现了异常则将发布的数据给撤销!
但是,如果事务也提交了,但是消息还是有可能不会送达队列里去

比如,我将上面的代码改下

        // 采取RabbitMQ事务方式传输消息
        private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~
                       //  channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            channel.TxSelect();
                            channel.BasicPublish(exchangeName, routingKey, null, messagebuffer);
                            //if (1 == 1) throw new Exception("没错!我是故意抛出异常的!看看最终队列是否写入了消息~");
                            channel.TxCommit();
                        }
                        catch (Exception ex)
                        {
                            Rtx_Receive.Text = Rtx_Receive.Text + $"\r 异常产生时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},异常信息:{ex.Message}";
                            channel.TxRollback();
                            // TODO 进行补发OR其他逻辑处理
                        }
                        Rtx_Receive.Text = Rtx_Receive.Text + $"\r 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"发送消息失败!{ex.Message}");
            }
        }

  上面代码我将`channel.QueueBind(queueName, exchangeName, routingKey);` 这一行代码注释掉,不将routingKey进行绑定,然后在RabbitMQ管理页面将队列、交换机删除。如下图

再执行代码,发现队列是创建了,交换器也创建了,但是队列里没有数据!

当然,问题出在没有将队列和交换器以及routingKey进行绑定,我们的消息没有进入到队列的路由,最终导致了消息进入了所谓的“黑洞”。

所以上面的队列不是说能完全保证只要执行了TxRollback()我们的消息队列就一定会有数据!!!

Confirm确认机制

Confirm确认机制也很容易理解,它要求消息生产端(Producer)对消息发送后RabbitMQ服务端返回一个已接收的指令,Producer收到该指令则认为该消息已经发送成功。同时消费端(Consumer)也有同样的机制,在从RabbitMQ服务端接收到消息后,需要返回一个已处理的指令给服务端,服务端收到后则会认为该消息已被消费。

下面是采取Confirm确认机制后的发布消息代码

        // 采取确认机制方式传输消息
        private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        channel.ConfirmSelect(); // 启用服务器确认机制方式
                        channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer);
                        if (channel.WaitForConfirms())
                        {
                            Rtx_Receive.Text = $"\r 消息发送成功! 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"发送消息失败!{ex.Message}");
            }
        }

  

关键代码在于执行BasicPublish之前调用channel.ConfirmSelect()启用服务器确认,然后在发布后通过调用 WaitForConfirms()得到消息发布结果,
true则表示消息已发布到了队列里。 OK,现在试下,到服务器上删除掉队列、交换机信息。然后代码去掉绑定交换机和路由键后试下,看看是否和事务方式一样无法确认消息是否真正抵达队列。

删除队列和交换机,接下来更改代码,直接把上面的 channel.QueueBind(queueName, exchangeName, routingKey);注释掉,然后执行下,看看channel.WaitForConfirms()返回true还是false~
哈哈哈~执行结果是true 但是我们队列是不会有消息进来的,所以确认机制和事务机制对消息的发布是否抵达队列监控是一样的,没有说哪一种方式能绝对保证消息抵达了队列

针对消息提交到了指定交换机但是最终没有写入到队列的消息如何追踪

我们有一种方式可以捕获发布了消息但是该消息最终没有写入到队列的情况,我们需要注册IModel的BasicReturn事件,更新后的代码如下:

       // 采取确认机制方式传输消息
        private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        channel.ConfirmSelect(); // 启用服务器确认机制方式
                        channel.BasicReturn += Channel_BasicReturn;
                        //mandatory为true表示交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用Basic.Return 命令将消息返回给生产者
                        channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer);
                        if (channel.WaitForConfirms())
                        {
                            Rtx_Receive.Text = $"\r 消息发送成功! 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"发送消息失败!{ex.Message}");
            }
        }

        /// <summary>
        /// 当消息发送不到队列时候触发
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            BeginInvoke(new Action(() => { Rtx_Receive.Text = $"\r 消息发送失败!"; }));
        }  

消息发布确认机制结论

1、事务机制与确认机制都无法百分之百确认消息是否写入到了缓存,可以理解为两者都只能确认发布的动作是否成功~但是消息有无进入队列是无法给予客户端准确结果;
2、两者性能比较而言事务的性能损耗更大;
3、注册IModel的BasicReturn事件可以追踪到没有写入到队列的消息

RabbitMQ 消息消费确认机制

上面已经知道了如何对消息的发布进行确认,那么消费数据时候我们肯定也想在消费完成后确认该消息已经处理,希望队列对其进行删除。
而不是在我们的消费端程序未将消息处理后,队列就将其删除了。

在此之前说下RabbitMQ消费者对象的两种实现方式

  • 继承DefaultBasicConsumer类
  • 实例化EventingBasicConsumer对象

继承DefaultBasicConsumer方式

DefaultBasicConsumer是RabbitMQ.Client提供的一个消费者基类,该类实现了IBasicConsumer接口。
继承DefaultBasicConsumer类后可重写基类的部分方法来实现消息获取以及当前消费者各个状态变更的事件,本文的示例代码即采用这种方式实现消费者对象。

实例化EventingBasicConsumer对象

这种方式采取注册事件的方式接受消息发布者推送到队列的消息,代码如下:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {
     string message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine($"接受到消息为:{message}");
 };  

消费端确认消息已处理有两种方式

  • 自动确认
  • 手动确认

自动确认

自动确认我理解的就是服务端认为你接受到消息后即确认了,但是如果当你拿到消息后,依赖此消息的业务逻辑未处理完毕,但是却中途异常了的话,此消息也会消失掉!所以建议消费端采取手动确认!

手动确认

手动确认可以完美解决上面自动确认出现的问题,但是它也意味着我们开发者需要对确认的流程进行一个完整的闭环。即所有的消息在消费端获取到后必须有一个明确的结果返回给服务端(Broker)。 我们对消息的处理结果要么是确认处理,要么是拒绝该消息(返回给Broker,再分发给其他消费者)。如果我们没有对消息接收后进行任何反馈的话该条消息在队列的状态会变成Unacked 直到我们消费端AMQP连接中断后该消息状态又会变成Ready。状态为Unacked的消息会导致所有消费者都无法对该消息进行二次消费(包含当前消费者),所以此类消息越多则占用的内存资源也会越多。当消息变回Ready也会很烦人,因为我们已经对该消息进行过一次处理了,如若我们没有对消息进行校验则又会执行一遍。 所以手动确认必须执行回执!!!!!!

下面是手动确认消息的代码:

        private void ReceiveMessage(RabbitMQConnectionDTO connectionDTO, string exchangeName, string queueName, string routtingKey)
        {
            try
            {
                var factory = new ConnectionFactory
                {
                    HostName = connectionDTO.HostName,
                    Password = connectionDTO.Password,
                    UserName = connectionDTO.UserName,
                    Port = connectionDTO.Port,
                };

                UseDefaultBasicConsumerType(factory, queueName);
                //DirectAcceptExchangeEvent(factory, exchangeName, queueName, routtingKey);
            }
            catch (Exception ex)
            {
                Rtx_SendContext.Text = $"出现异常:{ex.Message}";
            }
        }

        private void UseDefaultBasicConsumerType(ConnectionFactory factory, string queueName)
        {
            var connection = factory.CreateConnection();
            _channel = connection.CreateModel();
            // accept only one unack-ed message at a time
            // uint prefetchSize, ushort prefetchCount, bool global
            _channel.BasicQos(0, 1, false);

            //定义一个继承了DefaultBasicConsumer类的消费类(DefaultBasicConsumer是继承了IBasicConsumer接口的一个基类,里面存在许多可重写的方法)
            MessageReceiver messageReceiver = new MessageReceiver(_channel, (string msg, ulong deliveryTag) =>
            {
                string key = Txt_Key.Text.Trim();
                string keyNoReturn = Txt_KeyNoReturn.Text.Trim();
                bool isExecFlag = false;
                if (!string.IsNullOrWhiteSpace(key) && msg.StartsWith(key)) // 这里要小心 如果只有当前1个消费者那你懂的~~~~~~
                    _channel.BasicReject(deliveryTag, requeue: true); //requeue表示消息被拒绝后是否重新放回queue中
                else if (!string.IsNullOrWhiteSpace(keyNoReturn) && msg.StartsWith(keyNoReturn))
                    _channel.BasicReject(deliveryTag, requeue: false); //requeue表示消息被拒绝后是否重新放回queue中
                else
                {
                    _channel.BasicAck(deliveryTag, multiple: false); //确认已处理消息 multiple表示是否确认多条
                    isExecFlag = true;
                }
                BeginInvoke(new Action(() => { Rtx_SendContext.Text = Rtx_SendContext.Text + "\r" + $"处理标识{isExecFlag.ToString()} " + string.Format("***接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg); }));
            });
            _channel.BasicConsume(queueName, false, messageReceiver); //不开启自动确认回执
        }

这里的消费者对象通过继承DefaultBasicConsumer对象而实现

代码如下:

    public class MessageReceiver : DefaultBasicConsumer
    {
        private readonly Logger _logger;
        private readonly Action<string, ulong> _action;
        public MessageReceiver(IModel channel, Action<string, ulong> action)
        {
            _action = action;
            _logger = LogManager.GetCurrentClassLogger();
        }

        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
        {
            string msg = Encoding.UTF8.GetString(body);
            _logger.Debug($"***************************Consuming Topic Message  时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}*********************************");
            _logger.Debug(string.Concat("Message received from the exchange ", exchange));
            _logger.Debug(string.Concat("Consumer tag: ", consumerTag));
            _logger.Debug(string.Concat("Delivery tag: ", deliveryTag));
            _logger.Debug(string.Concat("Routing tag: ", routingKey));
            _logger.Debug(string.Concat("Message: ", msg));
            _action?.Invoke(msg, deliveryTag);
        }

        /// <summary>
        /// 捕获通道连接的关闭事件
        /// </summary>
        /// <param name="model"></param>
        /// <param name="reason"></param>
        public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
        {
            _logger.Debug($"进入MessageReceiver.HandleModelShutdown方法");
            base.HandleModelShutdown(model, reason);
        }

        public override void HandleBasicConsumeOk(string consumerTag)
        {
            _logger.Debug($"进入MessageReceiver.HandleBasicConsumeOk方法 consumerTag:{consumerTag}");
            base.HandleBasicConsumeOk(consumerTag);
        }

        /// <summary>
        ///  删除队列 会进入
        /// </summary>
        /// <param name="consumerTag"></param>
        public override void HandleBasicCancel(string consumerTag)
        {
            _logger.Debug($"进入MessageReceiver.HandleBasicCancel方法 consumerTag:{consumerTag}");
            base.HandleBasicCancel(consumerTag);
        }
    }

上面有引用到的_logger和_action先不用管,重点是_channel.BasicReject方法和_channel.BasicAck方法。他们分别是代表拒绝消费和确认消费。

上面的代码是消费者消费数据后给予了Broker明确的回执,我们试下将回执代码注释掉后看下队列里的消息变成什么样子了。 先删除掉交换器和队列,然后再发布数据,看看消费数据后不回执的消息状态~
这是发布消息到队列后,Ready状态的消息为1条

接下来,我们去消费数据但是不进行回执确认,看看结果如何

如上图,还是那条数据,状态从Ready变成了Unacked,这时候是因为我的消费端应用还没关闭,AMQP的链接也还在。我们到任务管理器内将消费应用关闭~

关闭后又变成了Ready, 意味着我们再次开启消费端程序又可以从队列获取到之前的消息了~
我们将上面的回执代码部分注释取消,看看回执成功后队列内的消息状态是什么样?

可以看到回执确认后,我们的消息就从队列里移除了~

消息消费确认机制结论

1、自动确认虽然省代码但是可能会出现消息丢失业务未处理完毕的情况;
2、手动确认消息则是在获取到消息后,在没有返回回执前,消息会一致存储在队列

本文对应的代码已上传至Github,地址:https://github.com/QQ897878763/RabbitMQ_Sample

程序的运行截图如下:

原文地址:https://www.cnblogs.com/hunanzp/p/RabbitMQ.html

时间: 2024-07-30 10:09:31

RabbitMQ消息发布和消费的确认机制的相关文章

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml的配置: spring: rabbitmq: host: 106.52.82.241 port: 5672 username: yang

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

RabbitMQ (十二) 消息确认机制 - 发布者确认

消费者确认解决的问题是确认消息是否被消费者"成功消费". 它有个前提条件,那就是生产者发布的消息已经"成功"发送出去了. 因此还需要一个机制来告诉生产者,你发送的消息真的"成功"发送了. 在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务.但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍.为了解决这个问题,RabbitMQ 引入了 发布者确认(Publisher Confir

RabbitMQ 消息确认机制

消息确认机制 在之前异常处理部分就已经写了,对于consumer的异常退出导致消息丢失,可以时候consumer的消息确认机制.重复的就不说了,这里说一些不一样的. consumer的消息确认机制 当一个消费者收到一个快递,但是这个包裹是破损的,这时候一般会有以下选择 拒收快递,让快递员把快递寄回. (如果有多个consumer可能这条消息会到其它的consumer中,如果只有一个,那么下次获取还是可以拿到) 签收快递,然后偷偷的扔了(钱多任性) 拒收快递,联系商家再给我补发一个 下面是具体的方

rabbitMQ基础知识--消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下接收确认的情况. 为了保证消息从队列可靠的到达消费者,RabbitMQ提供了

rabbitMQ基础知识--消息确认机制之生产者端的确认机制

一:消息确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下生产者确认的情况. 生产者确认模式实现原理: 生产者将信道设置成conf

RabbitMQ消息确认机制—消息发送确认和 消息接收确认

/** * RabbitMQ消息确认机制 * 关于rabbit的生产和消费方的一些实用的操作: * producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失 */ /** * producer的confirm模式 * 业务场景描述: * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加, * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信: * 此时插入mq消息的服务为了保证给所有用户发

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message