ActiveMQ 重发机制与确认机制 实践

一、配置spring-activemq.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
 5
 6     <!-- 第三方MQ工厂: ConnectionFactory -->
 7     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
 8         <!-- ActiveMQ Address -->
 9         <property name="brokerURL" value="${activemq.brokerURL}"/>
10         <property name="userName" value="${activemq.userName}"/>
11         <property name="password" value="${activemq.password}"/>
12         <!-- 是否异步发送 -->
13         <property name="useAsyncSend" value="true"/>
14         <!-- 引用重发机制 -->
15         <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
16         <!-- 消息传输监听器 处理网络及服务器异常 -->
17         <!--<property name="transportListener">-->
18         <!--<bean class="com.schooling.activemq.ActiveMQTransportListener"/>-->
19         <!--</property>-->
20     </bean>
21
22     <!--
23         ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
24         可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗,要依赖于 activemq-pool包
25      -->
26     <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
27         <property name="connectionFactory" ref="targetConnectionFactory"/>
28         <property name="maxConnections" value="${activemq.pool.maxConnections}"/>
29     </bean>
30
31     <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 -->
32     <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
33         <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
34         <property name="useExponentialBackOff" value="true"/>
35         <!--重发次数,默认为6次   这里设置为1次 -->
36         <property name="maximumRedeliveries" value="2"/>
37         <!--重发时间间隔,默认为1秒 -->
38         <property name="initialRedeliveryDelay" value="1000"/>
39         <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
40         <property name="backOffMultiplier" value="2"/>
41         <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,
42         第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
43         <property name="maximumRedeliveryDelay" value="1000"/>
44     </bean>
45
46     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
47     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
48         <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
49         <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
50     </bean>
51
52     <!--这个是目的地-->
53     <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
54         <constructor-arg value="${activemq.queueName}"/>
55     </bean>
56
57     <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
58     <!-- 队列模板 -->
59     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
60         <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
61         <property name="connectionFactory" ref="connectionFactory"/>
62         <property name="defaultDestinationName" value="${activemq.queueName}"/>
63     </bean>
64
65     <!-- 配置自定义监听:MessageListener -->
66     <bean id="msgQueueMessageListener" class="com.schooling.activemq.consumer.MsgQueueMessageListener"/>
67
68     <!-- 将连接工厂、目标对了、自定义监听注入jms模板 -->
69     <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
70         <property name="connectionFactory" ref="connectionFactory"/>
71         <property name="destination" ref="msgQueue"/>
72         <property name="messageListener" ref="msgQueueMessageListener"/>
73         <!--应答模式是 INDIVIDUAL_ACKNOWLEDGE-->
74         <property name="sessionAcknowledgeMode" value="4"/>
75     </bean>
76
77 </beans>

二、生产者

 1 @Service("activeMQProducer")
 2 public class ActiveMQProducer {
 3
 4     private JmsTemplate jmsTemplate;
 5
 6     public JmsTemplate getJmsTemplate() {
 7         return jmsTemplate;
 8     }
 9
10     @Autowired
11     public void setJmsTemplate(JmsTemplate jmsTemplate) {
12         this.jmsTemplate = jmsTemplate;
13     }
14
15     public void sendMessage(final String info) {
16         jmsTemplate.send(new MessageCreator() {
17             public Message createMessage(Session session) throws JMSException {
18                 return session.createTextMessage(info);
19             }
20         });
21     }
22 }

三、消费者(监听模式)

 1 public class MsgQueueMessageListener implements SessionAwareMessageListener<Message> {
 2
 3     @Override
 4     public void onMessage(Message message, Session session) throws JMSException {
 5
 6         if (message instanceof TextMessage) {
 7
 8             String msg = ((TextMessage) message).getText();
 9
10             System.out.println("============================================================");
11             System.out.println("消费者收到的消息:" + msg);
12             System.out.println("============================================================");
13
14             try {
15                 if ("我是队列消息002".equals(msg)) {
16                     throw new RuntimeException("故意抛出的异常");
17                 }
18                 // 只要被确认后  就会出队,接受失败没有确认成功,会在原队列里面
19                 message.acknowledge();
20             } catch (Exception e) {
21                 // 此不可省略 重发信息使用
22                 session.recover();
23             }
24         }
25     }
26 }

