RabbitMQ (十二) 消息确认机制 - 发布者确认

消费者确认解决的问题是确认消息是否被消费者"成功消费".

它有个前提条件,那就是生产者发布的消息已经"成功"发送出去了.

因此还需要一个机制来告诉生产者,你发送的消息真的"成功"发送了.

在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务.但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍.为了解决这个问题,RabbitMQ 引入了 发布者确认(Publisher Confirms) 机制,它是模仿AMQP协议中的消费者消息确认机制.

事务机制

生产者部分代码:

         try
            {
                channel.TxSelect();//开启事务机制
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes("hello world"));
                channel.TxCommit();//提交
                Console.WriteLine($"send {msg}");
            }
            catch (Exception e)
            {
                channel.TxRollback();//回滚
                Console.WriteLine(e);
            }

发布者确认

一旦在信道上使用 confirm.select 方法,就认为该信道处于Publisher Confirms模式.事务信道不能进入Publisher Confirms模式,一旦信道处于Publisher Confirms模式,不能开启事务.即事务和Publisher Confirms模式只能二选一.

发布的消息什么时候会被broker确认?

对于不可路由的消息,broker 将在 exchange 验证消息不会路由到任何队列(发回一个空的队列列表)后发出确认;如果消息被设置为"必需消息"发布,即 BasicPublish() 方法的 "mandatory" 入参为true,那么 BasicReturn 事件将在 BasicAcks 事件之前触发.否定确认 BasicNacks 事件也是如此.

对于可路由消息,当所有队列都接受消息时才触发 BasicAcks 事件,对于路由到持久话队列的持久性消息,这意味着持久化到磁盘后才会触发 BasicAcks 事件;对于消息的镜像队列,这意味着所有镜像都已接受该消息后才会触发 BasicAcks 事件.

发布者确认分为同步和异步两种.

一.同步

生产者部分代码

            //开启confirm机制
            channel.ConfirmSelect();
            string msg = "hello world ";
            for (int i = 0; i < 10; i++)
            {
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i));
            }

        //可以发送一批消息后,调用该方法;也可以每发一条调用一次.      
            if (channel.WaitForConfirms())
            {
                Console.WriteLine("send is success");
            }
            else
            {
                Console.WriteLine("send is failed");          //实际应用中,这里需要添加发送消息失败的处理逻辑.          //如果是发送一批消息,那么只要有一条失败,则所有的消息发送都会失败.
            }

二.异步

生产者部分代码

          channel.ConfirmSelect();  

//肯定确认
                channel.BasicAcks += (s, e) =>
                {
                    //多条
                    if (e.Multiple)
                    {
                        Console.WriteLine("最后成功的一条是 : " + e.DeliveryTag);
                    }
                    //单条
                    else
                    {
                        Console.WriteLine(e.DeliveryTag + " 成功发送 ");
                    }
                };

                //否定确认
                channel.BasicNacks += (s, e) =>
                {
                    //多条
                    if (e.Multiple)
                    {
                        Console.WriteLine("最后失败的一条是 : " + e.DeliveryTag);
                    }
                    //单条
                    else
                    {
                        Console.WriteLine(e.DeliveryTag + " 发送失败 ");
                    }
                };

