JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息

ActiveMQ是一个消息中间件,对于消费者而言有两种方式从消息中间件获取消息:

①Push方式:由消息中间件主动地将消息推送给消费者;②Pull方式:由消费者主动向消息中间件拉取消息。看一段官网对Push方式的解释:

To be able to achieve high performance it is important to stream messages to consumers as fast as possible
so that the consumer always has a buffer of messages, in RAM, ready to process
- rather than have them explicitly pull messages from the server which adds significant latency per message.

采用Push方式,可以尽可能快地将消息发送给消费者(stream messages to consumers as fast as possible)

而采用Pull方式,会增加消息的延迟,即消息到达消费者的时间有点长(adds significant latency per message)。

但是,Push方式会有一个坏处:如果消费者的处理消息的能力很弱(一条消息需要很长的时间处理),而消息中间件不断地向消费者Push消息,消费者的缓冲区可能会溢出。

那ActiveMQ是怎么解决这个问题的呢?那就是 prefetch limit

prefetch limit 规定了一次可以向消费者Push(推送)多少条消息。

 Once the prefetch limit is reached, no more messages are dispatched to the consumer
until the consumer starts sending back acknowledgements of messages (to indicate that the message has been processed)

当推送消息的数量到达了perfetch limit规定的数值时,消费者还没有向消息中间件返回ACK,消息中间件将不再继续向消费者推送消息。

那prefetch limit的值设置为多少合适?视具体的应用场景而定。

 If you have very few messages and each message takes a very long time to process
you might want to set the prefetch value to 1 so that a consumer is given one message at a time. 

如果消息的数量很少(生产者生产消息的速率不快),但是每条消息 消费者需要很长的时间处理,那么prefetch limit设置为1比较合适。这样,消费者每次只会收到一条消息,当它处理完这条消息之后,向消息中间件发送ACK,此时消息中间件再向消费者推送下一条消息。

prefetch limit 设置成0意味着什么?

Specifying a prefetch limit of zero means the consumer will poll for more messages, one at a time,
instead of the message being pushed to the consumer.

意味着此时,消费者去轮询消息中间件获取消息。不再是Push方式了,而是Pull方式了。即消费者主动去消息中间件拉取消息。

perfetch limit是“消息预取”的值,这是针对消息中间件如何向消费者发消息 而设置的。与之相关的还有针对 消费者以何种方式向消息中间件返回确认ACK(响应):比如消费者是每次消费一条消息之后就向消息中间件确认呢?还是采用“延迟确认”---即采用批量确认的方式(消费了若干条消息之后,统一再发ACK)。这就是 Optimized Acknowledge

ActiveMQ can acknowledge receipt of messages back to the broker in batches (to improve performance). 

引用 一段话:“如果prefetchACK为true,那么prefetch必须大于0;当prefetchACK为false时,你可以指定prefetch为0以及任意大小的正数。
不过,当prefetch=0是,表示consumer将使用PULL(拉取)的方式从broker端获取消息,broker端将不会主动push消息给client端,直到client端发送PullCommand时;
当prefetch>0时,就开启了broker push模式,此后只要当client端消费且ACK了一定的消息之后,会立即push给client端多条消息。”

那么,在程序中如何采用Push方式或者Pull方式呢?

从是否阻塞来看,消费者有两种方式获取消息。同步方式和异步方式。

同步方式使用的是ActiveMQMessageConsumer的receive()方法。而异步方式则是采用消费者实现MessageListener接口,监听消息。

使用同步方式receive()方法获取消息时,prefetch limit即可以设置为0,也可以设置为大于0

prefetch limit为零 意味着:“receive()方法将会首先发送一个PULL指令并阻塞,直到broker端返回消息为止,这也意味着消息只能逐个获取(类似于Request<->Response)”

prefetch limit 大于零 意味着:“broker端将会批量push给client 一定数量的消息(<= prefetch),client端会把这些消息(unconsumedMessage)放入到本地的队列中,只要此队列有消息,那么receive方法将会立即返回,当一定量的消息ACK之后,broker端会继续批量push消息给client端。”

