JMS编程模型

模型结构

JMS编程模型由以下几个组成:

  • ConnectionFactory:连接工厂(创建连接)
  • Connection:连接(创建会话)
  • Session:会话(创建目的地、生产者、消费者、消息)
  • Destination:目的地(消息发送目标)
  • MessageProducer:消息生产者(发送消息)
  • MessageConsumer:消息消费者(消费消息)
  • Message:消息(内容主体)

下面用一张图片展示几个组成部分是如何联系在一起的

下面将逐个了解每个部分,并且以activeMQ的实现作为代码片段部分示例。

ConnectionFactory

顾名思义,一个ConnectionFactory是客户端用来创建Connection的接口。基于工厂模式,它简化了Connection的创建。除了ConnectionFactory接口,常见的还有QueueConnectionFactory和TopicConnectionFactory,它们都继承自ConnectionFactory。

创建一个ConnectionFactory的代码片段如下:

1 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");

Connection

有了ConnectionFactory我们就可以创建Connection了,Connection表示的是一个虚拟的连接,也就是代表着打开了一个由客户端到消息代理端的socket连接。Connection可以用来创建Session。

下面我们看看ConnectionFactory来创建Connection的示例:

1 Connection connection = connectionFactory.createConnection();

在使用Connection之前,你必须先调用start方法开启连接

1 connection.start();

在使用完了之后,你必须调用close方法关闭资源。注意,close方法会关闭Connection创建的Session、MessageProducer、MessageConsumer。另外,如果close方法调用失败,那么将会导致资源未被释放的问题。

但是,如果你只是想暂时停止一下消息的传送,那么可以调用stop方法,而不是将Connection进行close。

Session

session是一个Message的生产和消费的上下文,我们称作会话,由Connection创建。session可以创建MessageProducer、MessageConsumer、Message、Destination。

我们创建一个session

1 Session session = connection.createSession(false, Session.AUTO_ACKNOWEDGE);

第一个入参传入了false,表示不需要事务。第二个入参表示消息被接收以后session会自动做ack确认操作。如果要创建一个有事务的session呢?

1 Session session = connection.createSession(true, 0);

第一个参数true表示开启事务,第二个参数表示不指定ack确认机制。在业务代码完成以后,需要显示提交事务

1 session.commit();

Destination

一个destination表示的是生产者的消息发送目的地,以及消费者消费消息的源头。在点对点模式中,destination又被称作queue(队列)。在发布订阅模式中,destination被称作topic(话题)。

Destination由session创建,创建一个queue

1 Destination destination = session.createQueue("queue1");

创建一个topic

1 Destination destination = session.createTopic("topic1");

MessageProducer

MessageProducer是由session创建的,用于发送Message到destination。我们使用session创建一个MessageProducer,如下

1 MessageProducer producer = session.createProducer(destination);

如果你创建了一个Message对象,你可以使用MessageProducer发送消息

1 producer.send(message);

MessageConsumer

MessageConsumer是由session创建的,将会作为一个消费者消费destination中的Message。创建一个MessageConsumer

1 MessageConsumer consumer = session.createConsumer(destination);

创建了消费者,就可以消费消息了

1 Message message = consumer.receive();

receive方法是同步消费消息的方法,有时候我们不想等待那么久,所以采用异步监听的方式,如

1 Listener listener = new Listener();
2 consumer.setMessageListener(listener);

这里假设Listener是实现了MessageListener接口的监听器,当消息到达的时候onMessage方法将被触发。

Message

Message表示具体的消息,JMS定义了五种消息格式,如:

  1. TextMessage:文本
  2. MapMessage:键值对
  3. BytesMessage:字节码
  4. StreamMessage:流
  5. ObjectMessage:对象

以TextMessage为例,创建一个消息

1 TextMessage message = session.createTextMessage();
2 message.setText("text content");
3 producer.send(message);

如果是MessageConsumerreceive消息

1 Message message = consumer.receive();
2 if (message instanceof TextMessage) {
3     TextMessage textMessage = (TextMessage)message;
4     System.out.println("receive message" + message.getText());
5 }

完整代码

生产

 1 // Create a ConnectionFactory
 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
 3
 4 // Create a Connection
 5 Connection connection = connectionFactory.createConnection();
 6 connection.start();
 7
 8 // Create a Session
 9 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
10
11 // Create the destination (Topic or Queue)
12 Destination destination = session.createQueue("TEST.FOO");
13
14 // Create a MessageProducer from the Session to the Topic or Queue
15 MessageProducer producer = session.createProducer(destination);
16 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
17
18 // Create a messages
19 String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
20 TextMessage message = session.createTextMessage(text);
21
22 // Tell the producer to send the message
23 System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
24 producer.send(message);
25
26 // Clean up
27 session.close();
28 connection.close();

消费

 1 // Create a ConnectionFactory
 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
 3
 4 // Create a Connection
 5 Connection connection = connectionFactory.createConnection();
 6 connection.start();
 7
 8 connection.setExceptionListener(this);
 9
