rabbitmq method之basic.consume

basic.consume指的是channel在 某个队列上注册消费者,那在这个队列有消息来了之后,就会把消息转发到给此channel处理,如果 这个队列有多个消费者,则会采用轮转的方式将消息分发给消息者.

首先是rabbit_reader接收数据包后,解析组装出其中的method,channel方法交给channel处理.具体过程见http://www.cnblogs.com/haoqingchuan/p/4354692.html

channel进程处理basic.consume的方法.先从状态中查看是否已经存在此tag(以channel为域,不同的consumer_tag标识了不同的消费者,每个channel的内的consumer tag必须是唯一的).如果没有查找到则正常,如果未对队列名字命名,则会产生一个uuid来作为队列名.

 1 handle_method(#‘basic.consume‘{queue        = QueueNameBin,
 2                                consumer_tag = ConsumerTag,
 3                                no_local     = _, % FIXME: implement
 4                                no_ack       = NoAck,
 5                                exclusive    = ExclusiveConsume,
 6                                nowait       = NoWait,
 7                                arguments    = Args},
 8               _, State = #ch{consumer_prefetch = ConsumerPrefetch,
 9                              consumer_mapping  = ConsumerMapping}) ->
10     case dict:find(ConsumerTag, ConsumerMapping) of
11         error ->
12             QueueName = qbin_to_resource(QueueNameBin, State),
13             check_read_permitted(QueueName, State),
14             ActualConsumerTag =
15                 case ConsumerTag of
16                     <<>>  -> rabbit_guid:binary(rabbit_guid:gen_secure(),
17                                                 "amq.ctag");
18                     Other -> Other
19                 end,
20             case basic_consume(
21                    QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
22                    ExclusiveConsume, Args, NoWait, State) of
23                 {ok, State1} ->
24                     {noreply, State1};
25                 {error, exclusive_consume_unavailable} ->
26                     rabbit_misc:protocol_error(
27                       access_refused, "~s in exclusive use",
28                       [rabbit_misc:rs(QueueName)])
29             end;
30         {ok, _} ->
31             %% Attempted reuse of consumer tag.
32             rabbit_misc:protocol_error(
33               not_allowed, "attempt to reuse consumer tag ‘~s‘", [ConsumerTag])
34     end;
 1 basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
 2               ExclusiveConsume, Args, NoWait,
 3               State = #ch{conn_pid          = ConnPid,
 4                           limiter           = Limiter,
 5                           consumer_mapping  = ConsumerMapping}) ->
 6     case rabbit_amqqueue:with_exclusive_access_or_die(
 7            QueueName, ConnPid,
 8            fun (Q) ->
 9                    {rabbit_amqqueue:basic_consume(
10                       Q, NoAck, self(),
11                       rabbit_limiter:pid(Limiter),
12                       rabbit_limiter:is_active(Limiter),
13                       ConsumerPrefetch, ActualConsumerTag,
14                       ExclusiveConsume, Args,
15                       ok_msg(NoWait, #‘basic.consume_ok‘{
16                                consumer_tag = ActualConsumerTag})),
17                     Q}
18            end) of
19         {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
20             CM1 = dict:store(
21                     ActualConsumerTag,
22                     {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
23                     ConsumerMapping),
24             State1 = monitor_delivering_queue(
25                        NoAck, QPid, QName,
26                        State#ch{consumer_mapping = CM1}),
27             {ok, case NoWait of
28                      true  -> consumer_monitor(ActualConsumerTag, State1);
29                      false -> State1
30                  end};
31         {{error, exclusive_consume_unavailable} = E, _Q} ->
32             E
33     end.

rabbit_amqqueue.erl

1 basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
2               LimiterActive, ConsumerPrefetchCount, ConsumerTag,
3               ExclusiveConsume, Args, OkMsg) ->
4     ok = check_consume_arguments(QName, Args),
5     delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
6                          ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
7                          Args, OkMsg}).

rabbit_amqqueue_process

 1 handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
 2              PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
 3             _From, State = #q{consumers          = Consumers,
 4                               exclusive_consumer = Holder}) ->
 5     case check_exclusive_access(Holder, ExclusiveConsume, State) of
 6         in_use -> reply({error, exclusive_consume_unavailable}, State);
 7         ok     -> Consumers1 = rabbit_queue_consumers:add(
 8                                  ChPid, ConsumerTag, NoAck,
 9                                  LimiterPid, LimiterActive,
10                                  PrefetchCount, Args, is_empty(State),
11                                  Consumers),
12                   ExclusiveConsumer =
13                       if ExclusiveConsume -> {ChPid, ConsumerTag};
14                          true             -> Holder
15                       end,
16                   State1 = State#q{consumers          = Consumers1,
17                                    has_had_consumers  = true,
18                                    exclusive_consumer = ExclusiveConsumer},
19                   ok = maybe_send_reply(ChPid, OkMsg),
20                   emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
21                                         not NoAck, qname(State1),
22                                         PrefetchCount, Args, none),
23                   notify_decorators(State1),
24                   reply(ok, run_message_queue(State1))
25     end;

