JMS消息

1、消息可分为3部分:消息头、属性和有效负载

  • 消息头:用于标识消息、声明消息属性及提供路由信息的特殊字段组成。
  • 消息的属性区包含了和该消息有关的附加元数据,这个元数据由应用程序开发者进行设置,或者由JMS提供者进行设置。
  • 消息的有效负载:该消息包含的应用程序数据类型。


消息头:提供了描述谁创建了消息、何时创建及其数据有效长度等元数据,还包括消息的目的地(主题或队列)、消息应该如何确认等更多内容的路由信息。

消息目的地可以在创建发布者时指定

//在创建发布者时指定Destination(Topic或者Queue)
Queue queue = (Queue)ctx.lookup(queueName);
QueueSender queueSender = session.createSender(queue);
...
Topic topic = (Topic)ctx.lookup(topicName);
TopicPublisher topicPublisher = session.createPublisher(topic);

或者在发送消息时send()操作指定Destination:

QueueSender queueSender = session.createSender(null);
Message message = session.createMessage();

Queue queue = (Queue)jndi.lookup(queueName);
queueSender.send(queue,message);
...
TopicPublisher topicPublisher = session.createPublisher(null);
Message message = session.createMessage();

Topic topic = (Topic)jndi.lookup(topicName);
topicPublisher.publish(topic,message);

JMSDeliveryMode

在JMS中,传送模式有两种类型:持久模式和非持久模式。

  • 持久性模式:一条持久性消息应该传送“一次而且是仅仅一次”,这意味着如果JMS提供者出现故障,该消息并不会丢失;它将在服务器恢复正常后再次传送。
  • 非持久性模式:一条非持久性消息最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失,也可能是从未传送。

传送模式可以使用setDeliveryMode()方法来设置,这种方法在TopicPublisher和QueueSender消息生产者中都有定义。javax.jms.DeliveryMode定义了两个常数。用于声明传送模式:PERSISTENT和NON_PERSISTENT:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//点对点
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);

一旦为消息生产者设置了传送模式,它就会应用于该生产者所传送的所有消息。传送模式可以使用setDeliveryMode()方法随时修改,后面的消息都会使用修改后的新消息。消息生产者的默认传送模式始终是PERSISTENT。

JMSMessageID

JMSMessageID是唯一的标识一条消息的String值。如何实现标识符的唯一性取决于厂商。它既可能只对消息传送服务器的安装来说是唯一的,也可能是所有方面都是唯一的。

JMSMessageID可以用于应用程序中消息需要被唯一索引的历史仓库(Repository)。

JMSMessageID也可以用于消息关联,它通过JMSCorrelationID消息头来实现。

消息提供者从JMS客户端接收消息后,会自动生成JMSMessageID。JMSMessageID必须以ID:开头,不过它的其余部分可以是任意的字符组合,只要对JMS提供者起到唯一标识消息的作用即可。如下是Progress的SonicMQ生成的JMSMessageID的一个例子:

//SonicMQ 生成JMSMessageID
ID:6c867f96:20001:DF59525514

如果JMS应用程序不需要唯一的消息ID,JMS客户端可以提示消息传送服务器:使用setDisableMessageID()方法不需要ID。注意到这个提示的厂商,因为不需要为每条消息生成唯一的ID,所以可以缩短消息的处理时间。如果不生成JMSMessageID,getJMSMessageID()方法会返回null:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDisableMessageID(true);

//点对点
QueueSender queueSender = queueSession.createSender(topic);
queuSender.setDisableMessageID(true);

JMSTimestamp

调用发送操作时,消息生产者会自动设置JMSTimestamp。JMSTimestamp值是调用发送操作时的大致时间。有时候,消息并不会立即传送给消息服务器。消息延迟会有多种原因。包括如何配置消息生产者、是否为事务性会话,以及所用的确认模式等。send()操作返回时,消息对象会设置时间戳:

Message message = topicSession.createMessage();
topicPublisher.publish(message);
long time = message.getJMSTimestamp();

