模型结构
JMS编程模型由以下几个组成:
- ConnectionFactory:连接工厂(创建连接)
- Connection:连接(创建会话)
- Session:会话(创建目的地、生产者、消费者、消息)
- Destination:目的地(消息发送目标)
- MessageProducer:消息生产者(发送消息)
- MessageConsumer:消息消费者(消费消息)
- Message:消息(内容主体)
下面用一张图片展示几个组成部分是如何联系在一起的
下面将逐个了解每个部分,并且以activeMQ的实现作为代码片段部分示例。
ConnectionFactory
顾名思义,一个ConnectionFactory是客户端用来创建Connection的接口。基于工厂模式,它简化了Connection的创建。除了ConnectionFactory接口,常见的还有QueueConnectionFactory和TopicConnectionFactory,它们都继承自ConnectionFactory。
创建一个ConnectionFactory的代码片段如下:
1 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
Connection
有了ConnectionFactory我们就可以创建Connection了,Connection表示的是一个虚拟的连接,也就是代表着打开了一个由客户端到消息代理端的socket连接。Connection可以用来创建Session。
下面我们看看ConnectionFactory来创建Connection的示例:
1 Connection connection = connectionFactory.createConnection();
在使用Connection之前,你必须先调用start方法开启连接
1 connection.start();
在使用完了之后,你必须调用close方法关闭资源。注意,close方法会关闭Connection创建的Session、MessageProducer、MessageConsumer。另外,如果close方法调用失败,那么将会导致资源未被释放的问题。
但是,如果你只是想暂时停止一下消息的传送,那么可以调用stop方法,而不是将Connection进行close。
Session
session是一个Message的生产和消费的上下文,我们称作会话,由Connection创建。session可以创建MessageProducer、MessageConsumer、Message、Destination。
我们创建一个session
1 Session session = connection.createSession(false, Session.AUTO_ACKNOWEDGE);
第一个入参传入了false,表示不需要事务。第二个入参表示消息被接收以后session会自动做ack确认操作。如果要创建一个有事务的session呢?
1 Session session = connection.createSession(true, 0);
第一个参数true表示开启事务,第二个参数表示不指定ack确认机制。在业务代码完成以后,需要显示提交事务
1 session.commit();
Destination
一个destination表示的是生产者的消息发送目的地,以及消费者消费消息的源头。在点对点模式中,destination又被称作queue(队列)。在发布订阅模式中,destination被称作topic(话题)。
Destination由session创建,创建一个queue
1 Destination destination = session.createQueue("queue1");
创建一个topic
1 Destination destination = session.createTopic("topic1");
MessageProducer
MessageProducer是由session创建的,用于发送Message到destination。我们使用session创建一个MessageProducer,如下
1 MessageProducer producer = session.createProducer(destination);
如果你创建了一个Message对象,你可以使用MessageProducer发送消息
1 producer.send(message);
MessageConsumer
MessageConsumer是由session创建的,将会作为一个消费者消费destination中的Message。创建一个MessageConsumer
1 MessageConsumer consumer = session.createConsumer(destination);
创建了消费者,就可以消费消息了
1 Message message = consumer.receive();
receive方法是同步消费消息的方法,有时候我们不想等待那么久,所以采用异步监听的方式,如
1 Listener listener = new Listener(); 2 consumer.setMessageListener(listener);
这里假设Listener是实现了MessageListener接口的监听器,当消息到达的时候onMessage方法将被触发。
Message
Message表示具体的消息,JMS定义了五种消息格式,如:
- TextMessage:文本
- MapMessage:键值对
- BytesMessage:字节码
- StreamMessage:流
- ObjectMessage:对象
以TextMessage为例,创建一个消息
1 TextMessage message = session.createTextMessage(); 2 message.setText("text content"); 3 producer.send(message);
如果是MessageConsumerreceive消息
1 Message message = consumer.receive(); 2 if (message instanceof TextMessage) { 3 TextMessage textMessage = (TextMessage)message; 4 System.out.println("receive message" + message.getText()); 5 }
完整代码
生产
1 // Create a ConnectionFactory 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); 3 4 // Create a Connection 5 Connection connection = connectionFactory.createConnection(); 6 connection.start(); 7 8 // Create a Session 9 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 10 11 // Create the destination (Topic or Queue) 12 Destination destination = session.createQueue("TEST.FOO"); 13 14 // Create a MessageProducer from the Session to the Topic or Queue 15 MessageProducer producer = session.createProducer(destination); 16 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 17 18 // Create a messages 19 String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); 20 TextMessage message = session.createTextMessage(text); 21 22 // Tell the producer to send the message 23 System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName()); 24 producer.send(message); 25 26 // Clean up 27 session.close(); 28 connection.close();
消费
1 // Create a ConnectionFactory 2 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); 3 4 // Create a Connection 5 Connection connection = connectionFactory.createConnection(); 6 connection.start(); 7 8 connection.setExceptionListener(this); 9 10 // Create a Session 11 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 12 13 // Create the destination (Topic or Queue) 14 Destination destination = session.createQueue("TEST.FOO"); 15 16 // Create a MessageConsumer from the Session to the Topic or Queue 17 MessageConsumer consumer = session.createConsumer(destination); 18 19 // Wait for a message 20 Message message = consumer.receive(1000); 21 22 if (message instanceof TextMessage) { 23 TextMessage textMessage = (TextMessage) message; 24 String text = textMessage.getText(); 25 System.out.println("Received: " + text); 26 } else { 27 System.out.println("Received: " + message); 28 } 29 30 consumer.close(); 31 session.close(); 32 connection.close();
原文
https://docs.oracle.com/javaee/1.4/tutorial/doc/JMS4.html#wp78884
JavaDoc
https://docs.oracle.com/javaee/7/api/javax/jms/package-frame.html
原文地址:https://www.cnblogs.com/lay2017/p/11080107.html