rabbit_queue_consumers.erl

更新进程字典,并为队列增加新消费者.

 1 add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
 2     State = #state{consumers = Consumers,
 3                    use       = CUInfo}) ->
 4     C = #cr{consumer_count = Count,
 5             limiter        = Limiter} = ch_record(ChPid, LimiterPid),
 6     Limiter1 = case LimiterActive of
 7                    true  -> rabbit_limiter:activate(Limiter);
 8                    false -> Limiter
 9                end,
10     C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
11     update_ch_record(
12       case parse_credit_args(Prefetch, Args) of
13           {0,       auto}            -> C1;
14           {_Credit, auto} when NoAck -> C1;
15           {Credit,  Mode}            -> credit_and_drain(
16                                           C1, CTag, Credit, Mode, IsEmpty)
17       end),
18     Consumer = #consumer{tag          = CTag,
19                          ack_required = not NoAck,
20                          prefetch     = Prefetch,
21                          args         = Args},
22     State#state{consumers = add_consumer({ChPid, Consumer}, Consumers),
23                 use       = update_use(CUInfo, active)}.

%%将consumer加入consumers列表里面,也就是后面分发消息的时候会从这个列表里将消息取出

1 in(X, 0, {  queue, [_] = In, [], 1}) ->
2     {queue, [X], In, 2};
3 in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
4     {queue, [X|In], Out, Len + 1};
时间: 2024-11-08 22:30:08

rabbitmq method之basic.consume的相关文章

rabbitmq method之queue.declare

queue.declare即申请队列,首先对队列名作处理,若未指定队列名则随机生成一个,然后查询数据库队列是否已经创建,若创建完成则会申请队列返回 handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = DurableDeclare, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, argumen

rabbitmq channel参数详解

1.Channel 1.1 channel.exchangeDeclare(): type:有direct.fanout.topic三种durable:true.false true:服务器重启会保留下来Exchange.警告:仅设置此选项,不代表消息持久化.即不保证重启后消息还在.原文:true if we are declaring a durable exchange (the exchange will survive a server restart)autoDelete:true.f

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

以下转自:http://blog.csdn.net/yangbutao/article/details/10395599 rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析. 编程模型伪代码如下: ConnectionFactory

RabbitMQ实例详解+Spring中的MQ使用

RabbitMQ实例详解 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构. Queue Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示. RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费. 多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不

Python开发【第十篇】:RabbitMQ队列

简介 RabbitMQ是流行的开源消息队列系统,用erlang语言开发.RabbitMQ是AMQP(高级消息队列协议)的标准实现. 安装 首先安装erlang环境. 官网:http://www.erlang.org/ Windows版下载地址:http://erlang.org/download/otp_win64_20.0.exe Linux版:yum安装 Windows安装步骤 第一步运行 第二步 第三步 第四步 第五步 Erlang安装完成. 然后安装RabbitMQ,首先下载Rabbit

RabbitMQ 入门【精+转】

rabbitmq可以用一本书取讲,这里只是介绍一些使用过程中,常用到的基本的知识点.官方文档覆盖的内容,非常全面:http://www.rabbitmq.com/documentation.html . 1. 介绍 RabbitMQ,即消息队列系统,它是一款开源消息队列中间件,采用Erlang语言开发,RabbitMQ是AMQP(Advanced Message Queueing Protocol)的标准实现. AMQP是一个公开发布的异步消息的规范,是提供统一消息服务的应用层标准高级消息队列协

RabbitMQ-从基础到实战(1)— Hello RabbitMQ

转自:https://yq.aliyun.com/articles/589923 1.简介 本篇博文介绍了在windows平台下安装RabbitMQ Server端,并用JAVA代码实现收发消息 2.安装RabbitMQ RabbitMQ是用Erlang开发的,所以需要先安装Erlang环境,在这里下载对应系统的Erlang安装包进行安装 点击这里下载对应平台的RabbitMQ安装包进行安装 Windows平台安装完成后如图 3.启用RabbitMQ Web控制台 RabbitMQ提供一个控制台

rabbitmq普通使用

rabbitmq是一种消息队列服务,可以实现rpc,一般情况下openstack的rpc用的就是用它做的,它有很多的用途.除了Qpid以外它是唯一实现了AMQP标准的代理服务器 1.安装安装rabbitmq是一件十分容易的事情,在yum源正常的情况下直接即可yum install rabbitmq-server -y 2.信道当你连接rmq后,你的应用程序与rmq服务器之间就会创建一条TCP连接,当你通过认证打开TCP连接后就会创建一条AMQP信道,这条信道是创建在真实TCP连接内的虚拟连接,当

关于openstack的Rabbitmq安装

RABBITMQ服务 一.RATTITMQ的概念 RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件). AMQP是一个定义了在应用或者组织之间传送消息的协议的开放标准 (an open standard for passing business messages between applications or organizations).AMQP 目标在于解决在两个应用之间传送消息存在的下列问题: · 网络是不可靠的 =>消息需要保存后再转发并有出