该时间戳是自动设置的,因此调用send()操作时,会忽略并丢弃JMS客户端显示设置的所有制。时间戳是以毫秒为测量单位的时间长度,从1970年1月1日午夜12时以来的UTC时间开始计算。

消息消费者可以使用时间戳,表示消息生产者传送消息的大致时间。在消息排序或者用于历史仓库时,时间戳非常有用。

JMSTimestamp是在发送操作期间设置的,而且它既可以通过客户端上的生产者进行本地计算而得,也可以从消息服务器获得。在第一种情况下,生产者计算时间戳,时间戳可能会因JMS客户端而异。这事因为,时间戳是从JMS客户端的本地系统时钟获得的,它可能与其他JMS客户端机器并不同步。对于使用同一JMS提供者的JMS客户端来说,从消息服务武器获得的时间戳具有更好的一致性,因此所有的时间都从同一俩预案获取:公共消息服务器。调用setDisableMessageTimestamp()方法,可能会禁用时间戳——至少暗示它们不需要时间戳,这种方法,TopicPublisher和QueueSender对象都可以使用:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDisableMessageTimestamp(true);

//点对点
QueueSender queueSender = queueSession.createSender(topic);
queuSender.setDisableMessageTimestamp(true);

如果JMS提供者注意到禁用时间戳的提示,JMSTimestamp会被设置为0,表示没有设置时间戳,禁用时间戳会减轻JMS提供者使用消息服务器生成时间戳的工作量,同事还将消息的大小减小至少8个字节(一个long值的大小),从而减少了网络流量。禁用时间戳是一个可选项,这就是说,无论您需要与否,一些厂商都会设置时间戳。

JMSExpiration

message对象就像盒装牛奶一样,也有有效期。对于只和固定时间相关的消息,有效期就非常有用。消息生产者使用QueueSender或TopicPublisher的setTimeToLive()方法,以毫秒为单位为消息设置有效期,如下所示:

//发布/订阅
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
//将生存时间设置为1小时(1000毫秒*60*60)
topicPublisher.setTimeToLive(3600000);

//点对点
QueueSender queueSender = queueSession.createSender(topic);
//将生存时间设置为2日(1000毫秒*60*60*48)
queuSender.setDisableMessageTimestamp(172800000);

默认情况下,timeToLive设置为零,表示消息永不过期。以零为参数调用setTimeToLive(),可以确保创建的消息没有有效期,消息有效期还可以使用消息生产者的send()或publish()方法来设置

//发布/订阅
//将生存时间设置为1小时(1000毫秒*60*60)
TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.publish(message,DeliveryMode.PERSISTENT,5,3600000);//点对点
QueueSender queueSender = queueSession.createSender(topic);
//将生存时间设置为2日(1000毫秒*60*60*48)
queuSender.send(message,DeliveryMode.NON_PERSISTENT,5,172800000);

JMSExpiration日期自身的计算法方式为:

JMSExpiration = currenttime + timeToLive。

JMSExpiration是消息到期的日期和时间。编写的JMS客户端程序应该丢弃所有未被处理的到期消息,因为该消息的数据和事件已不再有效。消息提供者(服务器)也应该丢弃其队列和主题中所有未被传送的到期消息。即使是持久性消息,如果在传送之前已经到期,也应该丢弃。

JMSRedelivered

JMSRedelivered消息头表示是否将消息重新传送给消费者。如果消息已被重新传送,JMSRedelivered消息头的值就是true;如果没有重新传送,就是false。如果消费者确认传送失败,或者JMS提供者不确定消费者是否接收到消息,那么,这条消息就可以标识为重新发送。

向消费者传送消息时,消费者必须确认接收该消息。如果它没有确认,消息服务器可以尝试重新传送该消息。消费者可以自动或者手动确认消息,这取决于消费者的创建方式。使用AUTO_ACKNOWLEDGE或DUPS_OK_ACKNOWLEDGE确认模式创建的消费之,会自动通知消息服务器已经接收到消息。而使用CLIENT_ACKNOWLEDGE模式创建消费者,JMS客户端必须使用acknowledge()方法手动确认消息。