当使用MessageListener异步获取消息时,prefetch limit必须大于零了。因为,prefetch limit 等于零 意味着消息中间件不会主动给消费者Push消息,而此时消费者又用MessageListener被动获取消息(不会主动去轮询消息)。这二者是矛盾的。

此外,还有一个要注意的地方,即消费者采用同步获取消息(receive方法) 与 异步获取消息的方法(MessageListener) ,对消息的确认时机是不同的。

具体可参考:这篇文章

参考资料:  ActiveMQ消息传送机制以及ACK机制详解

时间: 2024-10-14 12:50:14

JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息的相关文章

JMS学习八(ActiveMQ的消息持久化到Mysql数据库)

1.将连接Mysql数据库的jar文件,放到ActiveMQ的lib目录下 2.修改ActiveMQ的conf目录下的active.xml文件,修改数据持久化的方式 2.1  修改原来的kshadb的持久化数据的方式 <persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> <jdbcPersistenceAdapter dataSource=&quo

JMS学习(五)--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系

一,消息的持久化和非持久化 ①DeliveryMode 这是传输模式.ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent delivery),默认情况下使用的是持久传输. 可以通过MessageProducer 类的 setDeliveryMode方法设置传输模式: MessageProducer producer = ...; producer.setDeliveryMode(DeliveryMode.PERSISTENT); 持

JMS学习(七)-ActiveMQ消息的持久存储方式之KahaDB存储

一,介绍 自ActiveMQ5.4以来,KahaDB成为了ActiveMQ默认的持久化存储方式.相比于原来的AMQ存储方式,官方宣称KahaDB使用了更少的文件描述符,并且提供了更快的存储恢复机制. 二,KahaDB存储配置 在 conf/activemq.xml 中配置如下: <broker brokerName="broker" ... > <persistenceAdapter> <kahaDB directory="activemq-da

JMS学习(六)-ActiveMQ的高可用性实现

一,ActiveMQ高可用性的架构 ActiveMQ的高可用性架构是基于Master/Slave 模型的.ActiveMQ总共提供了四种配置方案来配置HA,其中Shared Nothing Master/Slave 在5.8版本之后不再使用了,并在ActiveMQ5.9版本中引入了基于Zookeeper的Replicated LevelDB Store HA方案. 二,Master/Slave架构的配置解释 ①Shared Nothing Master/Slave   该架构最大的特点是: 1)

JMS学习之路(一):整合activeMQ到SpringMVC

JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 整合activeMQ到springmv

Spring整合JMS(一)——基于ActiveMQ实现

1.1     JMS简介 JMS的全称是Java Message Service,即Java消息服务.它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息.把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑.对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应:另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收. 1.2  

activeMQ学习(2)---------点对点、发布订阅的消息代码实现

以下是个人在学习activemq时从网上找到的资料, 总结留给自己以后复习 点对点的实现 2 @Test public void sendMessage(){ 3 try { 4 // 创建一个连接工厂 5 String url = "tcp://localhost:61616"; 6 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 7 // 设置用户名和密码,这个用户名

消息中间件系列一:入门、JMS规范、ActiveMQ使用

一.入门 1. 消息中间件的定义 没有标准定义,一般认为,采用消息传送机制/消息队列 的中间件技术,进行数据交流,用在分布式系统的集成 2. 为什么要用消息中间件 解决分布式系统之间消息的传递.电商场景: 用户下单减库存,调用物流系统.随着业务量的增大,需要对系统进行拆分(服务化和业务拆分),拆分后的系统之间的交互一般用RPC(远程过程调用).如果系统扩充到有几十个接口,就需要用消息中间件来解决问题. 3. 消息中间件和RPC有什么区别 3.1 功能特点: 在架构上,RPC和Message的差异

Oracle学习(八):处理数据

1.知识点:能够对比以下的录屏进行阅读 SQL> --SQL语句 SQL> --1. DML语句(Data Manipulation Language 数据操作语言): insert update delete select SQL> --2. DDL语句(Data Definition Language 数据定义语言): create/alter/drop/truncate table SQL> -- create/drop view,create/drop index(sequ