ActiveMQ学习教程(二)——简单示例
一。应用IDEA构建Maven项目
File-》New-》Module...-》Maven-》勾选-》选择-》Next -》
GroupId:com.jd.myMaven | ArtifactId:activeMQ | version:默认 -》Finish
项目构建成功!项目结构如下所示:
二。创建生产者类,模拟生产者发消息
Step1:java/activemq/JMSProducer.java
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; import java.util.Map; /** * 消息生产者 */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 private static final int SENDNUM = 2;//发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageProducer messageProducer = null;//消息的生产者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue");//创建消息队列 messageProducer = session.createProducer(destination);//创建消息生产者 sendMessage(session, messageProducer);//发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } //发送消息 private static void sendMessage(Session session, MessageProducer messageProducer) { try { //创建消息Map<key,value> MapMessage message = session.createMapMessage(); message.setString("userName", "syf"); message.setInt("age", 30); message.setDouble("salary", 1000); message.setBoolean("isGirl", true); System.out.println("Sending:" + ((ActiveMQMapMessage)message).getContentMap()); //发送消息 messageProducer.send(message); } catch (JMSException e) { e.printStackTrace(); } } }
Step2:启动ActiveMQ,运行生产者类,模拟生产消息
控制台显示:
Sending:{isGirl=true, userName=syf, salary=1000.0, age=30}
打开浏览器,访问activeMQ监控画面 http://127.0.0.1:8161/admin
【1】Queues
【2】Topics
三。模拟消费者消耗数据
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; /** * 消息消费者 */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消费者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue");//创建连接的消息队列(TestQueue:生产者队列名) messageConsumer = session.createConsumer(destination);//创建消息消费者 while (true) { MapMessage mapMessage= (MapMessage) messageConsumer.receive(10000); //TextMessage textMessage= (TextMessage) messageConsumer.receive(10000);//10秒接收 if (mapMessage != null) { System.out.println("收到的消息:" + ((ActiveMQMapMessage)mapMessage).getContentMap()); } else { System.out.println("没有消息:"); } } } catch (JMSException e) { e.printStackTrace(); } } }
运行代码:
控制台显示:收到的消息:{userName=syf, salary=1000.0, isGirl=true, age=30}
打开浏览器,查看activeMQ监控画面 http://127.0.0.1:8161/admin
【1】Queues(1条消息被消费)
【2】Topics
示例-----企业应用最常见方式之监听器监听方式
增加监听类JMSListener
package activemq; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; /** * 消息监听 * */ public class JMSListener implements MessageListener{ public void onMessage(Message message) { try { System.out.println("receive Message:"+((ActiveMQMapMessage)message).getContentMap()); } catch (JMSException e) { e.printStackTrace(); } } }
重新模拟一个消费者,应用监听器监听消息
package activemq; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMapMessage; import javax.jms.*; public class JMSConsumerByListener { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址 public static void main(String[] args) { ConnectionFactory connectionFactory = null;//连接工厂 Connection connection = null;//连接 Session session = null;//会话,接受或者发送消息的线程 Destination destination = null;//消息的目的地 MessageConsumer messageConsumer = null;//消息的消费者 //实例化连接工厂(指定连接用户名|密码|连接地址) connectionFactory = new ActiveMQConnectionFactory(JMSConsumerByListener.USERNAME, JMSConsumerByListener.PASSWORD, JMSConsumerByListener.BROKERURL); try { connection = connectionFactory.createConnection();//通过连接工厂获取连接 connection.start();//启动连接 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session destination = session.createQueue("TestQueue2");//创建连接的消息队列(TestQueue:生产者队列名) messageConsumer = session.createConsumer(destination);//创建消息消费者 messageConsumer.setMessageListener(new JMSListener());//注册消息监听 } catch (JMSException e) { e.printStackTrace(); } } }
=》先执行生成者,生产消息!
控制台打印:Sending:{isGirl=true, userName=kaixin, salary=1000.0, age=30}
ActiveMQ监控画面显示:
=》再执行消费者,消费消息!
控制台打印:receive Message:{userName=kaixin, salary=1000.0, isGirl=true, age=30}
ActiveMQ监控画面显示:
OK!!!大功告成!
参考文章:
https://www.toutiao.com/a6345805464718409986/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=18292470304&utm_medium=toutiao_android&wxshare_count=1
http://blog.csdn.net/xh16319/article/details/12142249