一般来说,消息的JMSRedelivered值为false时,消费者应该假定此前没有机会看到这条消息。如果重新传送标识为true,客户端此前可能已经接收到这条消息,因此它可能需要某些预防措施,而这些措施其他情况下并不需要。有多种情况可能会导致消息重新传送,而且JMS提供者在拿不住故障、错误条件和其他反常条件的情况下,可以令一条消息重新传送。

JMSPriority

传送消息时,消息生产者可能会给它分配一个优先级。消息服务器可以使用消息的优先级,按照顺序向消费者传送消息;高优先级消息的传送要先于低优先级消息。

消息的优先级包含在JMSPriority头中,它由JMS提供者自动设置。如果没有指定,消息优先级会设置为默认值4。消息的优先级可以有JMS客户端使用MessageProducer(而非Message对象的!)的setPriority()方法声明。下列代码显示了p2p和发布/订阅消息模型如何使用这种方法:

//p2p将消息优先级设置为9
QueueSender queueSender = QueueSession.createSender(someQueue);
queueSender.setPriority(9);

//发布/订阅将消息优先级设置为9
TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);
topicPublisher.setPriority(9);

一旦消息生产者建立了一个优先级(QueueSender或TopicPublisher),该优先级就会用于该生产者传送的所有消息,除非对它显示重写。在发送或者发布操作期间,可以重写特定消息的优先级。下列代码显示了如何在发送和发布期间重写消息的优先级。

//p2p为发送操作设置优先级
QueueSender queueSender = QueueSession.createSender(someQueue);
queueSender.send(message,DeliveryMode.PERSISTENT,3,0);

//发布/订阅为发送操作设置优先级
TopicPublisher topicPublisher = TopicSession.createPublisher(someTopic);
topicPublisher.publish(message,DeliveryMode.PERSISTENT,3,0);

消息优先级有两种基本类型:0~4级是普通优先级;5~9是加急优先级。消息服务器并不要求强制执行基于JMSPriority消息头的消息顺序,但是它们应该尝试在普通消息之前传送加急消息。

传送消息时会自动设置JMSPriority消息头。它可以由JMS客户端使用Message.getJMSPriority()方法读取,而取值函数方法则主要是发送消息时由消息服务器使用。

JMSReplyTo

有些情况下,消息生产者可能要求消费者对一条消息做出应答。JMSReplyTo消息头(如果存在的话)表示JMS消费者应答的目的地。JMSReplyTo消息头由JMS客户端显式设置,它的内容是javax.jms.Destination对象(Topic或Queue)。

有些情况下,JMS客户端想要消息消费者对JMS客户端建立的一个临时主题或者队列做出应答。下面给出的是发布/订阅JMS客户端的一个例子,它创建了一个临时主题,并使用其Topic对象标识符作为JMSReplyTo消息头:

TopicSession session =
connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
...
Topic tempTopic = session.createTemporaryTopic();
...
TextMessage message = session.createTextMessage();
message.setText(text);
message.setJMSReplyTo(tempTopic);
publisher.publish(message);

JMS消费者接收到一条包含JMSReplyTo目的地的消息时,可以使用该目的地来应答。JMS消费者并未被要求发出应答,不过在某些JMS应用程序中,会将客户端编程为这种方式。下面给出的是一个JMS消费者的例子,它使用已接收消息的JMSReplyTo消息头来发出应答。这时候,订阅者只要向发布者发回一个确认,表示已经接收到该消息:

Topic chatTopic = ... get topic from somewhere
...
//创建发布者,没有特定主题
TopicPublisher publisher = session.createPublisher(null);
...

public void onMessage(Message message){
try{
   TextMessage textMessage = (TextMessage)message;
   Topic replyToopic = (Topic)textMessage.getJMSReplyTo();
   TextMessage replyMessage = session.createTextMessge("Received Messge ...");
   publisher.publish(replyTopic,replyMessage);
}catch(JMSException jmse){
   jmse.printStackTrace();
}

}

