ActiveMQ(20):Consumer高级特性之重新投递(Redelivery Policy)

一、简介

ActiveMQ在接收消息的Client有以下几种操作的时候,需要重新传递消息:

1:Client用了transactions,且在session中调用了rollback()

2:Client用了transactions,且在调用commit()之前关闭

3:Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()

二、定制想要的再次传送策略

可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的再次传送策略,可用的Redelivery属性如下:

1:collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。

也就是在延迟时间上再加一个时间波动范围。默认值为0.15

2:maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重

传。默认值为6。

3:maximumRedeliveryDelay:最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间

隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重

连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1。

4:initialRedeliveryDelay:初始重发延迟时间,默认1000L

5:redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L

6:useCollisionAvoidance:启用防止冲突功能,默认false

7:useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false

8:backOffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生

在接受的Client可以如下设置:

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)?randomize=false");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
cf.setRedeliveryPolicy(policy)

三、死队列

3.1 简介

当消息试图被传递的次数超过配置中maximumRedeliveries属性的值时,那么,broker会认定该消息是一个死消息,并被把该消息发送到

死队列中。 默认,aciaveMQ中死队列被声明为“ActivemMQ.DLQ”,所有不能消费的消息都被传递到该死队列中。 你可以在

acivemq.xml中配置individualDeadLetterStrategy属性,示例如下:

<policyEntry queue= "> " >
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix= "DLQ." useQueueForQueueMessages= "true" />
    </deadLetterStrategy>
</policyEntry>

3.2 自动删除过期消息

有时需要直接删除过期的消息而不需要发送到死队列中,可以使用属性processExpired=false来设置,示例如下:

<policyEntry queue= "> " >
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired= "false" />
    </deadLetterStrategy>
</policyEntry>

3.3 存放非持久消息到死队列中

默认情况下,Activemq不会把非持久的死消息发送到死队列中。非持久性如果你想把非持久的消息发送到死队列中,

需要设置属性processNonPersistent=“true”,示例如下:

<policyEntry queue= "> " >
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processNonPersistent= "true" />
    </deadLetterStrategy>
</policyEntry>

四、为每一个Destination配置一个Redelivery Policy

在V5.7之后,你可以为每一个Destination配置一个Redelivery Policy。示例如:

ActiveMQConnection connection ... // Create a connection
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);

RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);
// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);

五、定制想要的再次传送策略(spring.xml)

<!--创建连接工厂 -->  
<amq:connectionFactory id="amqConnectionFactory"
    brokerURL="${activemq.brokerURL}" 
    userName="${activemq.userName}" 
    password="${activemq.password}">
    <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重发机制 --> 
</amq:connectionFactory>
    
<!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 http://www.kuqin.com/shuoit/20140419/339344.html -->  
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">  
    <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->  
    <property name="useExponentialBackOff" value="true"></property>  
    <!--重发次数,默认为6次   这里设置为5次 -->  
    <property name="maximumRedeliveries" value="5"></property>  
    <!--重发时间间隔,默认为1秒 -->  
    <property name="initialRedeliveryDelay" value="2000"></property>  
    <!--第一次失败后重新发送之前等待2000毫秒,第二次失败再等待2000 * 1毫秒,这里的1就是value -->  
    <property name="backOffMultiplier" value="2"></property>  
    <!--
      	最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,
       	那么第   二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,
       	以后每次重连时间间隔都为最大重连时间间隔。
    -->  
<!--         <property name="maximumRecdeliveryDelay" value="10000"></property>   -->
</bean> 

<!-- 定义Queue监听器 -->
<jms:listener-container transaction-manager="jmsTransactionManager" destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.queue" ref="queueReceiver1"/>
</jms:listener-container>
    
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="amqConnectionFactory" />
</bean>

时间: 2024-11-07 14:43:19

ActiveMQ(20):Consumer高级特性之重新投递(Redelivery Policy)的相关文章

Consumer高级特性