10 // Create a Session
11 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
12
13 // Create the destination (Topic or Queue)
14 Destination destination = session.createQueue("TEST.FOO");
15
16 // Create a MessageConsumer from the Session to the Topic or Queue
17 MessageConsumer consumer = session.createConsumer(destination);
18
19 // Wait for a message
20 Message message = consumer.receive(1000);
21
22 if (message instanceof TextMessage) {
23     TextMessage textMessage = (TextMessage) message;
24     String text = textMessage.getText();
25     System.out.println("Received: " + text);
26 } else {
27     System.out.println("Received: " + message);
28 }
29
30 consumer.close();
31 session.close();
32 connection.close();

原文

https://docs.oracle.com/javaee/1.4/tutorial/doc/JMS4.html#wp78884

JavaDoc

https://docs.oracle.com/javaee/7/api/javax/jms/package-frame.html

原文地址:https://www.cnblogs.com/lay2017/p/11080107.html

时间: 2024-10-25 04:09:52

JMS编程模型的相关文章

看看一个配置文件,了解ESB的编程模型

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/

Linux的I/O模式、事件驱动编程模型

大纲: (1)基础概念回顾 (2)Linux的I/O模式 (3)事件驱动编程模型 (4)select/poll/epoll的区别和Python示例 网络编程里常听到阻塞IO.非阻塞IO.同步IO.异步IO等概念,总听别人装13不如自己下来钻研一下.不过,搞清楚这些概念之前,还得先回顾一些基础的概念. 1.基础知识回顾 注意:咱们下面说的都是Linux环境下,跟Windows不一样哈~~~ 1.1 用户空间和内核空间 现在操作系统都采用虚拟寻址,处理器先产生一个虚拟地址,通过地址翻译成物理地址(内

Storm介绍及核心组件和编程模型

离线计算 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据.azkaban/oozie任务调度 流式计算 流式计算:数据实时产生.数据实时传输.数据实时计算.实时展示 代表技术:Flume实时获取数据.Kafka/metaq实时数据存储.Storm/JStorm实时数据计算.Redis实时结果缓存.持久化存储(mysql). 一句话总结:将源源不断产生的数据实时收集并实

ARMV8 datasheet学习笔记4:AArch64系统级体系结构之编程模型(4)- 其它

1. 前言 2.可配置的指令使能/禁用控制和trap控制 指令使能/禁用 当指令被禁用,则这条指令就会变成未定义 指令Trap控制 控制某条或某些指令在运行时进入陷阱,进入陷阱的指令会产生trap异常,路由规则如下: (1)当前为EL1,则陷阱异常传递给EL1(HCR_EL2.TGE定义为1时,会路由到EL2); (2)当前为EL2,则陷阱异常传递给EL2; (3)当前为EL3,则陷阱异常传递给EL3; 3. 系统调用 SVC 默认情况下SVC产生supervisor call,同步异常目标级别

网络编程模型

课程索引 1. 编程模型 2. 编程模型 Socket的实质就是一个接口 , 利用该接口,用户在使用不同的网络协议时,操作函数得以统一. 而针对不同协以统一. 而针对不同协议的差异性操作,则交给了 socket去自行解决. 3. TCP编程模型 4. UDP编程模型

多线程编程模型

在学习muduo网络库前,应该先熟悉一下多线程网络服务编程模型.在6.6.2节介绍了11种方案.方案0到方案4用的是阻塞I/O.方案5到方案11用的都是阻塞I/O. 方案0: accept+read/write 方案0不是并发模型,只是一个循环处理.用代码表示的话,可以表示为: while(true) { int fd=accept(--); read(fd,--) or write(fd--); close(fd); } 一次只能处理一个连接,第一个连接处理完毕后,才可以进入下一次循环,否则阻

Java的多线程编程模型5--从AtomicInteger开始

Java的多线程编程模型5--从AtomicInteger开始 2011-06-23 20:50 11393人阅读 评论(9) 收藏 举报 java多线程编程jniinteger测试 AtomicInteger,一个提供原子操作的Integer的类.在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字.而AtomicInteger则通过一种线程安全的加减操作接口. 来看AtomicInteger提供的接口. //获取当前的值 publ

【收藏转】WCF后传系列(8):深度通道编程模型Part 1—设计篇

引言 从本质上说,WCF是一个通信服务框架,它允许我们使用不同的传输协议,使用不同的消息编码形式,跟不同的WS-*系列规范交互,而所有这些细节都是由通道堆栈来处理的.为了简化这些处理,在WCF中提供了两种模型,一是针对开发者的应用程序编程模型:二是用来通信的通道模型,这样对于开发者来说,只要了解应用程序编程模型就足够了,而不会涉及到通道模型,然而,对于通道模型进行必要的学习,可以让我们真正理解WCF中“通信”概念,了解WCF的 整个架构体系,从而构建出更加健壮的WCF服务或者对WCF框架进行扩展

【收藏转】WCF后传系列(9):深度通道编程模型Part 2—实例篇

引言 从本质上说,WCF是一个通信服务框架,它允许我们使用不同的传输协议,使用不同的消息编码形式,跟不同的WS-*系列规范交互,而所有这些细节都是由通道堆栈来处理的.在<WCF专题系列(8):深度通道编程模型Part 1—设计篇>中,对于WCF中的通道模型有了深入的认识,本文中,我将通过实例来说明在通道模型中,服务端是如何接收消息,客户端是如何发送消息的. 服务端通道 本文将不使用WCF的编程模型,而直接利用通道模型来进行通信,这样有助于我们更进一步加深对服务端处理消息的认识,在服务端侦听并接