消息生产者设置的JMSReplyTo目的地,可以使消息传送系统中的任何目的地。使用其他已经建立的主题或队列,会允许消息生产者为消息自身或为应答该消息指明路由选择优先权。通常,工作流应用程序会使用这种路由选择。在工作流应用程序中,一条消息代表由若干JMS客户端同时处理的某些任务——这可能会持续数天。例如,一个订单消息可能会由销售客户端首先处理,接着是存户客户端,然后是运运输客户端,最后是收款客户端。每个JMS客户端(销售、存货、运输或收款)完成订单数据处理后,可以使用JMSReplyTo地址向下一个环节传送消息。

JMSCorrelationID

JMSCorrelationID提供了一个消息头,用于将当前消息和前面的而一些消息或应用程序特定的ID关联起来。大多数情况下,JMSCorrelationID用于将一条消息标记为此前消息的应答。下面代码显示了如何设置JMSCorrelationID,并和JMSReplyTo、JMSMessageID结合使用,向一条消息发送应答:

public void onMessage(Message message){
    try{
        TextMessage textMessage = (TextMessage)message;
        Queue replyQueue = (Queue)textMessage.getJMSReplyTo();

        Message replyMessage = session.createMessage();
        replyMessage.setJMSCorrelationID(message.getJMSMessageID());
        sender.send(replyQueue,replyMessage);
    catch(JMSException jmse){
        jmse.printStackTrace();
    }
}

接收应答消息时,JMS客户端会将新消息的JMSCorrelationID与它所发送消息的对应JMSMessageID相匹配,以便了解哪一条消息接收到了应答。JMSCorrelationID可以使任意值,而不仅限于JMSMessageID。JMSCorrelationID消息头经常与应用程序特定的标识符一起使用。不过,JMSCorrelationID不必一定是JMSMessageID,尽管通常情况下是如此。如果决定使用自己的ID,注意不要以ID:作为应用程序特定的JMSCorrelationID的开头,这个前缀是为JMS提供者生成的ID保留的。

访问和修改JMSCorrelationID的方法有两种形式:String和AsBytes。基于String的消息头最常用,并且JMS提供者必须支持这种形式。基于字节数组的AsBytes方法是可选特性,并不要去JMS提供者必须支持。它用于将JMSCorrelationID设置为某些本地JMS提供者的关联ID:

Message message = queueSession.createMessage();
byte[] byteArray = ...set to some JMS specific byte array
...
message.setJMSCorrelationIDAsBytes(byteArray);
sender.send(message);

JMSType

JMSType是由JMS客户端设置的可选消息头。它的名称容易使人误解,因为它与正在发送的消息类型(ByteMessage、MapMessage等)无关。它主要是标识消息结构和有效负载的类型;仅有少数几家厂商支持这个消息头。



消息属性:就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露(expose)消息选择器在消息过滤时使用的数据。Message接口为读取和写入属性提供了若干个取值函数和赋值函数方法。消息的属性值可以使String、boolean、byte、double、int、long或float型。

消息属性有3种基本类型:应用程序特定的属性、JMS定义的属性和提供者特定的属性。应用程序属性由应用程序开发者定义并应用到Message对象上;JMS扩展和提供者特定的属性大多是由JMS提供者自动添加的附加消息头。

应用程序特定的属性

由应用程序开发者定义的所有属性都可以作为一个应用程序特定的属性。应用程序属性在消息传送之前进行设置。并不存在预先定义的应用程序属性,开发者可以自由定义能都满足他们需要的任何属性。例如,可以再聊天示例中,添加一个特定的属性,该属性标识了正在发送消息的用户:

TextMessage message = pubSession.createTextMessage();
message.setText(text);
message.setStringProperty("username",username);
publisher.publish(message);

作为一个应用程序的特定属性,username一旦离开Chat应用程序就变得毫无意义;它专门用于应用程序根据发布者身份对消息进行过滤。

属性值可以是boolean、byte、short、int、long、float、double或String类型。javax.jms.Message接口为每种类型的属性值都提供了取值函数和赋值函数方法。

一旦一条消息发布或发送之后,它就变成了只读(read-only)属性;消费者或生产者都无法修改它的属性。如果消费者试图设置某个属性,该方法就会抛出一个javax.jms.MessageNotWritableException。不够,通过调用clearProperties()方法,就可以修改消息的属性,该方法将删除一条消息的所有属性,以便能够添加新的的属性。

Message接口中的getPropertyNames()方法可以用于获取该消息所有属性的名称枚举(Enumeration)。接下来,这些名称就可供属性取值函数方法是用,以获取属性值。例如:

public void onMessage(Message message){

     Enumeration propertyNames = message.getPropertyName():
     while(proertyNames.hasMoreElements()){

        String name = (String)propertyName.nextElement();
        Object value=getObjectProperty(name);
        System.out.println("\nname"+" = "+value);
    }   

}

JMS定义的属性

JMS定义的属性具有和应用程序属性相同的特性,除了前者大多数在消息发送时由JMS提供者来设置之外。JMS定义的属性可以作为可选的JMS消息头;对于某些另有声明的例外(noted exception),各厂商可以分别选择不支持、部分支持或全部支持。下面是JMS定义的9个属性清单

  • JMSXUserID
  • JMSXAppID
  • JMSXProducerTXID
  • JMSXConsumerTXID
  • JMSXRcvTimestamp
  • JMSXDeliveryCount
  • JMSState
  • JMSGroupID
  • JMSGoupSeq

在这份清单中,只有JMSGroupID和JMSXGroupSeq需要所有JMS提供者的支持。这些可选属性用于聚合(group)信息。

请注意:在Message接口中,您将无法找到对应的setJMSX<PROPERTY>()和getJMSX<PROPERTY>()方法定义;在使用这些方法时,必须使用和应用程序特定属相相同的方式来设定它们:

message.setStringProperty("JMSXGroupID","ERF-001");
message.setIntPorperty("JMSXGroupSeq",3);

提供者特定的属性

每个JMS提供者都可以定义一组私有属性,这些属性可以有客户端或者提供者自动设置。提供者特定的属性必须以前缀JMS开头,后面紧接着是属性名称。提供者特定的属性,其作用就是支持厂商的私有属性。

2、会话和线程

Chat应用程序分别为发布者和订阅者设置了两个单独的会话:pubSession和subSession。其原因在于JMS强制实行的线程限制。按照JMS规范的规定,一个会话不能同时在一个以上的线程中运行。在我们的例子中,有两个控制线程是活动的(active):Chat应用程序默认的主线程和调用onMessage()处理程序(handler)的线程。调用onMessage()处理器的线程属于JMS提供者所有。

JMS消息

时间: 2024-09-29 15:54:43

JMS消息的相关文章

JMS消息服务器(二)——点对点消息传送模型

一.点对点模型概览 当你只需要将消息发布送给唯一的一个消息消费者是,就应该使用点对点模型.虽然可能或有多个消费者在队列中侦听统一消息,但是,只有一个且仅有一个消费者线程会接受到该消息. 在p2p模型中,生产者称为发送者,而消费者则称为接受者.点对点模型最重要的特性如下: 消息通过称为队列的一个虚拟通道来进行交换.队列是生产者发送消息的目的地和接受者消费消息的消息源. 每条消息通仅会传送给一个接受者.可能会有多个接受者在一个队列中侦听,但是每个队列中的消息只能被队列中的一个接受者消费. 消息存在先

JMS消息服务器——Message消息分析(1)

一.Message结构 Message消息是整个JMS规范最为重要的部分.一个JMS应用程序中的所有数据和事件都是使用消息进行通信的,同时JMS的其余部分也都在为消息传输服务.因此可以说,消息时一个系统的命脉所在. 一个Message对象有3个部分:消息头.消息属性,最后就是消息数据内容,它称为负载或消息体. 二.消息头 每条JMS消息都有一组标准的消息头.每个消息头都由一组取值函数和赋值函数所标识.这些方法名称紧随在术语setJMS**(),getJMS**()方法之后.如下图: JMS消息可

JMS消息传输机制

JMS消息传送模型: 消息传送机制, 是基于拉取(pull)或者轮询(polling)的方式.  JMS具备两种"消息传送模型": P2P和Pub/sub. (1) P2P:点对点消息传送模型, 允许JMS客户端通过队列(queue)这个虚拟通道来同步或异步发送消息; 消息的生产者为Sender, 消费者为receiver.   receiver主动到队列中请求消息,而不是JMS提供者将消息推送到客户端;   主要原因是一个队列通道可能有多个receiver,每个receiver可能对

JMS消息服务器——Message消息分析(2)

3 消息类型 JAVA消息服务定义了6种JMS提供者必须支持的Message接口类型.尽管JMS定义了Message接口,但它并未定义它们的实现方式.这就允许提供者以它们自己的方式实现和传送消息,同时为JMS应用程序开发者维护了一个兼容的标准接口.这6个消息接口是Message和它的5个子接口:TextMessage.StreamMessage.MapMessage.ObjectMessage和ByteMessage. 3.1 Message 最简单的消息类型是javax.jms.Message

Intellij IDEA 创建消息驱动Bean - 接收JMS消息

除了同步方式的调用之外,有时还需要异步调用,用来处理不需要即时处理的信息,例如短信.邮件等,这需要使用EJB中的独特组件——消息驱动Bean(Message-Driven Bean,MDB),它提供了Java消息服务JMS(Java Messaging Service)的处理能力,由消息驱动Bean来处理JMS消息.JMS的消息由客户端程序产生,并被发布到服务器的消息队列,消息驱动Bean随之检索消息并执行其内容.这种事件或者数据的通信就称为异步形式,客户端或者服务端Bean都无须依赖对方的直接

JavaEE(7) - JMS消息事务和异常

1. 使用事务性Session为消息增加事务(NetBeans创建java project: TxSession) MessageSender.java package lee; import javax.jms.*; import javax.naming.*; import java.util.Properties; public class MessageSender { public void sendMessage() throws NamingException, JMSExcept

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

Oozie 生成JMS消息并向 JMS Provider发送消息过程分析

一,涉及到的工程 从官网下载源码,mvn 编译成 Eclipse工程文件: 对于JMS消息这一块,主要涉及到两个工程: oozie-core工程有问题的原因是还需要一些其他的依赖工程未导入: 二,Oozie 生成 JMS消息 主要涉及到的一些类 oozie-core 工程中的: oozie-client工程中的: 三,相关代码: 对于Oozie Server而言,它是消息的生产者.在oozie-default.xml/oozie-site.xml里面配置好连接参数,消息服务器....Oozie就

JMS消息服务模型

JMS--仅仅是一种规范,一种接口规约,一种编程模型.类似的JPA,JSR等 场景: 1.多个系统之间交互,实现可以采取RPC,但是交互复杂,基本就是点对点的方式 2.其实交互就是消息,而JMS就是消息规范,支持事务机制(保证安全)--不就是类似于RDBMS吗,存储消息,转存发送 3.大家想想队列的机制(集合存储----队列存储---消息存储---消息服务器---数据库服务器----分布式存储------分布式文件系统) 解决办法: 多个系统采用消息交互,形成CS模型交互(集中式结构),当然还有

Flume 读取JMS 消息队列消息,并将消息写入HDFS

利用Apache Flume 读取JMS 消息队列消息.并将消息写入HDFS,flume agent配置例如以下: flume-agent.conf #name the  components on this agent agentHdfs.sources  = jms_source agentHdfs.sinks =  hdfs_sink agentHdfs.channels  = mem_channel #  Describe/configure the source agentHdfs.s