发布者的否定确认(BasicNacks)

  • 在特殊情况下,当 broker 无法成功处理消息而不是 BasicAck 时,broker 将发送 BasicNack.在这种情况下,BasicNack 的字段与 BasicAck 相对应的字段意义相同,并且 requeue 字段是没有意义的.是否重发消息由发送者自己决定;
  • 将channel设置为发布者确认模式后,所有后续发布的消息都只会被 confirm 一次或者 nack 一次;
  • 没有机制保证消息需要多久被 confirmed;
  • 消息不会同时被confirmed和nack`d;
  • BasicNacks 事件只在负责队列的Erlang进程中发生内部错误时才会触发;

持久化消息的延迟肯定确认

前面说到,

如果是持久化的消息,要等到消息持久化到磁盘后才会触发 BasicAcks 事件;对于消息的镜像队列,要等到所有镜像都已接受该消息后才会触发 BasicAcks 事件.

而为了保证持久化效率, RabbitMQ不是来一条存一条,而是定时批量地持久化消息到磁盘.RabbitMQ 消息存储一段时间(几百毫秒)之后或者当队列空闲时,才会批量写到磁盘.

这意味着在恒定负载下,BasicAck 的延迟可以达到几百毫秒.如果队列支持镜像队列,则延迟时间更大.

所以,为了提高吞吐量,强烈建议应用程序采用异步确认方式,或者发布批量消息后等待确认.

发布者确认的注意事项

在大多数情况下,RabbitMQ将以与发布时相同的顺序向发布者确认消息(这适用于在单个频道上发布的消息).但是,发布者确认是异步发出的,可以确认单个消息或一组消息.发出确认的确切时刻取决于消息的传递模式(持久性与瞬态)以及消息路由到的队列的属性.也就是说,RabbitMQ可能不以消息发布的顺序向发布者发送确认消息.生产者端尽量不要依赖消息确认的顺序处理业务逻辑.

原文地址:https://www.cnblogs.com/refuge/p/10357546.html

时间: 2024-11-03 01:38:51

RabbitMQ (十二) 消息确认机制 - 发布者确认的相关文章

RabbitMQ (十二) 远程过程调用(RPC)

在远程计算机上运行一个函数并等待结果,我们通常叫这种模式为远程过程调用或者RPC. 通过 RabbitMQ 进行 RPC 很容易,客户端发送请求消息,服务器回复响应消息.为了接收响应,我们需要发送带有"回调"队列地址的请求. 同时,这里面涉及到几个比较重要的消息属性: 消息属性 Durable : 将消息标记为持久或者非持久; DeliveryMode:熟悉 AMQP 0-9-1协议的人可以选择使用此属性而不是Persistent,他们控制着同样的事情; ContentType:用于描

Hulu机器学习问题与解答系列 | 十二:注意力机制

几天不见想死你们啦~ 今儿的课题很好玩,跟上队伍一起来读! 今天的内容是 [注意力机制] 场景描述 作为生物体,我们的视觉和听觉会不断地获得带有序列的声音和图像信号,并交由大脑理解:同时我们在说话.打字.开车等过程中,也在不断地输出序列的声音.文字.操作等信号.在互联网公司日常要处理的数据中,也有很多是以序列形式存在的,例如文本.语音.视频.点击流等.因此如何更好的对序列进行建模,一向是研究的要点. 为了解决这些问题,注意力机制(attention mechanism)被引入Seq2Seq模型中

RabbitMQ消息发布和消费的确认机制

前言 新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门.趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端.园子里解释RabbitMQ基础的很多了,这里就不对RabbitMQ的基础再做叙述了,来点实际工作中一定会碰到的问题和解决的方案. RabbitMQ 消息发布确认机制 默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的.BasicPublish方法的

rabbitMQ基础知识--消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下接收确认的情况. 为了保证消息从队列可靠的到达消费者,RabbitMQ提供了

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

RabbitMQ 消息确认机制

消息确认机制 在之前异常处理部分就已经写了,对于consumer的异常退出导致消息丢失,可以时候consumer的消息确认机制.重复的就不说了,这里说一些不一样的. consumer的消息确认机制 当一个消费者收到一个快递,但是这个包裹是破损的,这时候一般会有以下选择 拒收快递,让快递员把快递寄回. (如果有多个consumer可能这条消息会到其它的consumer中,如果只有一个,那么下次获取还是可以拿到) 签收快递,然后偷偷的扔了(钱多任性) 拒收快递,联系商家再给我补发一个 下面是具体的方

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml的配置: spring: rabbitmq: host: 106.52.82.241 port: 5672 username: yang