Queue队列的消息一般是按照顺序各个队列依次获取消息,每次获取一个.所以假设有两个队列queue1,queue2,发送的消息为1.2.3.4.5.则默认情况下queue1获取到的消息为1.3.5,queue2获取到的消息为2.4. 1.Exclusive Consume 用于队列消息 独有消费者:Queue中的消息是按照顺序被分发到consumer的,然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证.因为这些消息是被多个线程并发的处理.有的时候,保证消息按

ActiveMQ(十二)——Consumer高级特性

一.独有消费者    Queue中的消息是按照顺序被分发到consumers的,然而,当有多个consumers同时从相同的queue中提取消息时,你将失去这个保证.因为这些消息是被多个多线程并发的处理.有的时候,保证消息按照顺序处理是很重要的,但是你可能不希望在插入订单操作结束之前执行更新这个订单的操作.    ActiveMQ从4.x版本起开始支持Exclusive Consumer.Broker会从多个consumers中挑选一个consumer来处理queue中所有的消息,从而保证消息的

ActiveMQ(23):Consumer高级特性之Message dispatche async、Consumer Priority与Message Selectors

一.Message dispatche async(消息异步分发) 在activemq4.0以后,你可以选择broker同步或异步的把消息分发给消费者.可以设置dispatchAsync 属性,默认是true,通常情况下这是最佳的. 修改:可以通过如下几种方式 1:在ConnectionFactory层设置 ((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false); 2:在Connection上设置 这个设置将会

ActiveMQ(19):Consumer高级特性之独有消费者(Exclusive Consumer)

一.简介 Queue中的消息是按照顺序被分发到consumers的.然而,当你有多个consumers同时从相同的queue中提取消息时, 你将失去这个保证.因为这些消息是被多个线程并发的处理.有的时候,保证消息按照顺序处理是很重要的. 如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作. 二.使用 ActiveMQ从4.x版本起开始支持Exclusive Consumer. Broker会从多个consumers中挑选一个consumer来处理queue中 所有的消息,从而保证了消息

ActiveMQ(22):Consumer高级特性之消息分组(Message Groups)

一.简介 Message Groups就是对消息分组,它是Exclusive Consumer功能的增强. 逻辑上,Message Groups 可以看成是一种并发的Exclusive Consumer.跟所有的消息都由唯一的consumer处理不同,JMS 消息属性JMSXGroupID 被用来区分message group.Message Groups特性保证所有具有相同JMSXGroupID的消息会被分发到相同的consumer(只要这个consumer保持active). 另外一方面,M

ActiveMQ(24):Consumer高级特性之Slow Consumer Handling(慢消费者的处理)

一.Prefetch机制 ActiveMQ通过Prefetch机制来提高性能,方式是在客户端的内存里可能会缓存一定数量的消息.缓存消息的数量由prefetch limit来控制.当某个consumer的prefetch buffer已经达到上限,那么broker不会再向consumer分发消息,直到consumer向broker发送消息的确认,确认后的消息将会从缓存中去掉. 可以通过在ActiveMQConnectionFactory或者ActiveMQConnection上设置ActiveMQ

ActiveMQ(21):Consumer高级特性之管理持久化订阅(Manage Durable Subscribers)

消息的持久化,保证了消费者离线后,再次进入系统,不会错过消息,但是这也会消耗很 多的资源.从5.6开始,可以对持久化消息进行如下管理: 我们还可能希望删除那些不活动的订阅者,如下: <broker name="localhost"      offlineDurableSubscriberTimeout="600000"     offlineDurableSubscriberTaskSchedule="30000"> 1.offli

ActiveMQ中的Destination高级特性(一)

---------------------------------------------------------------------------------------- Destination高级特性----->Composite Destinations 组合队列Composite Destinations : 允许用一个虚拟的destination代表多个destinations,这样就可以通过composite destinations在一个操作中同时向多个queue/topic发

activemq的高级特性:消息存储持久化

activemq的高级特性之消息存储持久化 有基于文件的,数据库的,内存的.默认的是基于文件的,在安装目录/data/kahadb.在conf/activemq.xml文件中. <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> 把存储持久化换成mysql的数据库. 1:修改配置文件 <persistenceAdapte