ActiveMQ(24):Consumer高级特性之Slow Consumer Handling(慢消费者的处理)

一、Prefetch机制

ActiveMQ通过Prefetch机制来提高性能,方式是在客户端的内存里可能会缓存一定数量的消息。缓存消息的数量由prefetch limit来控制。当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认,确认后的消息将会从缓存中去掉。

可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQPrefetchPolicy对象来配置prefetch policy。也可以通过connection options或者destination options来配置。例如:

tcp://localhost:61616?jms.prefetchPolicy.all=50

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");

prefetch size的缺省值如下:

1:persistent queues (default value: 1000)

2:non-persistent queues (default value: 1000)

3:persistent topics (default value: 100)

4:non-persistent topics (default value: Short.MAX_VALUE -1)

二、慢Consumer处理

慢消费者会在非持久的topics上导致问题:一旦消息积压起来,会导致broker把大量消息保存在内存中,broker也会因此而变慢。目前ActiveMQ使用Pending Message Limit Strategy来解决这个问题。除了prefetch buffer之外,你还要配置缓存消息的上限,超过这个上限后,新消息到来时会丢弃旧消息。

通过在配置文件的destination map中配置PendingMessageLimitStrategy,可以为不用的topic namespace配置不同的策略。

Pending Message Limit Strategy(等待消息限制策略)目前有以下两种:

1: Constant Pending Message Limit Strategy

Limit可以设置0、>0、-1三种方式:

0表示:不额外的增加其预存大小。

>0表示:再额外的增加其预存大小。

-1表示:不增加预存也不丢弃旧的消息。

这个策略使用常量限制,配置如下:

<constantPendingMessageLimitStrategy limit="50"/>

2:Prefetch Rate Pending Message Limit Strategy

这种策略是利用Consumer的之前的预存的大小乘以其倍数等于现在的预存大小。比如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

说明:在以上两种方式中,如果设置0意味着除了prefetch之外不再缓存消息;如果设置-1意味着禁止丢弃消息。

三、配置消息的丢弃策略

消息的丢弃策略,目前有三种方式:

1:oldestMessageEvictionStrategy:这个策略丢弃最旧的消息。

2:oldestMessageWithLowestPriorityEvictionStrategy:这个策略丢弃最旧的,而且具有最低优先级的消息。

3:uniquePropertyMessageEvictionStrategy:从5.6开始,可以根据自定义的属性来进行抛弃,比如<uniquePropertyMessageEvictionStrategy propertyName=“STOCK” />,这就表示抛弃属性名称为Stock的消息

配置示例:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="FOO.>">
                <dispatchPolicy>
                    <roundRobinDispatchPolicy />
                </dispatchPolicy>
            </policyEntry>
            <policyEntry topic="ORDERS.>">
                <dispatchPolicy>
                    <strictOrderDispatchPolicy />
                </dispatchPolicy>
            </policyEntry>
            <policyEntry topic="PRICES.>">
                <!-- lets force old messages to be discarded for slow consumers -->
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="10"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
            <policyEntry tempTopic="true" advisoryForConsumed="true" />
            <policyEntry tempQueue="true" advisoryForConsumed="true" />
        </policyEntries>
    </policyMap>
</destinationPolicy>
时间: 2024-10-12 04:43:25

ActiveMQ(24):Consumer高级特性之Slow Consumer Handling(慢消费者的处理)的相关文章

ActiveMQ(23):Consumer高级特性之Message dispatche async、Consumer Priority与Message Selectors

一.Message dispatche async(消息异步分发) 在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者.可以设置dispatchAsync 属性,默认是true,通常情况下这是最佳的. 修改:可以通过如下几种方式 1:在ConnectionFactory层设置 ((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false); 2:在Connection上设置 这个设置将会

Consumer高级特性

Queue队列的消息一般是按照顺序各个队列依次获取消息,每次获取一个.所以假设有两个队列queue1,queue2,发送的消息为1.2.3.4.5.则默认情况下queue1获取到的消息为1.3.5,queue2获取到的消息为2.4. 1.Exclusive Consume 用于队列消息 独有消费者:Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证.因为这些消息是被多个线程并发的处理.有的时候,保证消息按

ActiveMQ(十二)——Consumer高级特性

一.独有消费者    Queue中的消息是按照顺序被分发到consumers的,然而,当有多个consumers同时从相同的queue中提取消息时,你将失去这个保证.因为这些消息是被多个多线程并发的处理.有的时候,保证消息按照顺序处理是很重要的,但是你可能不希望在插入订单操作结束之前执行更新这个订单的操作.    ActiveMQ从4.x版本起开始支持Exclusive Consumer.Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证消息的

ActiveMQ(19):Consumer高级特性之独有消费者(Exclusive Consumer)

一.简介 Queue中的消息是按照顺序被分发到consumers的.然而,当你有多个consumers同时从相同的queue中提取消息时, 你将失去这个保证.因为这些消息是被多个线程并发的处理.有的时候,保证消息按照顺序处理是很重要的. 如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作. 二.使用 ActiveMQ从4.x版本起开始支持Exclusive Consumer. Broker会从多个consumers中挑选一个consumer来处理queue中 所有的消息,从而保证了消息

ActiveMQ(22):Consumer高级特性之消息分组(Message Groups)

一.简介 Message Groups就是对消息分组,它是Exclusive Consumer功能的增强. 逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer.跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message group.Message Groups特性保证所有具有相同JMSXGroupID的消息会被分发到相同的consumer(只要这个consumer保持active). 另外一方面,M

ActiveMQ(20):Consumer高级特性之重新投递(Redelivery Policy)

一.简介 ActiveMQ在接收消息的Client有以下几种操作的时候,需要重新传递消息: 1:Client用了transactions,且在session中调用了rollback() 2:Client用了transactions,且在调用commit()之前关闭 3:Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover() 二.定制想要的再次传送策略 可以通过设置ActiveMQConnectionFactory和ActiveMQConnect

ActiveMQ(21):Consumer高级特性之管理持久化订阅(Manage Durable Subscribers)

消息的持久化,保证了消费者离线后,再次进入系统,不会错过消息,但是这也会消耗很 多的资源.从5.6开始,可以对持久化消息进行如下管理: 我们还可能希望删除那些不活动的订阅者,如下: <broker name="localhost"      offlineDurableSubscriberTimeout="600000"     offlineDurableSubscriberTaskSchedule="30000"> 1.offli

ActiveMQ中的Destination高级特性(一)

---------------------------------------------------------------------------------------- Destination高级特性----->Composite Destinations 组合队列Composite Destinations : 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发

ActiveMQ(14):Destination高级特性

一.Wildcards Wildcards用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展.ActiveMQ支持以下三种wildcards: 1:"." 用于作为路径上名字间的分隔符 2:"*" 用于匹配路径上的任何名字 3:">" 用于递归地匹配任何以这个名字开始的destination 示例,设想你有如下两个destinations PRICE.STOCK.NASDAQ.IBM (IBM在NASDAQ的股价) P