JMS消息的可靠性机制

ActiveMQ消息签收机制:

  客户端成功接收一条消息的标志是一条消息被签收,成功应答。

消息的签收请求分为两种:

  1.带事务的session

    如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。

  2.不带事务的session

    不带事务的session的签收方式,取决于session的配置

    ActiveMQ支持以下三种模式:

      Seesion.AUTO_ACKNOWLEDGE:消息自动签收;

      Session.CLIENT_ACKNOWLEDGE:客户端调用acknowledge方法手动签收

        textMessage.acknowledge();//手动签收

      Session.DUPS_OK_ACKNOWLEDGE:不是必须签收,消息可能会重复发送。在第二次重新传送消息的时候,消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户端接收消息客户处理消息和消息被确认。在事务性会话中,当一个事务提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式。

带事务session的案例

  生产者

    必须在生产完数据之后手动提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客户端到JMS Provider 的连接
        Connection connection = connectionFactory.createConnection();
        //启动连接
        connection.start();
        // Session: 一个发送或接收消息的线程    false:代表不带事务的session   AUTO_ACKNOWLEDGE:代表自动签收
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息发送给谁.
        // 获取session注意参数值my-queue是Query的名字
        Queue queue = session.createQueue("my-queue");
        // MessageProducer:创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 设置不持久化  PERSISTENT:代表持久化  NON_PERSISTENT:代表不持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 发送消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        System.out.println("发送成功!");
        session.commit();
        session.close();
        connection.close();
    }
    /**
     * 在指定的会话上,通过指定的消息生产者发出一条消息
     *
     * @param session
     *            消息会话
     * @param producer
     *            消息生产者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 创建一条文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通过消息生产者发出消息
        producer.send(message);
    }
}

  消费者

    消费完数据之后必须手动提交session

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsReceiver {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客户端到JMS Provider 的连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一个发送或接收消息的线程  true:表单开启事务  AUTO_ACKNOWLEDGE:代表自动签收
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        // Destination :消息的目的地;消息发送给谁.
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
        Queue queue = session.createQueue("my-queue");
        // 消费者,消息接收者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            //receive():获取消息
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println("收到消息:" + message.getText());
                session.commit();
            } else {
                break;
            }
        }
        //回收资源
        session.close();
        connection.close();
    }
}

  测试

    1.测试在消费数据的时候不commit提交session

      1.1 启动生产者

        

        查看队列中的情况

        

      1.2 启动消费者

        这里不手动提交session

        

        控制台中可以正确接收到数据,但是队列中的数据就不是正确的

        

    2.正常提交(生产者和消费者都手动提交session)

        

不带事务session的案例

  1.自动签收

    

  2.手动签收

    生产者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Producter {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客户端到JMS Provider 的连接
        Connection connection = connectionFactory.createConnection();
        //启动连接
        connection.start();
        // Session: 一个发送或接收消息的线程    false:代表不带事务的session   AUTO_ACKNOWLEDGE:代表自动签收
       /* Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
        // Destination :消息的目的地;消息发送给谁.
        // 获取session注意参数值my-queue是Query的名字
        Queue queue = session.createQueue("my-queue");
        // MessageProducer:创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 设置不持久化  PERSISTENT:代表持久化  NON_PERSISTENT:代表不持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 发送消息
        for (int i = 1; i <= 5; i++) {
            sendMsg(session, producer, i);
        }
        System.out.println("发送成功!");
        session.close();
        connection.close();
    }
    /**
     * 在指定的会话上,通过指定的消息生产者发出一条消息
     *
     * @param session
     *            消息会话
     * @param producer
     *            消息生产者
     */
    public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        // 创建一条文本消息
        TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
        // 通过消息生产者发出消息
        producer.send(message);
     message.acknowledge();  //手动提交
  } }

    消费者

package com.wn.ddd;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.plugin2.os.windows.SECURITY_ATTRIBUTES;

import javax.jms.*;

