学习ActiveMQ(六):JMS消息的确认与重发机制

  当我们发送消息的时候,会出现发送失败的情况,此时我们需要用到activemq为我们提供了消息重发机制,进行消息的重新发送。那么我们怎么知道消息有没有发送失败呢?activemq还有消息确认机制,消费者在接收到消息的时候可以进行确认。本节将确认机制和重发机制一起在原有的代码中学习。

消息确认机制有四种:定义于在session对象中

AUTO_ACKNOWLEDGE= 1 :自动确认

CLIENT_ACKNOWLEDGE= 2:客户端手动确认

UPS_OK_ACKNOWLEDGE= 3: 自动批量确认

SESSION_TRANSACTED= 0:事务提交并确认

但是在activemq补充了一个自定义的ACK模式:

INDIVIDUAL_ACKNOWLEDGE= 4:单条消息确认

首先在配置文件中定义重发机制ReDelivery:设置重发两次

 <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
        <property name="useExponentialBackOff" value="true"/>
        <!--重发次数,默认为6次-->
        <property name="maximumRedeliveries" value="2"/>
        <!--重发时间间隔,默认为1秒 -->
        <property name="initialRedeliveryDelay" value="1000"/>
        <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
        <property name="backOffMultiplier" value="2"/>
        <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,
        第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
        <property name="maximumRedeliveryDelay" value="1000"/>
    </bean>

在工厂中引用重发机制:

 <!--PooledConnectionFactory对session和消息producer的缓存机制而带来的性能提升-->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="connectionFactory">
            <!--连接mq的连接工厂-->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://127.0.0.1:61616</value>
                </property>
                <!-- 引用重发机制 -->
                <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

在监听容器中新增配置消息的确认机制:

 <!--配置 消息监听容器-->
    <bean id="jmsContainer" class=" org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="consumerMessageListener"/>
        <!--应答模式是 INDIVIDUAL_ACKNOWLEDGE-->
        <!--AUTO_ACKNOWLEDGE = 1    自动确认
        CLIENT_ACKNOWLEDGE = 2    客户端手动确认
        DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
        SESSION_TRANSACTED = 0    事务提交并确认
        INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认-->
        <property name="sessionAcknowledgeMode" value="4"/>
    </bean>

发送者无需改动,正常发送即可,我们在消费者中进行改动,在消息接收到的时候调用确认方法(上面配置了4单挑消息确认),然后再接收消息做点手脚让其报错不能确认使其触发重发机制:

public class ConsumerMessageListener implements SessionAwareMessageListener<Message> {//消息确认需要session,需要实现SessionAwareMessageListener

    @Override
    public void onMessage(Message message, Session session)  throws JMSException {
        if (message instanceof TextMessage){
            String msg = ((TextMessage) message).getText();

            System.out.println("------------------------------------------");
            System.out.println("消费者收到的消息:" + msg);
            System.out.println("------------------------------------------");

            try {
                if ("test2".equals(msg)) {
                    throw new RuntimeException("故意抛出的异常");
                }
                // 确认消息。只要被确认后  就会出队,接受失败没有确认成功,会在原队列里面
                message.acknowledge();
            } catch (Exception e) {
                // 此不可省略 重发信息使用,如果不写此方法,将不会实现重发操作。失败的消息将会一直在队列中,因为没有进行消息确认。
                // 下次还会监听到这条消息。效果将会是:第一次接受一个消息2。第二次接受2个,依次累加
                session.recover();
            }
        }
}

发送4条消息:

  //发送字符串
        for (int i = 0; i < 4; i++) {

            service.sendMessage("test" + i);
        }

结果

由结果可见,test2由于抛出异常,未能进行消息确认,所有重发了两次,共三次。

代码地址:https://github.com/MrLiu1227/ActiveMQ

原文地址:https://www.cnblogs.com/liuyuan1227/p/10776189.html

时间: 2024-10-09 11:26:59

学习ActiveMQ(六):JMS消息的确认与重发机制的相关文章

ActiveMQ学习笔记(六)——JMS消息类型

1.前言 ActiveMQ学习笔记(四)--通过ActiveMQ收发消息http://my.oschina.net/xiaoxishan/blog/380446 和ActiveMQ学习笔记(五)--使用Spring JMS收发消息http://my.oschina.net/xiaoxishan/blog/381209   中,发送和接受的消息类型都是TextMessage,即文本消息(如下面的代码所示).显然消息类型只有文本类型是不能满足要求的. //发送文本消息  session.create

AMQP 与ActiveMQ,JMS消息队列之间的比较

http://blog.csdn.net/kimmking/article/details/8253549 http://www.csdn123.com/html/mycsdn20140110/8f/8f42bb0680685c547107a0079e557686.html http://blog.sina.com.cn/s/blog_999d1f4c01010dpx.html

Oozie 使用ActiveMQ实现 JMS通知

一,介绍 提交给Oozie的作业,作业在运行过程中的状态会发生变化如:执行成功了,或者失败了……Oozie能够监控这些作业状态的改变并且将这些消息发送到JMS消息服务器.这里,使用ActiveMQ作为JMS消息服务器. Oozie supports publishing notifications to a JMS Provider for job status changes and SLA met and miss events. For Oozie to send/receive mess

ActiveMQ的学习(三)(ActiveMQ的消息事务和消息的确认机制)

ActiveMQ的消息事务 消息事务,是保证消息传递原子性的一个重要特性,和JDBC的事务特征类似. 一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器.生产者,消费者与消息服务器都支持事务性.ActiveMQ得事务主要偏向在生产者得应用. ActiveMQ消息事务流程图: 原生jms事务发送(生产者的事务发送) 不加事务得情况:(程序没有错误,10条消息会到达mq中) 不加事务得情况:(程序有错误,结果是发送成功3条,其余不成功---因为没有加事务) 加事务得情况:(程

ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制

一.分发策略(Dispatch Policies) 1.1 严格顺序分发策略(Strict Order Dispatch Policy) 通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以 相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息. Strict order dispatch policy会保证每

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

JMS消息队列ActiveMQ(点对点模式)

生产者(producer)->消息队列(message queue) package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session

JMS学习(三)JMS 消息结构之属性及消息体详解

一.前言 通过上一篇的学习我们知道了消息分为三个部分,即消息头,属性及消息体,并对消息头的十个属性进行了详细的介绍,本文再对消息属性及消息体进行详细的介绍. 二.属性介绍 消息属性的主要作用是可以对头信息进行一个额外的补充,毕竟消息头信息一是有限,二是很多不能由应用程序设定.通常,消息属性可以用在消息选择器的表达式里,结合起来实现对消息的过滤. 消息属性的值只能是基本的类型,或者这些基本类型对应的包装类型.也就是说,不能将一个自定义的对象作为属性值.通常情况下,如果能够放在body里的内容,就不

JMS消息队列ActiveMQ(发布/订阅模式)

消费者1(Consumer)--订阅(subcribe)-->主题(Topic) package com.java1234.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.