rabbitmq 配置多个消费者(转载)

Concurrency与Prefetch

在通常的使用中(Java项目),我们一般会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQ的java client来和Broker交互,比如我们会用如下配置来建立RabbitMQ的连接池、声明Queue以及指明监听者的监听行为:

<rabbit:connection-factory id="connectionFactory" />

<!-- template非必须,主要用于生产者发送消息-->

<rabbit:template id="template" connection-factory="connectionFactory" />

<rabbit:queue name="remoting.queue" />

<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">

<rabbit:listener ref="listener" queue-names="remoting.queue" />

</rabbit:listener-container>

listener-container可以设置消费者在监听Queue的时候的各种参数,其中concurrency和prefetch是本篇文章比较关心的两个参数,以下是spring-amqp文档的解释:

prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSize

concurrentConsumers(concurrency)
The number of concurrent consumers to initially start for each listener.

简单解释下就是concurrency设置的是对每个listener在初始化的时候设置的并发消费者的个数,prefetch是每次从一次性从broker里面取的待消费的消息的个数,上面的配置在监控后台看到的效果如下:

图中可以看出有两个消费者同时监听Queue,但是注意这里的消息只有被一个消费者消费掉就会自动ack,另外一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值3,意味着每个消费者每次会预取3个消息准备消费。每个消费者对应的listener有个Exclusive参数,默认为false, 如果设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。

源码剖析

这里concurrency的实现方式不看源码也能猜到,肯定是用多线程的方式来实现的,此时同一进程下打开的本地端口都是56278.下面看看listener-contaner对应的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源码:

protected int initializeConsumers() {

int count = 0;

synchronized (this.consumersMonitor) {

if (this.consumers == null) {

this.cancellationLock.reset();

this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers);

for (int i = 0; i < this.concurrentConsumers; i++) {

BlockingQueueConsumer consumer = createBlockingQueueConsumer();

this.consumers.put(consumer, true);

count++;

}

}

}

return count;

}

container启动的时候会根据设置的concurrency的值(同时不超过最大值)创建n个BlockingQueueConsumer。

protected void doStart() throws Exception {

//some code

synchronized (this.consumersMonitor) {

int newConsumers = initializeConsumers();

//some code

Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();

for (BlockingQueueConsumer consumer : this.consumers.keySet()) {

AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);

processors.add(processor);

this.taskExecutor.execute(processor);

}

//some code

}

}

在doStart()方法中调用initializeConsumers来初始化所有的消费者,AsyncMessageProcessingConsumer作为真实的处理器包装了BlockingQueueConsumer,而AsyncMessageProcessingConsumer其实实现了Runnable接口,由this.taskExecutor.execute(processor)来启动消费者线程。

private final class AsyncMessageProcessingConsumer implements Runnable {

private final BlockingQueueConsumer consumer;

private final CountDownLatch start;

private volatile FatalListenerStartupException startupException;

private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {

this.consumer = consumer;

this.start = new CountDownLatch(1);

}

//some code

@Override

public void run() {

//some code

}

}

那么prefetch的值意味着什么呢?其实从名字上大致能看出,BlockingQueueConsumer内部应该维护了一个阻塞队列BlockingQueue,prefetch应该是这个阻塞队列的长度,看下BlockingQueueConsumer内部有个queue,这个queue不是对应RabbitMQ的队列,而是Consumer自己维护的内存级别的队列,用来暂时存储从RabbitMQ中取出来的消息:

private final BlockingQueue<Delivery> queue;

public BlockingQueueConsumer(ConnectionFactory connectionFactory,

MessagePropertiesConverter messagePropertiesConverter,

ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,

boolean transactional, int prefetchCount, boolean defaultRequeueRejected,

Map<String, Object> consumerArgs, boolean exclusive, String... queues) {

//some code

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);

}

BlockingQueueConsumer的构造函数清楚说明了每个消费者内部的队列大小就是prefetch的大小。

业务问题

前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。

设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个RabbitMQ的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置concurrency和prefetch值。

个人理解:如果对消息的顺序有苛刻的要求,可以建立多个queue,将某一类需要顺序操作的消息放入(只使用一个生产者)同一个queue中,然后只使用一个消费者来读取,并处理。这种情况下无法满足高并发的要求,好的情况是,对顺序有要求的业务逻辑并不太多,将这些业务逻辑分类,建立多个queue,一类放入一个queue,也可以并行执行并保证顺序。