public class JmsReceiver {
    public static void main(String[] args) throws JMSException {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        // JMS 客户端到JMS Provider 的连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        // Session: 一个发送或接收消息的线程  true:表单开启事务  AUTO_ACKNOWLEDGE:代表自动签收
        /*Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/
        Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
        // Destination :消息的目的地;消息发送给谁.
        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
        Queue queue = session.createQueue("my-queue");
        // 消费者,消息接收者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true) {
            //receive():获取消息
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println("收到消息:" + message.getText());
                message.acknowledge();  //手动提交
            } else {
                break;
            }
        }
        //回收资源
        session.close();
        connection.close();
    }
}

    测试

      启动生产者

        

      启动消费者

        

        如果没有手动签收,则会出现和没有commit提交session的情况一样,都是已经消费完的消息,没有情况,造成多次消费。

原文地址:https://www.cnblogs.com/wnwn/p/12307608.html

时间: 2024-10-07 16:55:39

JMS消息的可靠性机制的相关文章

JMS消息传输机制

JMS消息传送模型: 消息传送机制, 是基于拉取(pull)或者轮询(polling)的方式.  JMS具备两种"消息传送模型": P2P和Pub/sub. (1) P2P:点对点消息传送模型, 允许JMS客户端通过队列(queue)这个虚拟通道来同步或异步发送消息; 消息的生产者为Sender, 消费者为receiver.   receiver主动到队列中请求消息,而不是JMS提供者将消息推送到客户端;   主要原因是一个队列通道可能有多个receiver,每个receiver可能对

ActiveMQ(03):JMS的可靠性机制

一.消息接收确认 JMS消息只有在被确认之后,才认为已经被成功地消费了.消息的成功消费通常包含三个阶段:客户接收消息.客户处理消息和消息被确认. 事务相关 1.在事务性会话中,当一个事务被提交的时候,确认自动发生.     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);     ....     session.commit(); 2.在非事务性会话中,消息何时被确

activeMq-JMS消息可靠性机制-4

消息接收确认 JMS消息只有在被确认之后,才认为已经被成功地消费了. 消息的成功消费通常包含三个阶段:客户接收消息.客户处理消息和消息被确认. //参数1:是否启用事务(false表示不开启事务) 参数2:接收模式(一般设置为自动接收) Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 在事务性会话中,当一个事务被提交的时候(session.commit() ),确认自动发生.

学习ActiveMQ(六):JMS消息的确认与重发机制

当我们发送消息的时候,会出现发送失败的情况,此时我们需要用到activemq为我们提供了消息重发机制,进行消息的重新发送.那么我们怎么知道消息有没有发送失败呢?activemq还有消息确认机制,消费者在接收到消息的时候可以进行确认.本节将确认机制和重发机制一起在原有的代码中学习. 消息确认机制有四种:定义于在session对象中 AUTO_ACKNOWLEDGE= 1 :自动确认 CLIENT_ACKNOWLEDGE= 2:客户端手动确认 UPS_OK_ACKNOWLEDGE= 3: 自动批量确

Storm消息可靠处理机制

在很多应用场景中,分布式系统的可靠性保障尤其重要.比如电商平台中,客户的购买请求需要可靠处理,不能因为节点故障等原因丢失请求:比如告警系统中,产生的核心告警必须及时完整的知会监控人员,不能因为网络故障而丢失数据. Storm消息可靠性保障是Storm核心特性之一,其中消息树的跟踪管理机制是Storm核心算法之一,本文将详细介绍Storm消息可靠处理机制.我们从Storm初探中的例子入手. 一.消息处理流程 1. Spout节点 (1) Spout接收到一个文本消息: msg1 刘备 关羽 张飞

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

Oozie 生成JMS消息并向 JMS Provider发送消息过程分析

一,涉及到的工程 从官网下载源码,mvn 编译成 Eclipse工程文件: 对于JMS消息这一块,主要涉及到两个工程: oozie-core工程有问题的原因是还需要一些其他的依赖工程未导入: 二,Oozie 生成 JMS消息 主要涉及到的一些类 oozie-core 工程中的: oozie-client工程中的: 三,相关代码: 对于Oozie Server而言,它是消息的生产者.在oozie-default.xml/oozie-site.xml里面配置好连接参数,消息服务器....Oozie就

Storm 官方文档翻译 --- 消息的可靠性保障

消息的可靠性保障 Storm 能够保证每一个由 Spout 发送的消息都能够得到完整地处理.本文详细解释了 Storm 如何实现这种保障机制,以及作为用户如何使用好 Storm 的可靠性机制. 消息的“完整性处理”是什么意思 一个从 spout 中发送出的 tuple 会产生上千个基于它创建的 tuples.例如,有这样一个 word-count 拓扑: TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sent

JMS消息服务模型

JMS--仅仅是一种规范,一种接口规约,一种编程模型.类似的JPA,JSR等 场景: 1.多个系统之间交互,实现可以采取RPC,但是交互复杂,基本就是点对点的方式 2.其实交互就是消息,而JMS就是消息规范,支持事务机制(保证安全)--不就是类似于RDBMS吗,存储消息,转存发送 3.大家想想队列的机制(集合存储----队列存储---消息存储---消息服务器---数据库服务器----分布式存储------分布式文件系统) 解决办法: 多个系统采用消息交互,形成CS模型交互(集中式结构),当然还有