activeMq-JMS消息可靠性机制-4

消息接收确认

  JMS消息只有在被确认之后,才认为已经被成功地消费了。
  消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。

  //参数1:是否启用事务(false表示不开启事务)   参数2:接收模式(一般设置为自动接收)
  Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

  在事务性会话中,当一个事务被提交的时候(session.commit() ),确认自动发生。

  在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。
    该参数有以下四个可选值:
      Session.AUTO_ACKNOWLEDGE:
            当客户成功的从receive方法返回的时候,或者从
            MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。

      Session.CLIENT_ACKNOWLEDGE:
            客户通过调用消息的acknowledge方法确认消
            息。需要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息
            将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10 个消
            息,然后确认第5 个消息,那么所有10 个消息都被确认。            

          开发者需要需要关注几个方法:
            1) message.acknowledge(),
            2) ActiveMQMessageConsumer.acknowledege(),
            3) ActiveMQSession.acknowledge();
          其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,
          2)只会对当前consumer中那些尚未确认的消息进行确认。
          通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,
          我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,
          只有当消息被正确处理之后才能确认时,也可以使用此模式 。         

          Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);          Queue queue = session.createQueue("queue");          MessageConsumer consumer = session.createConsumer(queue);          int i = 0;          while(i < 3){              TextMessage msg = (TextMessage)consumer.receive();              if(i == 2){               // 如果不确认签收,消息一直存在,当再次启动客户端会再次接收到消息                  msg.acknowledge();// 确认签收              }              i++;          }

      Session.SESSION_TRANSACTED:
            用session.commit()进行签收 ,要么全部正常确认,要么全部redelivery。
            这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合。

      Session.DUPS_ACKNOWLEDGE:
            该选择只是会话迟钝的确认消息的提交。如果JMS
            provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS
            provider 必须把消息头的JMSRedelivered字段设置为true

消息持久性     

JMS 支持以下两种消息提交模式:
  
PERSISTENT:只是JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失
  NON_PERSISTENT:不要求JMS provider持久保存消息

  // producer.setDeliveryMode(DeliveryMode.PERSISTENT);将消息传递特性置为持久化  producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 非持久化

 消息优先级

  可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分
  10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要
  注意的是,JMS provider并不一定保证按照优先级的顺序提交消息

  producer.setPriority(int i)

消息过期

  可以设置消息在一定时间后过期,默认是永不过期

  producer.setTimeToLive(Long aliveTime);

  注意timeToLive属性只会在DisableMessageTimestamp=false(禁用消息时间戳)的情况下才有意义。

消息的临时目的地

  可以通过会话上的createTemporaryQueue 方法和createTemporaryTopic
  方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。
  只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息

持久订阅

    首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的
  createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须
  是一个topic。第二个参数是订阅的名称。
    JMS provider会存储发布到持久订阅对应的topic上的消息。如果最初创建
  持久订阅的客户或者任何其它客户,使用相同的连接工厂和连接的客户ID,相同
  的主题和相同的订阅名,再次调用会话上的createDurableSubscriber方法,那
  么该持久订阅就会被激活。JMS provider会向客户发送客户处于非激活状态时所
  发布的消息。
    持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一
  直保留,直到应用程序调用会话上的unsubscribe方法。

本地事务

    在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS
  Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被
  发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的
  所有消息被恢复并重新提交,除非它们已经过期。
    事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调
  用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。
    需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同
  一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发
  送操作才会真正执行。
    需要注意的还有一个,消息的生产和消费不能包含在同一个事务中

JMS的PTP模型

    JMS PTP(Point-to-Point)模型定义了客户端如何向队列发送消息,从队列接收
  消息,以及浏览队列中的消息。
    PTP模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队
  列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包
  含各种消息,JMS Provider 提供工具管理队列的创建、删除。
  PTP的一些特点
    1:如果在Session 关闭时,有一些消息已经被收到,但还没有被签收
      (acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再
      次接收
    2:如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息会留在
      队列中,不会被接收到
    3:队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢
      失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

JMS的Pub/Sub模型

    JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic
  主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者
  (subscribe) 从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要
  接触即可保证消息的传送。
  Pub/Sub的一些特点:
    1:消息订阅分为非持久订阅和持久订阅
      非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持连接状态才能
        收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会
        丢失,永远不会收到。
      持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线
        时,JMS Provider会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS
        Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。
    2:如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息不会被接收
    3:非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重
      新派送一个未签收的消息。
    4:当所有的消息必须被接收,则用持久订阅。当丢失消息能够被容忍,则用非持久订阅

 非持久的Topic消息示例

/*对于非持久的Topic消息的发送
基本跟前面发送队列信息是一样的,只是把创建Destination的地方,由创
建队列替换成创建Topic*/
Destination destination = session.createTopic("MyTopic");
/*对于非持久的Topic消息的接收
1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息
2:同样把创建Destination的地方,由创建队列替换成创建Topic*/
Destination destination = session.createTopic("MyTopic");
/*3:由于不知道客户端发送多少信息,因此改成while循环的方式了*/
Message message = consumer.receive();
    while(message!=null) {
    TextMessage txtMsg = (TextMessage)message;
    System.out.println("收到消 息:" + txtMsg.getText());
    message = consumer.receive(1000L);
}

 持久的Topic消息示例

消息的发送

ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://192.168.1.106:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("MyTopic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for(int i=0; i<2; i++) {
    TextMessage message = session.createTextMessage("messagedd--"+i);
    Thread.sleep(1000);
    //通过消息生产者发出消息
    producer.send(message);
}
session.commit();
session.close();
connection.close();
1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定
2:一定要设置完成后,再start 这个 connection

消息的接收

ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.106:61616");
Connection connection = cf.createConnection();
connection.setClientID("cc1");
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Topic destination = session.createTopic("MyTopic");
TopicSubscriber ts = session.createDurableSubscriber(destination, "T1");
connection.start();
Message message = ts.receive();
while(message!=null) {
    TextMessage txtMsg = (TextMessage)message;
    session.commit();
    System.out.println("收到消 息:" + txtMsg.getText());
    message = ts.receive(1000L);
}
session.close();
connection.close();
1:需要在连接上设置消费者id,用来识别消费者
2:需要创建TopicSubscriber来订阅
3:要设置好了过后再start 这个 connection
4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,
无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。

参考:

  Producer特性详解:http://shift-alt-ctrl.iteye.com/blog/2034440

   

原文地址:https://www.cnblogs.com/for-what/p/9689153.html

时间: 2024-10-10 11:21:08

activeMq-JMS消息可靠性机制-4的相关文章

ActiveMQ的消息持久化机制

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制. ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的. 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试. 消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去. 1. J

ActiveMQ(03):JMS的可靠性机制

一.消息接收确认 JMS消息只有在被确认之后,才认为已经被成功地消费了.消息的成功消费通常包含三个阶段:客户接收消息.客户处理消息和消息被确认. 事务相关 1.在事务性会话中,当一个事务被提交的时候,确认自动发生.     final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);     ....     session.commit(); 2.在非事务性会话中,消息何时被确

activemq的消息确认机制ACK

一.简介 消息消费者有没有接收到消息,需要有一种机制让消息提供者知道,这个机制就是消息确认机制. ACK(Acknowledgement)即确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符.表示发来的数据已确认接收无误. 二.ACK_MODE有几类 我们在开发JMS应用程序的时候,会经常使用到上述ACK_MODE,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,当然开发者也可以使用它. ACK_MODE描述了Consumer与broker确认

JMS消息传输机制

JMS消息传送模型: 消息传送机制, 是基于拉取(pull)或者轮询(polling)的方式.  JMS具备两种"消息传送模型": P2P和Pub/sub. (1) P2P:点对点消息传送模型, 允许JMS客户端通过队列(queue)这个虚拟通道来同步或异步发送消息; 消息的生产者为Sender, 消费者为receiver.   receiver主动到队列中请求消息,而不是JMS提供者将消息推送到客户端;   主要原因是一个队列通道可能有多个receiver,每个receiver可能对

消息中间件--ActiveMQ&amp;JMS消息服务

### 消息中间件 ### ---------- **消息中间件** 1. 消息中间件的概述 2. 消息中间件的应用场景(查看大纲文档,了解消息队列的应用场景) * 异步处理 * 应用解耦 * 流量削峰 * 消息通信 ---------- ### JMS消息服务 ### ---------- **JMS的概述** 1. JMS消息服务的概述 2. JMS消息模型 * P2P模式 * Pub/Sub模式 3. 消息消费的方式 * 同步的方式---手动 * 异步的方式---listener监听 4.

学习ActiveMQ(六):JMS消息的确认与重发机制

当我们发送消息的时候,会出现发送失败的情况,此时我们需要用到activemq为我们提供了消息重发机制,进行消息的重新发送.那么我们怎么知道消息有没有发送失败呢?activemq还有消息确认机制,消费者在接收到消息的时候可以进行确认.本节将确认机制和重发机制一起在原有的代码中学习. 消息确认机制有四种:定义于在session对象中 AUTO_ACKNOWLEDGE= 1 :自动确认 CLIENT_ACKNOWLEDGE= 2:客户端手动确认 UPS_OK_ACKNOWLEDGE= 3: 自动批量确

ActiveMQ消息持久化机制

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制. ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的. 就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试. 消息中心启动以后首先要检查制定的存储位置,如果有未发送成功的消息,则需要把消息发送出去. >>

ActiveMQ的学习(三)(ActiveMQ的消息事务和消息的确认机制)

ActiveMQ的消息事务 消息事务,是保证消息传递原子性的一个重要特性,和JDBC的事务特征类似. 一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器.生产者,消费者与消息服务器都支持事务性.ActiveMQ得事务主要偏向在生产者得应用. ActiveMQ消息事务流程图: 原生jms事务发送(生产者的事务发送) 不加事务得情况:(程序没有错误,10条消息会到达mq中) 不加事务得情况:(程序有错误,结果是发送成功3条,其余不成功---因为没有加事务) 加事务得情况:(程

ActiveMQ Topic消息重发

MQ学习系列: 消息队列概念与认知 ActiveMQ Topic消息重发 ActiveMQ Topic 消息重发 准备工作 windows下ActiveMQ的下载与启动 百度的教程:链接 ←这里包含基本的下载安装启动以及简单的配置账号 登录控制台主页:http://localhost:8161/admin/ 启动错误以及解决方案 activeMQ启动错误 BeanFactory not initialized https://blog.csdn.net/huang_sheng0527/artic