简单的介绍下基于JMS的消息传送
Java消息队列JMS整体设计结构
基本要素:生产者(producer),消费者(consumere),消息服务(broker)
交互模型:
JMS两种消息传送模式
点对点(Point-to-Point):专门用于使用队列Queue传送消息;
发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息
两种传送方式比较
基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只有其中一个消费者会接收到该消息,而不是所有消费者都能接收到该消息。(1 对 1)
基于主题的发布与订阅消息能被多个消费者消费,生产者发送的消息,所有订阅了该topic的消费者都能接收到。(1 对 多)
Java消息服务JMS API总体概览
JMS API概览:
JMS API可以归于3个主要部分:
公共API:可用于向一个队列或主题发送消息或从其中接收消息
点对点API:专门用于使用队列Queue传送消息
发布/订阅API:专门用于使用主题Topic传送消息
JMS公共API:
在JMS公共API内部,和发送与接收消息有关的JMS API接口主要是:
- ConnectionFactory
- Connection
- Session
- Message
- Destination
- MessageProducer
- MessageConsumer
它们的关系是:一旦有了ConnectionFactory,就可以创建Connection,一旦有了Connection,就可以创建Session,而一旦有了Session,就可以创建Message、MessageProducer和MessageConsumer。
JMS点对点API
点对点消息传送模型API是指JMS API之内基于队列(Queue)的接口:
- QueueConnectionFactory
- QueueConnection
- QueueSession
- Message
- Queue
- QueueSender
- QueueReceiver
从接口的命名可以看出,大多数接口名称仅仅是在公共API接口名称之前添加Queue一词。一般来说,使用点对点消息传送模型的应用程序将使用基于队列的API,而不使用公共API 。
JMS发布/订阅API
发布/订阅消息传送模型API是指JMS API之内基于主题(Topic)的接口
- TopicConnectionFactory
- TopicConnection
- TopicSession
- Message
- Topic
- TopicPublisher
- TopicSubscriber
由于基于主题(Topic)的JMS API类似于基于队列(Queue)的API,因此在大多数情况下,Queue这个词会由Topic取代。
ActiveMQ点对点发送与接收消息示例
创建普通的maven项目activemq-java
在pom.xml文件中加入jms 和 activemq 的相关依赖
<dependencies>
<!-- JMS规范的jar依赖 --> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version> </dependency> <!-- activeMQ对jms具体实现的jar依赖 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.8</version> </dependency> <!--slf4j的简单实现 可加可不加,解决输出警告日志--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> <version>1.7.25</version> </dependency>
</dependencies>
在com.beijing.activemq.send包下编写一个消费发送者QueueSender发送消息
package com.beijing.activemq.send; import com.beijing.activemq.model.User; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueSender { public static final String BROKER_URL = "tcp://192.168.32.130:61616"; //相当于一个数据库(其实是一个队列) public static final String DESTINATION = "myQueue"; public static void main(String[] args) { sendMessage(); } public static void sendMessage(){ //创建一个连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); Connection connection = null; Session session = null; MessageProducer messageProducer = null; try { //获取一个连接 connection = connectionFactory.createConnection(); //创建一个Session第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认)) //提示:消息确认机制的设置很关键,错误的设置容易导致数据丢失问题,建议手动确认,这里暂时定为自动确认 session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); //有了session之后,就可以创建消息,目的地,生产者和消费者 Message message = session.createTextMessage("Hello ActiveMQ"); //目的地 Destination destination = session.createQueue(DESTINATION); //生产者 messageProducer = session.createProducer(destination); //发消息 没有返回值,是非阻塞的 messageProducer.send(message); }catch (JMSException e ){ e.printStackTrace(); }finally { try { if (null != messageProducer) { messageProducer.close(); } if (null != session) { session.close(); } if (null == connection) { connection.close(); } }catch (JMSException e){ e.printStackTrace(); } } } }
启动ActiveMQ,关闭防火墙
[root@localhost bin]# ./activemq start
[root@localhost bin]# systemctl stop firewalld
运行程序,连接ActiveMQ的web控制台查看
在com.beijing.activemq.receive包下编写一个消费接收者QueueReceiver接收消息
package com.beijing.activemq.receive; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; //接收点对点消息 public class QueueReceiver { public static final String BROKER_URL = "tcp://192.168.32.130:61616"; //相当于一个数据库(其实是一个队列) public static final String DESTINATION = "myQueue"; public static void main(String[] args) { receiveMessage(); } private static void receiveMessage() { //创建一个连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); Connection connection = null; Session session = null; MessageConsumer messageConsumer = null; try { //获取一个连接 connection = connectionFactory.createConnection(); //接收消息,需要将连接启动一下,才可以接收到消息 connection.start(); //创建一个Session 第一个参数:是否是事务消息 第二个参数:消息确认机制(自动确认还是手动确认) session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); //目的地 Destination destination = session.createQueue(DESTINATION); //消费者 messageConsumer = session.createConsumer(destination); //循环接收消息 while (true){ //接收消息 有返回值,值阻塞的 Message message = messageConsumer.receive(); //判断消息类型 if(message instanceof TextMessage){ String text = ((TextMessage) message).getText(); System.out.println(text); } } }catch (JMSException e){ e.printStackTrace(); }finally { try { if (null != messageConsumer) { messageConsumer.close(); } if (null != session) { session.close(); } if (null == connection) { connection.close(); } }catch (JMSException e){ e.printStackTrace(); } } } }
注意:接收方要调用connection的start方法才能接收到
运行接收者的代码,在ActiveMQ的web控制台观察消息数据
之前:
现在:
ActiveMQ发布与订阅示例
发布订阅和点对点的代码基本相同,只是修改一下目的地,myQueue改为myTopic,以及在创建目的地的时候,将createQueue改为createTopic
直接在原有代码上更改
更改这两处地方 //相当于一个数据库(其实是一个队列) public static final String DESTINATION = "myTopic"; //目的地 Destination destination = session.createTopic(DESTINATION);
消息订阅者先运行,然后再运行消息发布者
在ActiveMQ的web控制台观察消息数据
Queue与Topic比较
原文地址:https://www.cnblogs.com/really199/p/10485139.html