一、简介
ActiveMQ在接收消息的Client有以下几种操作的时候,需要重新传递消息:
1:Client用了transactions,且在session中调用了rollback()
2:Client用了transactions,且在调用commit()之前关闭
3:Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()
二、定制想要的再次传送策略
可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的再次传送策略,可用的Redelivery属性如下:
1:collisionAvoidanceFactor:设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。
也就是在延迟时间上再加一个时间波动范围。默认值为0.15
2:maximumRedeliveries:最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重
传。默认值为6。
3:maximumRedeliveryDelay:最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间
隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重
连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1。
4:initialRedeliveryDelay:初始重发延迟时间,默认1000L
5:redeliveryDelay:重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L
6:useCollisionAvoidance:启用防止冲突功能,默认false
7:useExponentialBackOff:启用指数倍数递增的方式增加延迟时间,默认false
8:backOffMultiplier:重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生
在接受的Client可以如下设置:
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)?randomize=false"); RedeliveryPolicy policy = new RedeliveryPolicy(); policy.setMaximumRedeliveries(3); cf.setRedeliveryPolicy(policy)
三、死队列
3.1 简介
当消息试图被传递的次数超过配置中maximumRedeliveries属性的值时,那么,broker会认定该消息是一个死消息,并被把该消息发送到
死队列中。 默认,aciaveMQ中死队列被声明为“ActivemMQ.DLQ”,所有不能消费的消息都被传递到该死队列中。 你可以在
acivemq.xml中配置individualDeadLetterStrategy属性,示例如下:
<policyEntry queue= "> " > <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix= "DLQ." useQueueForQueueMessages= "true" /> </deadLetterStrategy> </policyEntry>
3.2 自动删除过期消息
有时需要直接删除过期的消息而不需要发送到死队列中,可以使用属性processExpired=false来设置,示例如下:
<policyEntry queue= "> " > <deadLetterStrategy> <sharedDeadLetterStrategy processExpired= "false" /> </deadLetterStrategy> </policyEntry>
3.3 存放非持久消息到死队列中
默认情况下,Activemq不会把非持久的死消息发送到死队列中。非持久性如果你想把非持久的消息发送到死队列中,
需要设置属性processNonPersistent=“true”,示例如下:
<policyEntry queue= "> " > <deadLetterStrategy> <sharedDeadLetterStrategy processNonPersistent= "true" /> </deadLetterStrategy> </policyEntry>
四、为每一个Destination配置一个Redelivery Policy
在V5.7之后,你可以为每一个Destination配置一个Redelivery Policy。示例如:
ActiveMQConnection connection ... // Create a connection RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); queuePolicy.setInitialRedeliveryDelay(0); queuePolicy.setRedeliveryDelay(1000); queuePolicy.setUseExponentialBackOff(false); queuePolicy.setMaximumRedeliveries(2); RedeliveryPolicy topicPolicy = new RedeliveryPolicy(); topicPolicy.setInitialRedeliveryDelay(0); topicPolicy.setRedeliveryDelay(1000); topicPolicy.setUseExponentialBackOff(false); topicPolicy.setMaximumRedeliveries(3); // Receive a message with the JMS API RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap(); map.put(new ActiveMQTopic(">"), topicPolicy); map.put(new ActiveMQQueue(">"), queuePolicy);
五、定制想要的再次传送策略(spring.xml)
<!--创建连接工厂 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="${activemq.brokerURL}" userName="${activemq.userName}" password="${activemq.password}"> <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> <!-- 引用重发机制 --> </amq:connectionFactory> <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 http://www.kuqin.com/shuoit/20140419/339344.html --> <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <!--是否在每次尝试重新发送失败后,增长这个等待时间 --> <property name="useExponentialBackOff" value="true"></property> <!--重发次数,默认为6次 这里设置为5次 --> <property name="maximumRedeliveries" value="5"></property> <!--重发时间间隔,默认为1秒 --> <property name="initialRedeliveryDelay" value="2000"></property> <!--第一次失败后重新发送之前等待2000毫秒,第二次失败再等待2000 * 1毫秒,这里的1就是value --> <property name="backOffMultiplier" value="2"></property> <!-- 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2, 那么第 二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时, 以后每次重连时间间隔都为最大重连时间间隔。 --> <!-- <property name="maximumRecdeliveryDelay" value="10000"></property> --> </bean> <!-- 定义Queue监听器 --> <jms:listener-container transaction-manager="jmsTransactionManager" destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.queue" ref="queueReceiver1"/> </jms:listener-container> <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="amqConnectionFactory" /> </bean>