RabbitMQ 连接断开处理-自动恢复

Rabbitmq 官方给的NET consumer示例代码如下,但使用过程,会遇到connection断开的问题,一旦断开,这个代码就会报错,如果你的消费者端是这样的代码的话,就会导致消费者挂掉。

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("logs", "fanout");

                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queueName, "logs", "");
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);

                Console.WriteLine(" [*] Waiting for logs." +
                                  "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                }
            }
        }
    }
}

那么如何会异常恢复呢?

之前我的操作方式是,建立一个ConnectionPool,在出现异常后,重建channel,也就是说,整个的异常恢复过程是自己处理的。最近研究因为研究Orleans,担心RabbitMQ的NET client使用Task时,会遇到Orleans的坑,所以顺手研究了下RabbitMQ NET Client的源码,研究发现一种自动的错误恢复机制 AutomaticRecoveryEnabled = true 使用方式如下

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost", AutomaticRecoveryEnabled = true };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare("logs", "fanout");

                var queueName = channel.QueueDeclare().QueueName;

                channel.QueueBind(queueName, "logs", "");
                var consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queueName, true, consumer);

                Console.WriteLine(" [*] Waiting for logs." +
                                  "To exit press CTRL+C");
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                }
            }
        }
    }
}

具体的恢复机制如下

1.在AutoRecoveringConnection初始化时,在链接关闭事件委托上增加断开处理

public void init()
        {
            m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());

            AutorecoveringConnection self = this;
            EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
            {
                lock (recoveryLockTarget)
                {
                    if (ShouldTriggerConnectionRecovery(args))
                    {
                        try
                        {
                            self.BeginAutomaticRecovery();
                        }
                        catch (Exception e)
                        {
                            // TODO: logging
                            Console.WriteLine("BeginAutomaticRecovery() failed: {0}", e);
                        }
                    }
                }
            };
            lock (m_eventLock)
            {
                ConnectionShutdown += recoveryListener;
                if (!m_recordedShutdownEventHandlers.Contains(recoveryListener))
                {
                    m_recordedShutdownEventHandlers.Add(recoveryListener);
                }
            }
        }

观察调用的方式BeginAutomaticRecovery,可以看到这个方法内部调用了PerformAutomaticRecovery方法。我们直接看这个方法的内容,其中第一个调用的是方法RecoverConnectionDelegate

protected void PerformAutomaticRecovery()
        {
            lock (recoveryLockTarget)
            {
                RecoverConnectionDelegate();
                RecoverConnectionShutdownHandlers();
                RecoverConnectionBlockedHandlers();
                RecoverConnectionUnblockedHandlers();

                RecoverModels();
                if (m_factory.TopologyRecoveryEnabled)
                {
                    RecoverEntities();
                    RecoverConsumers();
                }

                RunRecoveryEventHandlers();
            }
        }

这个方法中调用的是

protected void RecoverConnectionDelegate()
        {
            bool recovering = true;
            while (recovering)
            {
                try
                {
                    m_delegate = new Connection(m_factory, false, m_factory.CreateFrameHandler());
                    recovering = false;
                }
                catch (Exception)
                {
                    // TODO: exponential back-off
                    Thread.Sleep(m_factory.NetworkRecoveryInterval);
                    // TODO: provide a way to handle these exceptions
                }
            }
        }

可以看出,它是执行了死循环,直到连接重新打开,当然,如果遇到异常,它会调用Thread.Sleep来等待一下,然后再次执行连接恢复。

时间: 2024-08-07 00:16:06

RabbitMQ 连接断开处理-自动恢复的相关文章

htc M8 无法自动恢复数据连接(4g)的问题解决

情况如下:htc m8 tdd-lte的双待手机,4g.2g同时在线. 本月出现,在短时间没有信号的情况后,无法恢复数据连接,哪怕是edge,更不论4g了. 尝试各种方法无解.最后咨询10086解决此问题. 这个应该不是手机的问题,貌似是移动端的2g edge数据流量被关闭/错误设置导致的.   我感觉像是2g-3g-4g之间切换需要依次回落或者升级.因为双待,4g信号开机是有的,可以上网,但是电梯等4g消失的情况发生后,需要借助于2g/3g的数据连接再恢复到4g,没有2g/3g的数据连接就恢复

数据库会自动清除掉超时的空闲连接造成中间件连接池中连接断开的问题