四、测试方法

1 @Test
2 public void send() {
3     for (int i = 1; i < 5; i++) {
4         this.activeMQProducer.sendMessage("我是队列消息00" + i);
5     }
6     while (true) {}
7 }

五、测试结果

六、测试小结

“我是队列消息002”由于异常,未接收成功。在重发2次都失败的情况下被发送到“死信队列”。其他4条信息都接收成功。

原文地址:https://www.cnblogs.com/sjshare/p/8962340.html

时间: 2024-11-04 06:45:03

ActiveMQ 重发机制与确认机制 实践的相关文章

RabbitMQ发送端事务管理 —— 事务机制 和 确认机制

一.AMQP提供 事务机制,比较消耗性能 try { channel.txSelect(); channel.basicPublish(EXCHANGE_NAME, "queue22", true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); channel.addReturnListener(new ReturnListener() { public void handleReturn(int arg0, Str

RabbitMQ消息发布和消费的确认机制

前言 新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门.趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端.园子里解释RabbitMQ基础的很多了,这里就不对RabbitMQ的基础再做叙述了,来点实际工作中一定会碰到的问题和解决的方案. RabbitMQ 消息发布确认机制 默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的.BasicPublish方法的

activemq的消息确认机制ACK

一.简介 消息消费者有没有接收到消息,需要有一种机制让消息提供者知道,这个机制就是消息确认机制. ACK(Acknowledgement)即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符.表示发来的数据已确认接收无误. 二.ACK_MODE有几类 我们在开发JMS应用程序的时候,会经常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,当然开发者也可以使用它. ACK_MODE描述了Consumer与broker确认

ActiveMQ的学习(三)(ActiveMQ的消息事务和消息的确认机制)

ActiveMQ的消息事务 消息事务,是保证消息传递原子性的一个重要特性,和JDBC的事务特征类似. 一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器.生产者,消费者与消息服务器都支持事务性.ActiveMQ得事务主要偏向在生产者得应用. ActiveMQ消息事务流程图: 原生jms事务发送(生产者的事务发送) 不加事务得情况:(程序没有错误,10条消息会到达mq中) 不加事务得情况:(程序有错误,结果是发送成功3条,其余不成功---因为没有加事务) 加事务得情况:(程

activemq 消息阻塞优化和消息确认机制优化

一.消息阻塞优化 1.activemq消费者在从待消费队列中获取消息是会先进行预读取,默认是1000条(prefetch=1000).这样很容易造成消息积压. 2.可以通过设置prefetch的默认值来调整预读取条数,java代码如下 //设置预读取为1ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();p.setQueuePrefetch(1);//创建一个链接工厂connectionFactory = new ActiveMQCon

ActiveMQ讯息传送机制以及ACK机制

http://blog.csdn.net/lulongzhou_llz/article/details/42270113 ActiveMQ消息传送机制以及ACK机制详解 AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的. 一. ActiveMQ消息传送机制 Producer客户端使用来发送消息的, Consumer客户端用来消费消息:它们的协同中心就是ActiveMQ br

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

RabbitMQ (十二) 消息确认机制 - 发布者确认

消费者确认解决的问题是确认消息是否被消费者"成功消费". 它有个前提条件,那就是生产者发布的消息已经"成功"发送出去了. 因此还需要一个机制来告诉生产者,你发送的消息真的"成功"发送了. 在标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务:在通道上开启事务,发布消息,提交事务.但是事务是非常重量级的,它使得RabbitMQ的吞吐量降低250倍.为了解决这个问题,RabbitMQ 引入了 发布者确认(Publisher Confir

8、RabbitMQ-消息的确认机制(生产者)

RabbitMQ 之消息确认机制(事务+Confirm) https://blog.csdn.net/u013256816/article/details/55515234 概述: 在 Rabbitmq 中我们可以通过持久化来解决因为服务器异常而导致丢失的问题 除此之外我们还会遇到一个问题:生产者将消息发送出去之后,消息到底有没有正 确到达 Rabbit 服务器呢?如果不错得数处理,我们是不知道的,(即 Rabbit 服务器 不会反馈任何消息给生产者),也就是默认的情况下是不知道消息有没有正确到