JMS是一种应用于异步消息传递的标准API

JMS是一种应用于异步消息传递的标准API,作为Java平台的一部分,JMS可以允许不同应用、不同模块之间实现可靠、异步数据通信。

一些概念

JMS provider
    An implementation of the JMS interface for a Message Oriented Middleware (MOM). Providers are implemented as either a Java JMS implementation or an adapter to a non-Java MOM.
JMS client
    An application or process that produces and/or receives messages.
JMS producer/publisher
    A JMS client that creates and sends messages.
JMS consumer/subscriber
    A JMS client that receives messages.
JMS message
    An object that contains the data being transferred between JMS clients.
JMS queue
    A staging area that contains messages that have been sent and are waiting to be read. Note that, contrary to what the name queue suggests, messages don‘t have to be delivered in the order sent. A JMS queue only guarantees that each message is processed only once.
JMS topic
    A distribution mechanism for publishing messages that are delivered to multiple subscribers.

在JMS中,支持两种消息模型,点对点(Point-to-point)和发布-订阅(Publish and subscribe),这两种模式分别对应于JMS中的两种消息目标(Message Destination):队列及主题。

在点对点模型中,每个消息都有一个发送者和一个接收者,消息中介(broker)收到发送者的消息,会将消息放入队列中,而接收者请求并接收队列中的一条消息后,这条消息就会从队列中删除。消息队列中的每条消息只能投递给一个接收者,但并不意味着只能使用一个接收者从队列中取消息,根据业务需要,可以使用多个接收者同时从队列中请求消息,分担处理压力。但是需要注意的是,单个接收者收到的消息是按照发送顺序的,多个接收者因为多线程的关系,并不能保证收到的消息一定是原序的。

在发布-订阅模式中,消息会发送给一个主题,但是与点对点模式不同的是消息不再只被投递给一个接收者,而是所有此主题的订阅者都会收到该消息。

JMS消息类型

在JMS1.1规范中,定义了五种消息类型,分别为:
1.StreamMessage :消息体是 Java 流,写入和读出都是顺序的
2.MapMessage :消息体包含 key-value 对, key 为 String , value 为基本类型,可以通过迭代器访问
3.TextMessage :消息体是 String
4.ObjectMessage :消息体是可序列化的 Java 对象
5.BytesMessage :消息体是字节数组

可以通过 message.clearBody() 来清除消息体;但在消费端,消息体是只读的,针对消息的写操作都会抛出 MessageNotWritableException 异常

JMS消息头

所有消息的消息头都具体相同的字段,用于 JMS Client 以及 JMS Provider 对它们进行区别以及进行消息路由

1.JMSDestination

消息发送的目的地(队列或主题);创建消息时可以设置 JMSDestination ,但是在发送完成时其值会更新为发送方所指定的 JMSDestination ,也就是说发送前该字段会被忽略;当消息被消费时,该字段的值与在它被发送时被设置的值是相同的

以下所有示例均基于ActiveMQ

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建2个目的地
Destination destination = session.createQueue("JMS.DEMO");
Destination destination2 = session.createQueue("JMS.DEMO2");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
// 设置消息的目的地为destination2
message.setJMSDestination(destination2);

// 发送消息
publisher.send(message);

System.out.println(message.getJMSDestination());

代码中,通过 message.setJMSDestination(destination2); 设置了 message 的 JMSDestination 消息头属性值,我们再看看其输出结果

queue://JMS.DEMO

通过这个例子可以看出,虽然在发送前设置了消息的目的地,但是发送后消息的目的地被重置了

2.JMSDeliveryMode

指明消息的传输模式,有两种:

DeliveryMode.PERSISTENT :保证消息仅传一次, JMS Provider 服务停止后消息不会丢失;

DeliveryMode.NON_PERSISTENT :消息最多传一次,消息会因 JMS Provider 停止后丢失;

同 JMSDestination 一样,在发送前设置的会被忽略

看下面的例子

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
// 发送PERSISTENT消息
publisher.send(session.createTextMessage("PERSISTENT MESSAGE"));

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 发送PERSISTENT消息
publisher.send(session.createTextMessage("NON_PERSISTENT MESSAGE"));

例子中分别发送了一条 PERSISTENT 的消息和一条 NON_PERSISTENT 的消息;当 Active MQ 重启后,启动消费端,收到的消息如下

PERSISTENT MESSAGE

该例子说明,在 JMS Provider 重启后, NON_PERSISTENT 消息丢失了,而 PERSISTENT 消息能正常被消费者消费

3.JMSMessageID