所有的数据库都会自动清除掉超时的空闲连接,因为数据库本身是一个SOCKET服务器,它必须要定时清除掉僵死连接,来保持其长时间稳定运行. 数据库清除空闲连接以后,中间件连接池里面con.connected还是等于true,也就是说在中间件里面是无法判断连接池中的连接是否已经被数据库给清除了. 事实上中间件连接池中的所有连接必须保持24小时的连接是通的.那么如何解决这个矛盾呢? 答案是在连接池中设置定时器,定时检查池中的每一个连接,当池中的空闲连接已经超过了半小时,就自动将此连接断开并重连. {**

记一次自动恢复的支付故障

故障描述 作为一个老牌OTA公司,公司早些年订单主要来源是PC网站和呼叫中心.我在入职公司大约半年后,遇到一次非常诡异的故障.有一天早上,大概也是这个季节,阳光明媚,程序猿刚起床,洗洗涮涮,准备去迎接初恋般的工作日,却突然收到一大堆报警,线上消息队列大量积压:当然,我还是一如既往的非常勤奋地在9点之前就到公司的:但是作为一名新员工,环视四周,组内其他员工都还没到公司,运维也都在路上,故障就这样突然降临了.我赶紧开机登录堡垒机,连接线上机器,tail 错误日志.但是线上10几个系统,我看了好几个系

zookeeper 大量连接断开重连原因排查

问题现象 最后发现线上的zookeeper的日志zookeeper.out 文件居然有6G,后来设置下日志为滚动输出,参考: http://blog.csdn.net/hengyunabc/article/details/19006911 但是改了之后,发现一天的日志量就是100多M,滚动日志一天就被冲掉了,这个不科学. 再仔细查看下日志里的内容,发现有很多连接建立好,马上又断开: 2014-11-24 15:38:33,348 [myid:3] - INFO [NIOServerCxn.Fac

[办公自动化]计算机突然死机后asd自动恢复文档未能恢复,如何使用

今天计算机突然死机,但是word未能提示自动恢复窗格.所以无法自动恢复word文档.但是在文档所在的文件夹看到了一个“自动恢复”开头的asd恢复文档. 该如何使用这个文档呢? 安装以前的惯例,尝试了如下方法: 1)直接双击无法打开. 2)修改后缀为docx也无法使用. 最后经查阅word帮助文档,可以在word中,单击“打开”文档界面,直接选中打开自动恢复文档“asd”文件,另存为docx文档,就一切恢复正常了. [读书时间] 1.Excel Home出版的系列书籍 2.刘万祥<Excel图表之

新一代-亚马逊自动恢复EC2系统

构造高可用性和高可靠性系统的一项重要原则是假定失效(Design forfailure).换言之,你的设计模型应具有正如亚马逊的首席技术官(CTO)沃纳?威格尔(Werner Vogels)曾说的"一切事物随时有可能失效"的特性.幸运的是,现代数据中心.网络和服务器具有高可靠性,且很少发生故障.然而,若你把偶尔的故障当成是既定的,并简单建立一个在发生故障后能恢复且保持运行的系统,则你能建立一个强大的系统. 新一代自动恢复 今天我想告诉你一个新的EC2功能,当某项EC2实例遭受损害时,该

socket 如何判断远端服务器的连接状态?连接断开,需重连

fluent-logger-java is a Java library, to record events via Fluentd, from Java application. https://github.com/fluent/fluent-logger-java 使用该sdk过程发现,tcp连接断开之后,该sdk的重连机制无效. 2018-01-26 12:36:25,620 ERROR [org.fluentd.logger.sender.RawSocketSender] - <org

Socket:读写处理及连接断开的检测

作为进程间通信及网络通信的一种重要技术,在实际的开发中,socket编程是经常被用到的.关于socket编程的一般步骤,这里不再赘述,相关资料和文章很多,google/baidu即可. 本文主要是探讨如何更好地进行socket读写处理,以及如何检测连接断开. 首先,有以下几点需要注意: 对于全双工的socket,同时读写是没问题的.比如,一个socket程序有两个线程,一个线程对socket进行读操作(recv/read),一个线程对socket进行写操作(send/write),这里是不需要进

SCOM 2012 R2_自动恢复服务脚本

最近做了几个SCOM的项目,也碰少不少情况,以后会陆续将一些项目的部署实施发布出来.今天就简单介绍如何在SCOM服务器监控Windows服务的同时,当SCOM检测到Agent客户端的服务Down掉的时候,Agent会自动恢复服务,而减少故障的滞后性,实现IT管理的自动化的过程. 1. 在SCOM控制面板,点击"创作-管理包对象-监视器": 2. 右击监视器,选择"新建监视器-单元监视器": 3. 在监视器类型,点击"windows服务-基本服务监控器&qu