RabbitMQ (九) : 消息确认机制之 confirm 机制

confirm 机制分串行和并行两种.

串行

生产者

    public class Producer
    {
        private const string QueueName = "test_confirm_queue";
        public static void Send()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();
            channel.QueueDeclare(QueueName, false, false, false, null);

            //开启confirm机制.注意 : 一个队列如果之前已经设置成了事务机制,那么不能再设置为 confirm 机制.反之亦然
            channel.ConfirmSelect();

            string msg = "hello world ";
            //发送消息
            for (int i = 0; i < 10; i++)
            {
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i));
            }

            //设置confirm机制的串行机制,如果消息采用的是持久化,那么确认消息会在持久化后发出.        //可以发一批后,调用该方法;也可以每发一条调用一次.        //当然是发一批消息后调用好些!
            if (channel.WaitForConfirms())
            {
                Console.WriteLine("send is success");
            }
            else
            {
                Console.WriteLine("send is failed");
            }

            channel.Close();
            connection.Close();
        }
    }

并行(异步)

生产者

    public class Producer
    {
        private const string QueueName = "test_confirm_queue";

        //这里一定要用 SortedSet
        private static readonly SortedSet<ulong> ConfirmSort = new SortedSet<ulong>();
        public static void Send()
        {
            using (IConnection connection = ConnectionHelper.GetConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, false, false, false, null);
                    channel.ConfirmSelect();

                    //成功
                    channel.BasicAcks += (s, e) =>
                    {
                        //多条
                        if (e.Multiple)
                        {
                            Console.WriteLine("最后成功的一条是 : " + e.DeliveryTag);
                            ConfirmSort.RemoveWhere(r => r <= e.DeliveryTag);
                        }
                        //单条
                        else
                        {
                            Console.WriteLine(e.DeliveryTag + " 成功发送 ");
                            ConfirmSort.Remove(e.DeliveryTag);
                        }
                    };

                    //失败
                    channel.BasicNacks += (s, e) =>
                    {
                        //多条
                        if (e.Multiple)
                        {
                            Console.WriteLine("最后失败的一条是 : " + e.DeliveryTag);
                        }
                        //单条
                        else
                        {
                            Console.WriteLine(e.DeliveryTag + " 发送失败 ");
                        }
                    };

                    //发送消息
                    string msg = "hello world ";
                    int i = 0;
                    while (true)
                    {
                        ulong seqNo = channel.NextPublishSeqNo;
                        channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i));
                        ConfirmSort.Add(seqNo);//把编号加入到集合中
                        i++;
                    }
                }
            }
        }
    }

原文地址:https://www.cnblogs.com/refuge/p/10351218.html

时间: 2024-11-29 12:07:45

RabbitMQ (九) : 消息确认机制之 confirm 机制的相关文章

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

RabbitMQ的消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 二:消息发送确认 (1)ConfirmCallback 通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调. 使用该功能需要开启确认,spring-boot中配置如下: spr

RabbitMQ (八) : 消息确认机制之事务机制

实在没啥好说的. 生产者 public class Producer { private const string QueueName = "test_work_queue"; public static void Send() { //获取一个连接 IConnection connection = ConnectionHelper.GetConnection(); //从连接中获取一个通道 IModel channel = connection.CreateModel(); //声明

rabbitMQ基础知识--消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下接收确认的情况. 为了保证消息从队列可靠的到达消费者,RabbitMQ提供了

rabbitMQ基础知识--消息确认机制之生产者端的确认机制

一:消息确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下生产者确认的情况. 生产者确认模式实现原理: 生产者将信道设置成conf

RabbitMQ(三):任务分发机制

 在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送"Hello World"的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ 消息确认机制

消息确认机制 在之前异常处理部分就已经写了,对于consumer的异常退出导致消息丢失,可以时候consumer的消息确认机制.重复的就不说了,这里说一些不一样的. consumer的消息确认机制 当一个消费者收到一个快递,但是这个包裹是破损的,这时候一般会有以下选择 拒收快递,让快递员把快递寄回. (如果有多个consumer可能这条消息会到其它的consumer中,如果只有一个,那么下次获取还是可以拿到) 签收快递,然后偷偷的扔了(钱多任性) 拒收快递,联系商家再给我补发一个 下面是具体的方