前言
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
一、使用介绍
环境准备
1、activemq下载传送门:https://activemq.apache.org/download.html、MyEclipse
2、启动activemq,我电脑是win64位,所以启动bin木下win64中的activemq.bat
3、需要输入用户名和密码才能进入,页面成功启动的效果。(默认用户名和密码皆为:admin)
开始测试
1、新建一个JMS java工程,导入下载的activemq文件里的jar包,例如作者目录下的activemq-all-5.11.1.jar
导入工程
2、新建消息生产者 JMSProducer.java
package com.hcg.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * * @author babylon * 2016-5-9 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM= 10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话,接收或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建会话 destination = session.createQueue("FirstQueue1"); // 创建消息队列 messageProducer = session.createProducer(destination); // 创建消息生产者 // 发送消息 sendMessage(session, messageProducer); // 正式提交发送消息的操作 session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { // 关闭连接 if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer * @throws JMSException */ public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{ for(int i=0; i < JMSProducer.SENDNUM; i++){ TextMessage message = session.createTextMessage("ActiveMQ 发送的消息"+i); System.out.println("发送消息:"+i); messageProducer.send(message); } } }
3、F11运行实例,可以看见成功发送了10条消息
3、创建消费者 JMSConsumer.java
package com.hcg.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 * @author babylon * 2016-5-9 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话,接收或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建会话,不加事务 destination = session.createQueue("FirstQueue1"); // 创建消息队列 messageConsumer = session.createConsumer(destination); // 创建消息消费者 while(true){ TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:"+textMessage.getText()); } else { break; } } } catch (JMSException e) { e.printStackTrace(); } } }
4、F11运行消费者,可以看见在控制台新增了一个消费者,发送的消息都已被消费处理
5、while(true)这种方式处理消费是不合适的,下面以监听的方式处理创建消费者 JMSConsumer_listener.java。
package com.hcg.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 - 监听方式消费 * @author babylon * 2016-5-9 */ public class JMSConsumer_listener { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话,接收或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer_listener.USERNAME, JMSConsumer_listener.PASSWORD, JMSConsumer_listener.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建会话,不加事务 destination = session.createQueue("FirstQueue1"); // 创建消息队列 messageConsumer = session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener()); // 注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } }
监听对象需要实现MessageListener
package com.hcg.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听 * * @author babylon * 2016-5-9 */ public class Listener implements MessageListener{ /* * 收到的消息 */ @Override public void onMessage(Message message) { try { System.out.println("收到的消息:"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
6、运行该类,发现消费者数量增加了一个。
消息收发和订阅的方式
1、创建消息发布者
package com.hcg.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 - 消息发布者 * * @author babylon * 2016-5-9 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM = 10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话,接收或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 创建会话 // destination = session.createQueue("FirstQueue1"); // 创建消息队列 destination = session.createTopic("FirstTopic1"); messageProducer = session.createProducer(destination); // 创建消息生产者 // 发送消息 sendMessage(session, messageProducer); // 正式提交发送消息的操作 session.commit(); } catch (JMSException e) { e.printStackTrace(); } finally { // 关闭连接 if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发布的消息 * @param session * @param messageProducer * @throws JMSException */ public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{ for(int i=0; i < JMSProducer.SENDNUM; i++){ TextMessage message = session.createTextMessage("ActiveMQ 发送的消息"+i); System.out.println("发送消息:"+i); messageProducer.send(message); } } }
2、创建消息订阅者1
package com.hcg.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 - 消息订阅者1 * @author babylon * 2016-5-9 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话,接收或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建会话,不加事务 // destination = session.createQueue("FirstQueue1"); // 创建消息队列 destination = session.createTopic("FirstTopic1"); messageConsumer = session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener()); // 注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } }
3、创建消息订阅者2
package com.hcg.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 - 消息订阅者2 * @author babylon * 2016-5-9 */ public class JMSConsumer2 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话,接收或者发送消息的线程 Destination destination; // 消息的目的地 MessageConsumer messageConsumer; // 消息生产者 // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL); try { connection = connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建会话,不加事务 // destination = session.createQueue("FirstQueue1"); // 创建消息队列 destination = session.createTopic("FirstTopic1"); messageConsumer = session.createConsumer(destination); // 创建消息消费者 messageConsumer.setMessageListener(new Listener2()); // 注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } }
最终目录结构:
demo下载地址:https://github.com/JasonBabylon/activemq
时间: 2024-10-18 00:09:57