作者:王鸿缘
链接:http://www.jianshu.com/p/04a1d36f52ba
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

时间: 2024-11-10 08:18:07

rabbitmq 配置多个消费者(转载)的相关文章

Python安装、配置图文详解(转载)

Python安装.配置图文详解 目录: 一. Python简介 二. 安装python 1. 在windows下安装 2. 在Linux下安装 三. 在windows下配置python集成开发环境(IDE) 1. 在Eclipse中安装PyDev插件 2. 配置Python Interpreters 四. 创建Python Project 五. 编写HelloWorld 六. 小结 一. Python简介: Python在Linux.windows.Mac os等操作系统下都有相应的版本,不管在

springboot RabbitMQ 配置

引用自 http://www.cnblogs.com/ityouknow/p/6120544.html 自己留一份 记录一下 RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将RocketMQ捐献给了apache,当然了今天的主角还是讲RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来

rabbitmq 配置用户信息

本文摘自:http://my.oschina.net/hncscwc/blog/262246 1. 用户管理 用户管理包括增加用户,删除用户,查看用户列表,修改用户密码. 相应的命令 (1) 新增一个用户 rabbitmqctl  add_user  Username  Password (2) 删除一个用户 rabbitmqctl  delete_user  Username (3) 修改用户的密码 rabbitmqctl  change_password  Username  Newpass

RabbitMQ配置负载均衡的意义及RabbitMQ集群是否可以随意使用

说明构建RabbitMQ集群来确保可用性和性能只是保障弹性消息通信基础架构的一半,另一半则是编写当集群节点发生故障时知道如何重连到集群的应用程序.处理到集群的重连有多种策略,这篇wiki所关注的这种是使用负载均衡来处理节点的选择.使用负载均衡的意义通过使用负载均衡,不仅可以减少应用程序处理节点故障代码的复杂性,又能确保在集群中连接的平均分布.但是即便使用了负载均衡,编写处理节点故障的应用程序也要比建立到集群的连接复杂的多,应用程序需要准备好重新创建交换器和队列以应对初始节点故障.为Rabbit做

keil 的 配置向导 configuration wizard(转载)

keil 的 配置向导 configuration wizard 以前发现keil 的很棒的功能 今天终于会用了.  分享给大家.转载的. 一 前言          很多人使用keil的时候感觉keil的configuration wizard 很神奇,用起来特别方便,但是苦于不知道怎么去编写自己的configuration wizard,其实keil的help文档就有,只是很多人用着感觉英文不方便,又或者看了没理解,为此,特写了一个教程,希望大家能从中学到一些知识. 二 基本介绍 Confi

【Ogre编程入门与进阶】第三章 Ogre框架配置及概要分析 [转载]

分类: Orge模块2014-01-07 23:26 425人阅读 评论(0) 收藏 举报 3.1 Ogre支持的系统平台 笔者认为,Ogre几乎可以支持所有的系统平台.这并不是天方夜谭,Ogre的确拥有很强的跨平台性. Ogre是一个用C++语言开发的图形渲染引擎,Ogre最开始主要应用于Windows系统平台.不过随着Ogre的不断发展,Ogre的核心团队吸纳了精通Mac OS X和Linux系统的人才,来开发适用于这两种系统平台的Ogre,目前,Ogre开发团队中有专门的人员来维护Mac

spring-security2配置精讲(转载)

本文转载自牛人downpour的帖子: http://www.iteye.com/topic/319965 Spring 论坛上看了不少Spring Security的相关文章.这些文章基本上都还是基于Acegi-1.X的配置方式,而主要的配置示例也来自于SpringSide的贡献. 众所周知,Spring Security针对Acegi的一个重大的改进就在于其配置方式大大简化了.所以如果配置还是基于Acegi-1.X这样比较繁琐的配置方式的话,那么我们还不如直接使用Acegi而不要去升级了.所

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

golang rabbitmq实践 (一 rabbitmq配置)

1:环境选择 系统为ubuntu 15.04 ,我装在虚拟机里面的 2:rabbitmq tabbitmq 3.5.4  download url : http://www.rabbitmq.com/ 3:安装 在Ubuntu环境下,建议直接下载deb安装包,可以再ubuntu软件包管理中直接安装,并且安装其他依赖包 4:启动 如果是deb包直接安装的话,默认是直接启动的,也可以通过 sudo  rabbitmq-server start 启动.如果提示 node with name "rabb