本文基于上一篇:activeMQ服务安装,将展示如何使用activeMQ;
一、简介
上一篇我们了解了如何创建和启动一个activeMQ消息服务。那我们应当要如何去使用这个服务呢?其实也就是我们需要学习怎么去写客户端的代码。
1)activeMQ的客户端需要引入依赖的支持:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
2)activeMQ是根据JMS规范实现的,而JMS规范定义了它的架构模型,我们可以在文档里面查看:https://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html
根据文档提供的构建模型,我们可以罗列出客户端代码编写的流程大体如下:
2-1、创建ConnectionFactory
2-2、创建Connection
2-3、创建Session
2-4、创建Producer或Consumers
2-5、发送消息或消息监听
其实我们可以简单理解为:将客户端与服务端连接,而后进行消息发送或者消息监听
二、点对点/发布订阅
在具体示例代码之前,我们先了解两个概念
1)点对点:顾名思义,其实就是一对一的消息发送,每一条消息只能有一个消费者;
2)发布订阅:即一条消息发布出去,可以有多个消费者来进行消费,一对多的关系;
三、示例代码
下面将以点对对的方式作为示例代码:
生产者
public class ActiveMQProducer { static String brokerUrl = "tcp://127.0.0.1:61616"; static String userName = "admin"; static String password = "admin"; public static void main(String[] args) throws JMSException { // 创建connection工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerUrl); // 创建connection连接 Connection connection = connectionFactory.createConnection(); // 创建session,开启事务,消息自动确认 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建消息体 Message message = session.createTextMessage("hello jms"); // 创建队列 Destination destination = session.createQueue("queue1"); // 创建生产者 MessageProducer messageProducer = session.createProducer(destination); // 发送消息 messageProducer.send(message); // 事务提交 session.commit(); // 关闭连接 connection.close(); System.out.println("发送完毕..."); } }
消费者
public class ActiveMQReceiver { static String brokerUrl = "tcp://127.0.0.1:61616"; static String userName = "admin"; static String password = "admin"; static void createReceiver() throws JMSException {// 创建connection工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerUrl); // 创建connection连接 Connection connection = connectionFactory.createConnection(); connection.start(); // 创建session,开启事务,消息自动确认 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列 Destination destination = session.createQueue("queue1"); // 创建生产者 MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new ReceiveListener()); // 接受消息 // Message message = messageConsumer.receive(1000); // 关闭连接 // connection.close(); System.out.println("接收监听..."); } public static void main(String[] args){ try { createReceiver(); createReceiver(); } catch (JMSException e) { e.printStackTrace(); } } } class ReceiveListener implements MessageListener{ @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage)message; System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
注意:这里使用的是实现MessageListener的方式来监听消息,如代码注释部分,我们也可以用调用方法来接收,不过推荐使用监听的方式
参考文档:
JMS文档:https://docs.oracle.com/javaee/6/tutorial/doc/bncdx.html#bnceb
JavaEE JMS的API:http://tool.oschina.net/apidocs/apidoc?api=javaEE6
activeMQ文档:http://activemq.apache.org/getting-started.html
activeMQ的API文档:http://activemq.apache.org/maven/apidocs/index.html
原文地址:https://www.cnblogs.com/lay2017/p/8945301.html