rabbitmq之消息重入队列

说起消息重入队列还得从队列注册消费者说起,客户端在向队列注册消费者之后,创建的channel也会被主队列进程monitor,当channel挂掉后,主队列进程(rabbit_amqqueue_process)收到‘DOWN‘通知,将未ack的消息重入队列,并根据消息的deliver tag,也就是消费入队列的顺序,将消息重入队列中

主要代码如下:

1.注册消费者

handle_method(#‘basic.consume‘{queue        = QueueNameBin,
                               consumer_tag = ConsumerTag,
                               no_local     = _, % FIXME: implement
                               no_ack       = NoAck,
                               exclusive    = ExclusiveConsume,
                               nowait       = NoWait,
                               arguments    = Args},
              _, State = #ch{consumer_prefetch = ConsumerPrefetch,
                             consumer_mapping  = ConsumerMapping}) ->
    case dict:find(ConsumerTag, ConsumerMapping) of
        error ->
            QueueName = qbin_to_resource(QueueNameBin, State),
            check_read_permitted(QueueName, State),
            ActualConsumerTag =
                case ConsumerTag of
                    <<>>  -> rabbit_guid:binary(rabbit_guid:gen_secure(),
                                                "amq.ctag");
                    Other -> Other
                end,
            case basic_consume(
                   QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
                   ExclusiveConsume, Args, NoWait, State) of
                {ok, State1} ->
                    {noreply, State1};
                {error, exclusive_consume_unavailable} ->
                    rabbit_misc:protocol_error(
                      access_refused, "~s in exclusive use",
                      [rabbit_misc:rs(QueueName)])
            end;
        {ok, _} ->
            %% Attempted reuse of consumer tag.
            rabbit_misc:protocol_error(
              not_allowed, "attempt to reuse consumer tag ‘~s‘", [ConsumerTag])
    end;

2.主队列进程增加消费者,并对channel进程监控

handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
             PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
            _From, State = #q{consumers          = Consumers,
                              exclusive_consumer = Holder}) ->
    case check_exclusive_access(Holder, ExclusiveConsume, State) of
        in_use -> reply({error, exclusive_consume_unavailable}, State);
        ok     -> Consumers1 = rabbit_queue_consumers:add(
                                 ChPid, ConsumerTag, NoAck,
                                 LimiterPid, LimiterActive,
                                 PrefetchCount, Args, is_empty(State),
                                 Consumers),
    end;
ch_record(ChPid, LimiterPid) ->
    Key = {ch, ChPid},
    case get(Key) of
        undefined -> MonitorRef = erlang:monitor(process, ChPid),
                     Limiter = rabbit_limiter:client(LimiterPid),
                     C = #cr{ch_pid               = ChPid,
                             monitor_ref          = MonitorRef,
                             acktags              = queue:new(),
                             consumer_count       = 0,
                             blocked_consumers    = priority_queue:new(),
                             limiter              = Limiter,
                             unsent_message_count = 0},
                     put(Key, C),
                     C;
        C = #cr{} -> C
    end.

3.主队列进程收到channel ‘DOWN‘的消息后,删除消费者,获取此被此channel ack的消息

handle_info({‘DOWN‘, _MonitorRef, process, DownPid, _Reason}, State) ->
    case handle_ch_down(DownPid, State) of
        {ok, State1}   -> noreply(State1);
        {stop, State1} -> stop(State1)
    end;
handle_ch_down(DownPid, State = #q{consumers          = Consumers,
                                   exclusive_consumer = Holder,
                                   senders            = Senders}) ->
    State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
                                   false -> Senders;
                                   true  -> credit_flow:peer_down(DownPid),
                                            pmon:demonitor(DownPid, Senders)
                               end},
    case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
        not_found ->
            {ok, State1};
        {ChAckTags, ChCTags, Consumers1} ->

            case should_auto_delete(State2) of
                true  -> {stop, State2};
                false -> {ok, requeue_and_run(ChAckTags,
                                              ensure_expiry_timer(State2))}
            end
    end.

4.涉及重入队列时,需要了解backing queue,即消息是如何在镜像队列之间内部以及消息如何在本地内存和磁盘资源之间按需切换,或此部分涉及内容较多,后序会专门列出一个专题来分析此实现。rabbit_amqqueue_process主队列进程的backing_queue是rabbit_mirror_queue_master(镜像队列消息同步),后者因为需要将消息按需放置,所以也有backing_queueu,即rabbit_variable_queue。

根据sequeue_id来判断消息在队列中的位置,从当前队列中pop出队头的消息(最早入队列的消息),若未ack的消息较晚(seqid相对大),则将pop队头的队列再与未ack的消息比较,将消息pop出的消息放置在front队列中,直到符合,插入队列,并将front队列与此队列合入。

queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
    queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
                Limit, PubFun, State).

queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
            Limit, PubFun, State)
  when Limit == undefined orelse SeqId < Limit ->
    case ?QUEUE:out(Q) of
        {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
          when SeqIdQ < SeqId ->
            %% enqueue from the remaining queue
            queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
                        Limit, PubFun, State);
        {_, _Q1} ->
            %% enqueue from the remaining list of sequence ids
            {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
            {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
                PubFun(MsgStatus, State1),
            queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
                        Limit, PubFun, State2)
    end;
queue_merge(SeqIds, Q, Front, MsgIds,
            _Limit, _PubFun, State) ->
    {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
时间: 2024-08-15 09:50:34

rabbitmq之消息重入队列的相关文章

WinExec可能会引起消息重入

WinExec不仅会造成延迟,并且还会引起消息的重入. 以下是调用堆栈: WinvoiceCC.exe!CWinvoiceCCDlg::OnMsgHttpReq(unsigned int wParam=38434384, long lParam=0)  行624 C++ mfc90d.dll!CWnd::OnWndMsg(unsigned int message=2564, unsigned int wParam=38434384, long lParam=0, long * pResult=0

RabbitMQ (消息队列)专题学习02 Hello World

一.概述 RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. 在专题学习一中我们已经简单提到了一些概念,在此我们更为深入的学习下RabbitMQ相关的专有名词. 1.生产

RabbitMq+Spring boot 消息生产者向队列发送消息 (一)

本人学习新框架方法. 一.先学习框架基本知识,也就是看这本书的前三章,了解基本概念.比如这个Rabbitmq,我会先看一些概念,比如,交换机,路由器,队列,虚拟机. 二.然后写代码,写demo,有哪些不懂的地方直接再去翻书或者google找资料,带着问题去学习,学的更快更扎实一些. 三.然后再看这个框架的应用场景,自己能否独立的写一些简单的项目,来验证自己的成果. 四.实际项目积累经验. RabbitMq 消息生产者向队列发送消息 (一) MQ分为消息生产者和消息消费者,这次做的主要是消息的生产

C#调用RabbitMQ实现消息队列

我在刚接触使用中间件的时候,发现,中间件的使用并不是最难的,反而是中间件的下载,安装,配置才是最难的. 所以,这篇文章我们从头开始学习RabbitMq,真正的从头开始. 关于消息队列 其实消息队列没有那么神秘,我们这样想一下,用户访问网站,最终是要将数据以HTTP的协议的方式,通过网络传输到主机的某个端口上的. 那么,接收数据的方式是什么呢?自然是端口监听啦. 那消息队列是什么就很好解释了? 它就是端口监听,接到数据后,将数据排列起来. 那这件事,我们不用中间件能做吗? 当然能做啦,写个TCP/

RabbitMQ (消息队列)专题学习07 RPC

(使用Java客户端) 一.概述 在Work Queue的章节中我们学习了如何使用Work Queue分配耗时的任务给多个工作者,但是如果我们需要运行一个函数在远程计算机上,这是一个完全不同的情景,这种模式通常被称之为RPC. 在本章节的学习中,我们将使用RabbitMQ来构建一个RPC系统:一个远程客户端和一个可扩展的RPC服务器,我们没有任何费时的任务进行分配,我们将创建一个虚拟的RPC服务返回Fibonacci数. 1.1.客户端接口(Client Interface) 为了说明一个RPC

RabbitMQ分布式消息队列服务器(一、Windows下安装和部署)

RabbitMQ消息队列服务器在Windows下的安装和部署-> 一.Erlang语言环境的搭建 RabbitMQ开源消息队列服务是使用Erlang语言开发的,因此我们要使用他就必须先进行Erlang语言环境的搭建,其实是非常简单的. 登录Erlang官网,进入下载页,官网地址->http://www.erlang.org/downloads 然后按照自己的系统环境来选择需要下载的安装文件. 我选择 64-bit下载包,因为我的操作系统是64位的 接下来我们需要对Erlang语言的环境变量的配

RabbitMQ (消息队列)专题学习06 Topic

(使用Java客户端) 一.概述 在路由消息分发的学习中,对日志记录系统做了改进,使用direct exchange来替换fanout exchange进行消息分发,可以使日志系统有了直接.并且可以有选择的接收消息. 尽管使用direct exchange改进了系统,但是它仍然有局限性,就是不能根据多个标准来分发消息. 在日志系统中,我们也许想订阅的不仅仅是基于日志消息的严重程度,而且可能是基于日志消息的发送源. 这将给我们带来很多的灵活,我可能想坚挺的错误来自"cron"的消息源,而

RabbitMQ (消息队列)专题学习05 routing(路由)

(使用Java客户端) 一.概述 在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制. 二.路由功能实现 2.1.绑定(bindings) 在前面的学习中已经创建了绑定(bindings),代码如下: channel.queueBind

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口