由 JMS Provider 指定的消息的唯一标识符;同上面的字段一样,在发送前设置的会被忽略,在发送完成时,由 JMS Provider 重置该字段

4.JMSReplyTo

发送端在发送消息时,可以指定该属性(为一个 JMSDestination ),表示期望收到客户端的响应;是否响应由消费端决定

如下面的例子:

发送端:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");
Destination destination2 = session.createQueue("JMS.DEMO3");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
message.setJMSReplyTo(destination2);
// 发送消息
publisher.send(message);

接收端(可以根据情况决定是否需要回复)

public void onMessage(Message message) {
    try {
        System.out.println("Receive message: " + message);
        if (message.getJMSReplyTo() != null) {
            session.createProducer(message.getJMSReplyTo()).send(session.createTextMessage("This is a reply to"
                                                                                           + message.getJMSReplyTo()));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

5.JMSRedelivered

当消费者收到带有 JMSRedelivered 的消息头时,表明该消息在过去传输过但没有被确认

JMS Provider 必须对该字段进行设置,当为 true 时即告知消费者该消息是重传的,消费者需要自行处理重复的消息

6.JMSExpiration

消息的过期时间,其值为当前时间加上存活时间(毫秒);当存活时间设置为 0 时,该字段的值也被设置为 0 ,表示永不过期;

消费端在一般情况下都不会接收到过期的消息,但 JMS Provider 并不保证这一点;

下面的例子说明了如何设置消息的过期时间

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
// 发送消息
publisher.setTimeToLive(5000);
publisher.send(message);

7.JMSPriority

消息的优先级, 0 代表最低优先级, 9 代表最高优先级;一般 0~4 为普通优先级, 5~9 为加快优先级

JMS 规范里并没有要求 JMS Provider 严格按这个优先级来实现,但是尽可能实现加快优先级消息的传输在普通消息的前面

同 JMSDestination 一样,该字段在发送前被忽略,在发送完成时重置

消息属性

除了前面提到的消息头以外, JMS 消息还提供了对“属性值对”的支持,以对消息头进行扩展;消息属性主要用于消息选择器 (message selector 详见下文 )

1.属性名
属性名必须服务消息选择器的命名规则

2.属性值

可以是基本类型及其对象类型以及 Map 、 List 和 String

下面的例子中,消息带 HashMap 的属性

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地
Destination destination = session.createQueue("JMS.DEMO");

// 创建生产者
MessageProducer publisher = session.createProducer(destination);

// 设置传输模式
publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

// 创建消息
TextMessage message = session.createTextMessage("Test Message");
// 发送消息
message.setObjectProperty("myProp", new HashMap() {

{
        this.put("key1", "value1");
        this.put("key2", "value2");
    }
});
publisher.send(message);

3.清除属性

JMS 不能清除单个属性,但可以通过 Message.clearProperties() 方法清除所有消息属性

JMS实现(Provider implementations)

要使用JMS,必须要有相应的实现来管理session以及队列,从Java EE1.4开始,所有的Java EE应用服务器必须包含一个JMS实现。

以下是一些JMS实现:

Apache ActiveMQ

Apache Qpid, using AMQP

BEA Weblogic (part of the Fusion Middleware suite) and Oracle AQ from Oracle

EMS from TIBCO

FFMQ, GNU LGPL licensed

JBoss Messaging and HornetQ from JBoss

JORAM, from the OW2 Consortium

Open Message Queue, from Sun Microsystems

OpenJMS, from The OpenJMS Group

RabbitMQ, using AMQP

Solace JMS from Solace Systems

SonicMQ from Progress Software

StormMQ, using AMQP

SwiftMQ

Tervela

Ultra Messaging from 29 West (acquired by Informatica)

webMethods from Software AG

WebSphere Application Server from IBM, which provides an inbuilt default messaging provider known as the Service Integration Bus (SIBus), or which can connect to WebSphere MQ as a JMS provider [5]

WebSphere MQ (formerly MQSeries) from IBM

来源: <http://www.blogjava.net/caojianhua/archive/2012/07/26/384095.html>

无论是持久消息,还是非持久消息,如果消息没有对应的消费者,那么activeMQ会认为这些消息无用,直接删除。

持久订阅者/非持久订阅者,只影响离线的时候消息(包括持久消息和非持久消息)是否能接收到,和消息是否持久无关;持久消息/非持久消息,只是影响jms provider宕机后。消息是否会丢失,如果永远不会宕机,那么持久消息和非持久消息没有区别。

来自为知笔记(Wiz)

时间: 2024-10-06 21:01:57

JMS是一种应用于异步消息传递的标准API的相关文章

使用Spring JMS轻松实现异步消息传递

异步进程通信是面向服务架构(SOA)一个重要的组成部分,因为企业里很多系统通信,特别是与外部组织间的通信,实质上都是异步的.Java消息服务(JMS)是用于编写使用异步消息传递的JEE应用程序的API.传统的使用JMS API进行消息传递的实现包括多个步骤,例如JNDI查询队列连接工厂和Queue资源,在实际发送和接收消息前创建一个JMS会话. Spring框架则简化了使用JEE组件(包括JMS)的任务.它提供的模板机制隐藏了典型的JMS实现的细节,这样开发人员可以集中精力放在处理消息的实际工作

Android异步消息传递机制源码分析&amp;&amp;相关知识常被问的面试题

1.Android异步消息传递机制有以下两个方式:(异步消息传递来解决线程通信问题) handler 和 AsyncTask 2.handler官方解释的用途: 1).定时任务:通过handler.postDelay(Runnable r, time)来在指定时间执行msg. 2).线程间通信:在执行较为耗时操作的时候,在子线程中执行耗时任务,然后handler(主线程的)把执行的结果通过sendmessage的方式发送给UI线程去执行用于更新UI. 3.handler源码分析 一.在Activ

[基础]同步消息和异步消息传递的区别?

在系统交互时候选择同步还是异步有时候很让人困扰,希望通过阅读这篇文章可以帮助更好的理解同步与异步. 同步与异步消息的区别 1.同步消息 同步消息传递涉及到等待服务器响应消息的客户端.消息可以双向地向两个方向流动.本质上,这意味着同步消息传递是双向通信.即发送方向接收方发送消息,接收方接收此消息并回复发送方.发送者在收到接收者的回复之前不会发送另一条消息. 2.异步消息 异步消息传递涉及不等待来自服务器的消息的客户端.事件用于从服务器触发消息.因此,即使客户机被关闭,消息传递也将成功完成.异步消息

JMS的两种模式 P2P,PUB/SUB 消息发送

1.P2P模型 在P2P模型中,有下列概念:消息队列(Queue).发送者(Sender).接收者(Receiver).每个消息都被发送到一个特定的队列,接收者从队列中获取消息.队列保留着消息,直到它们被消费或超时. ? 每个消息只有一个消费者 (Consumer)(即一旦被消费,消息就不再在消息队列中) ? 发送者和接收者之间在时间上没有依赖性 ,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列. ? 接收者在成功接收消息之后需向队列应答成功 如果你希望发

WINDOWS硬件通知应用程序的常方法(五种方式:异步过程调用APC,事件方式VxD,消息方式,异步I/O方式,事件方式WDM)

摘要:在目前流行的Windows操作系统中,设备驱动程序是操纵硬件的最底层软件接口.为了共享在设备驱动程序设计过程中的经验,给出设备驱动程序通知应用程序的5种方法,详细说明每种方法的原理和实现过程,并给出实现的部分核心代码.希望能够给设备驱动程序的设计者提供一些帮助. 关键词:设备驱动程序   异步I/O   Virtual   Device   Driver(VxD)   Windows   Driver   Model(WDM) 引   言  在DOS操作系统中,应用程序可以直接与硬件打交道

Spring整合JMS(二)——三种消息监听器

一.消息监听器MessageListener 在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是MessageListener.SessionAwareMessageListener和MessageListenerAdapter.下面就分别来介绍一下这几种类型的区别. 1).MessageListener MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口.其中定义了一个用于处理接收到的消息的onMessage方法,

jms的俩种模式

package com.jiangchong.job; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax

一种C# TCP异步编程中遇到的问题

最近在维护公司的一个socket服务端工具,该工具主要是提供两个socket server服务,对两端连接的程序进行数据的透明转发. 程序运行期间,遇到一个问题,程序的一端是GPRS设备,众所周知,GPRS设备的网络连接十分的不问题,由此会产生不少的"奇怪"问题.实际过程中,程序运行几个小时后,无线端的socket server断开就再也无法打开.找了很久都没发现. 通过wireshark抓取通信报文,一般是在TCP的三次握手时出的问题.常规的TCP三次握手,由TCP的标识可简单看作:

C# 委托异步 和 async /await 两种实现的异步

最近频繁使用异步所以自己综合的学习了一把异步相关的知识,自己稍加整理了一下(这也是我试着写的第一篇,如果有不对的,希望大神来指正!) 首先是 委托实现的异步 class Program { public delegate int weituo();//定义了个委托 public int xxx() { Thread.Sleep(3000); Console.WriteLine("11111."); return 1; } ///定义了个方法 static void Main(strin