消息接收确认
JMS消息只有在被确认之后,才认为已经被成功地消费了。
消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。
//参数1:是否启用事务(false表示不开启事务) 参数2:接收模式(一般设置为自动接收) Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
在事务性会话中,当一个事务被提交的时候(session.commit() ),确认自动发生。
在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。
该参数有以下四个可选值:
Session.AUTO_ACKNOWLEDGE:
当客户成功的从receive方法返回的时候,或者从
MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE:
客户通过调用消息的acknowledge方法确认消
息。需要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息
将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10 个消
息,然后确认第5 个消息,那么所有10 个消息都被确认。
开发者需要需要关注几个方法:
1) message.acknowledge(),
2) ActiveMQMessageConsumer.acknowledege(),
3) ActiveMQSession.acknowledge();
其1)和3)是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,
2)只会对当前consumer中那些尚未确认的消息进行确认。
通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,
我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,
只有当消息被正确处理之后才能确认时,也可以使用此模式 。
Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue("queue"); MessageConsumer consumer = session.createConsumer(queue); int i = 0; while(i < 3){ TextMessage msg = (TextMessage)consumer.receive(); if(i == 2){ // 如果不确认签收,消息一直存在,当再次启动客户端会再次接收到消息 msg.acknowledge();// 确认签收 } i++; }
Session.SESSION_TRANSACTED:
用session.commit()进行签收 ,要么全部正常确认,要么全部redelivery。
这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合。
Session.DUPS_ACKNOWLEDGE:
该选择只是会话迟钝的确认消息的提交。如果JMS
provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS
provider 必须把消息头的JMSRedelivered字段设置为true
消息持久性
JMS 支持以下两种消息提交模式:
PERSISTENT:只是JMS provider持久保存消息,以保证消息不会因为JMS provider的失败而丢失
NON_PERSISTENT:不要求JMS provider持久保存消息
// producer.setDeliveryMode(DeliveryMode.PERSISTENT);将消息传递特性置为持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 非持久化
消息优先级
可以使用消息优先级来指示JMS provider首先提交紧急的消息。优先级分
10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要
注意的是,JMS provider并不一定保证按照优先级的顺序提交消息
producer.setPriority(int i)
消息过期
可以设置消息在一定时间后过期,默认是永不过期
producer.setTimeToLive(Long aliveTime);
注意timeToLive属性只会在DisableMessageTimestamp=false(禁用消息时间戳)的情况下才有意义。
消息的临时目的地
可以通过会话上的createTemporaryQueue 方法和createTemporaryTopic
方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。
只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息
持久订阅
首先消息生产者必须使用PERSISTENT提交消息。客户可以通过会话上的
createDurableSubscriber方法来创建一个持久订阅,该方法的第一个参数必须
是一个topic。第二个参数是订阅的名称。
JMS provider会存储发布到持久订阅对应的topic上的消息。如果最初创建
持久订阅的客户或者任何其它客户,使用相同的连接工厂和连接的客户ID,相同
的主题和相同的订阅名,再次调用会话上的createDurableSubscriber方法,那
么该持久订阅就会被激活。JMS provider会向客户发送客户处于非激活状态时所
发布的消息。
持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一
直保留,直到应用程序调用会话上的unsubscribe方法。
本地事务
在一个JMS客户端,可以使用本地事务来组合消息的发送和接收。JMS
Session接口提供了commit和rollback方法。事务提交意味着生产的所有消息被
发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的
所有消息被恢复并重新提交,除非它们已经过期。
事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调
用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。
需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同
一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发
送操作才会真正执行。
需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。
JMS的PTP模型
JMS PTP(Point-to-Point)模型定义了客户端如何向队列发送消息,从队列接收
消息,以及浏览队列中的消息。
PTP模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队
列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包
含各种消息,JMS Provider 提供工具管理队列的创建、删除。
PTP的一些特点:
1:如果在Session 关闭时,有一些消息已经被收到,但还没有被签收
(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再
次接收
2:如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息会留在
队列中,不会被接收到
3:队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢
失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势
JMS的Pub/Sub模型
JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者
(subscribe) 从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要
接触即可保证消息的传送。
Pub/Sub的一些特点:
1:消息订阅分为非持久订阅和持久订阅
非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持连接状态才能
收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会
丢失,永远不会收到。
持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线
时,JMS Provider会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS
Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。
2:如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息不会被接收
3:非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重
新派送一个未签收的消息。
4:当所有的消息必须被接收,则用持久订阅。当丢失消息能够被容忍,则用非持久订阅
非持久的Topic消息示例
/*对于非持久的Topic消息的发送 基本跟前面发送队列信息是一样的,只是把创建Destination的地方,由创 建队列替换成创建Topic*/ Destination destination = session.createTopic("MyTopic"); /*对于非持久的Topic消息的接收 1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息 2:同样把创建Destination的地方,由创建队列替换成创建Topic*/ Destination destination = session.createTopic("MyTopic"); /*3:由于不知道客户端发送多少信息,因此改成while循环的方式了*/ Message message = consumer.receive(); while(message!=null) { TextMessage txtMsg = (TextMessage)message; System.out.println("收到消 息:" + txtMsg.getText()); message = consumer.receive(1000L); }
持久的Topic消息示例
消息的发送
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.106:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("MyTopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for(int i=0; i<2; i++) { TextMessage message = session.createTextMessage("messagedd--"+i); Thread.sleep(1000); //通过消息生产者发出消息 producer.send(message); } session.commit(); session.close(); connection.close(); 1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定 2:一定要设置完成后,再start 这个 connection
消息的接收
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://192.168.1.106:61616"); Connection connection = cf.createConnection(); connection.setClientID("cc1"); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("MyTopic"); TopicSubscriber ts = session.createDurableSubscriber(destination, "T1"); connection.start(); Message message = ts.receive(); while(message!=null) { TextMessage txtMsg = (TextMessage)message; session.commit(); System.out.println("收到消 息:" + txtMsg.getText()); message = ts.receive(1000L); } session.close(); connection.close(); 1:需要在连接上设置消费者id,用来识别消费者 2:需要创建TopicSubscriber来订阅 3:要设置好了过后再start 这个 connection 4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候, 无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
参考:
Producer特性详解:http://shift-alt-ctrl.iteye.com/blog/2034440
原文地址:https://www.cnblogs.com/for-what/p/9689153.html