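Imagine this scenario:
A task needs to run in a particular environment. With ActiveMQ this takes two steps: first, the sender publishes a message; second, the receiver, once it receives that message, does some work.
As usual, this post goes straight to the demo code.
1. Project structure: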
2. ActiveMQ jar dependencies (excerpt from pom.xml):
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.7.0</version>
    </dependency>
    <!-- activemq-spring -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-spring</artifactId>
        <version>5.7.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>3.0.7.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>
3. Logging configuration file log4j.properties:
# The root logger takes one level followed by appender names.
log4j.rootLogger=DEBUG,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} [%c]-[%p] %m%n
4. Receiver configuration file receive.properties:
jsm_url=tcp://localhost:61616
jms_name=com.andy.demo.test
jsm_type=topic
fliter=
test_key=com.andy.demo.util.activeMQ.DoSomethingImpl
max_caches=100
5. Sender configuration file send.properties:
jsm_url=tcp://localhost:61616
jms_name=com.andy.demo.test
jsm_type=topic
max_caches=100
persist=persist
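To make the meaning of these keys concrete, here is a minimal sketch of how send.properties could be turned into typed settings. It uses only java.util.Properties; the class name SendConfigSketch and its fields are hypothetical and not part of the demo project. The key spellings (jsm_url, jsm_type) and the defaults (topic, persist, 1000) are the ones the utility class later in this post uses.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: a typed view of the keys in send.properties.
class SendConfigSketch {
    final String url;
    final String jmsName;
    final boolean isTopic;
    final boolean isPersist;
    final int maxCaches;

    SendConfigSketch(Properties p) {
        this.url = p.getProperty("jsm_url");
        this.jmsName = p.getProperty("jms_name");
        // Anything other than "topic" is treated as a queue.
        this.isTopic = "topic".equals(p.getProperty("jsm_type", "topic"));
        // Anything other than "persist" means non-persistent delivery.
        this.isPersist = "persist".equals(p.getProperty("persist", "persist"));
        this.maxCaches = Integer.parseInt(p.getProperty("max_caches", "1000").trim());
    }

    // Parses properties text directly so the sketch needs no file on disk.
    static SendConfigSketch parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not parse properties text", e);
        }
        return new SendConfigSketch(p);
    }

    public static void main(String[] args) {
        SendConfigSketch c = parse("jsm_url=tcp://localhost:61616\n"
                + "jms_name=com.andy.demo.test\n"
                + "jsm_type=topic\n"
                + "max_caches=100\n"
                + "persist=persist\n");
        System.out.println(c.url + " " + c.jmsName + " topic=" + c.isTopic
                + " persist=" + c.isPersist + " maxCaches=" + c.maxCaches);
    }
}
```

Missing keys fall back to the defaults, so a file that only sets jsm_url and jsm_type=queue would yield a persistent queue with a buffer of 1000.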
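6. As the scenario says, once a message arrives we need to do some work with it.
In this example that work is factored out into two parts: an interface and its implementation class.
(1) The interface IDoSomething.java:
package com.andy.demo.activeMQ.work;

import javax.jms.Message;

// Interface for the work to be done when a message arrives
public interface IDoSomething {

    // Do some real work, part 1
    public void doSomeThing01(Message message);

    // Do some real work, part 2
    public void doSomeThing02(Message message);
}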
(2) The implementation class DoSomethingImpl.java:
package com.andy.demo.activeMQ.work.impl;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

import com.andy.demo.activeMQ.work.IDoSomething;

// Implementation of the work to be done when a message arrives
public class DoSomethingImpl implements IDoSomething {

    @Override
    public void doSomeThing01(Message message) {
        // Only text messages are handled; other message types are ignored
        if (message instanceof TextMessage) {
            TextMessage msg = (TextMessage) message;
            try {
                System.out.println("doSomeThing01 handled message: " + msg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void doSomeThing02(Message message) {
        // Only text messages are handled; other message types are ignored
        if (message instanceof TextMessage) {
            TextMessage msg = (TextMessage) message;
            try {
                System.out.println("doSomeThing02 handled message: " + msg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
7. The message producer, also called the message sender, Sender.java:
package com.andy.demo.activeMQ;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @package :com.andy.demo.activeMQ<br>
 * @file :Sender.java<br>
 * @describe :Message sender<br>
 * @author :wanglongjie<br>
 * @createDate :2015-11-06 13:21:33<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class Sender {
    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);

    private ActiveMQConnectionFactory factory;
    private Connection conn;
    private Session session;
    private Destination destination;
    private MessageProducer producer;

    private String url;
    private String jmsname;
    private boolean isTopic;
    private boolean isPersist;
    private boolean isConnection;
    // Bounded buffer of messages waiting to be sent
    private BlockingQueue<String> queue;
    private String msg;

    public Sender(String url, String jmsname, boolean isTopic,
            boolean isPersist, int maxcaches) {
        super();
        System.out.println("Sender.Sender(): instantiating via constructor......");
        this.url = url;
        this.jmsname = jmsname;
        this.isTopic = isTopic;
        this.isPersist = isPersist;
        this.queue = new LinkedBlockingQueue<String>(maxcaches);
    }

    public static Sender getSenderCase(String url, String jmsname,
            boolean isTopic, boolean isPersist, int maxcaches) {
        System.out.println("Sender.getSenderCase(): instantiating via static factory method......");
        return new Sender(url, jmsname, isTopic, isPersist, maxcaches);
    }

    // Buffers a message; blocks while the buffer is full
    public void addMessage(String msg) throws InterruptedException {
        System.out.println("Sender.addMessage(): adding message to the queue......");
        queue.put(msg);
    }

    private void sendMsg(String msg) throws InterruptedException, JMSException {
        System.out.println("Sender.sendMsg(): sending message to the broker......");
        Thread.sleep(5 * 1000);
        producer.send(session.createTextMessage(msg));
    }

    // Sends every buffered message, then releases the connection
    public void send() {
        System.out.println("Sender.send(): taking messages from the queue......");
        while (!queue.isEmpty()) {
            try {
                msg = queue.take();
                initActiveMQ();
                sendMsg(msg);
            } catch (Exception e) {
                LOGGER.error(e.getMessage());
                close();
            }
        }
        close();
    }

    // Sends a single message without buffering it
    public void sendMessage(String msg) throws JMSException,
            InterruptedException {
        System.out.println("Sender.sendMessage(): main send method started......");
        initActiveMQ();
        sendMsg(msg);
        close();
    }

    // Initialize ActiveMQ (no-op if already connected)
    private void initActiveMQ() throws JMSException {
        System.out.println("Sender.initActiveMQ(): initializing ActiveMQ......");
        if (isConnection) {
            return;
        }
        factory = new ActiveMQConnectionFactory(url);
        conn = factory.createConnection();
        session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        destination = isTopic ? session.createTopic(jmsname)
                : session.createQueue(jmsname);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(isPersist ? DeliveryMode.PERSISTENT
                : DeliveryMode.NON_PERSISTENT);
        isConnection = true;
    }

    // Close and release resources; safe to call before anything was opened
    private void close() {
        System.out.println("Sender.close(): closing and releasing resources......");
        try {
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        isConnection = false;
    }
}
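The buffering part of Sender (addMessage() filling a bounded queue, send() draining it) can be looked at in isolation. Below is a minimal sketch, assuming nothing beyond java.util.concurrent: the class BufferedSenderSketch is hypothetical, the "send" is just a list append standing in for producer.send(...), and it uses the non-blocking offer() and poll() rather than the blocking put() and take() that Sender uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the buffering logic in Sender; no broker involved.
class BufferedSenderSketch {
    private final BlockingQueue<String> queue;
    private final List<String> sent = new ArrayList<String>();

    BufferedSenderSketch(int maxCaches) {
        // Same idea as max_caches: the buffer holds at most this many messages.
        this.queue = new LinkedBlockingQueue<String>(maxCaches);
    }

    // Non-blocking variant of addMessage(): returns false when the buffer is full.
    boolean tryAddMessage(String msg) {
        return queue.offer(msg);
    }

    // Drains the buffer in FIFO order; poll() returns null once it is empty.
    List<String> send() {
        String msg;
        while ((msg = queue.poll()) != null) {
            sent.add(msg); // stand-in for producer.send(session.createTextMessage(msg))
        }
        return sent;
    }

    public static void main(String[] args) {
        BufferedSenderSketch sender = new BufferedSenderSketch(2);
        System.out.println(sender.tryAddMessage("a")); // true
        System.out.println(sender.tryAddMessage("b")); // true
        System.out.println(sender.tryAddMessage("c")); // false, buffer is full
        System.out.println(sender.send());             // [a, b]
    }
}
```

With put(), as in Sender, the third call would block until another thread drains the queue, so addMessage() and send() are meant to be called from different threads once the buffer can fill up.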
8. The message subscriber, also called the message receiver, Receiver.java:
package com.andy.demo.activeMQ;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.andy.demo.activeMQ.work.IDoSomething;

/**
 * @package :com.andy.demo.activeMQ<br>
 * @file :Receiver.java<br>
 * @describe :Message receiver<br>
 * @author :wanglongjie<br>
 * @createDate :2015-11-06 13:20:27<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class Receiver extends Thread implements MessageListener,
        ExceptionListener {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(Receiver.class);

    private ActiveMQConnectionFactory factory;
    private Connection conn;
    private Session session;
    private Destination destination;
    private MessageConsumer consumer;

    private String url;
    private String jmsname;
    private boolean isTopic;
    private String filter;
    // Bounded buffer between the JMS listener thread and this worker thread
    private BlockingQueue<Message> queue;
    private IDoSomething doSomething;

    public Receiver(String url, String jmsname, boolean isTopic,
            String filter, IDoSomething doSomething, int maxcaches) {
        System.out.println("Receiver.Receiver(): instantiating via constructor......");
        this.url = url;
        this.jmsname = jmsname;
        this.isTopic = isTopic;
        this.filter = filter;
        this.doSomething = doSomething;
        queue = new LinkedBlockingQueue<Message>(maxcaches);
    }

    public static Receiver getReceiverCase(String url, String jmsname,
            boolean isTopic, String filter, IDoSomething doSomething,
            int maxcaches) throws JMSException {
        System.out.println("Receiver.getReceiverCase(): instantiating via static factory method......");
        Receiver receiver = new Receiver(url, jmsname, isTopic, filter,
                doSomething, maxcaches);
        receiver.initActiveMQ();
        receiver.start();
        return receiver;
    }

    // Initialize the ActiveMQ connection, session and consumer
    private void initActiveMQ() throws JMSException {
        System.out.println("Receiver.initActiveMQ(): initializing ActiveMQ.......");
        factory = new ActiveMQConnectionFactory(url);
        conn = factory.createConnection();
        conn.start();
        session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        destination = isTopic ? session.createTopic(jmsname)
                : session.createQueue(jmsname);
        // An empty filter means no message selector
        consumer = (isNull(filter)) ? session.createConsumer(destination)
                : session.createConsumer(destination, filter);
        consumer.setMessageListener(this);
        conn.setExceptionListener(this);
    }

    public void close() {
        System.out.println("Receiver.close(): closing and releasing resources......");
        try {
            session.close();
        } catch (JMSException e) {
            LOGGER.error(e.getMessage());
        }
        try {
            conn.stop();
            conn.close();
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
    }

    // Returns true if the string is null or empty
    private boolean isNull(String param) {
        return param == null || param.equals("");
    }

    /*
     * (non-Javadoc)
     *
     * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
     */
    @Override
    public void onException(JMSException exception) {
        System.out.println("Receiver.onException(): ActiveMQ exception listener......");
        // Keep trying to reconnect, waiting 10 seconds between attempts
        while (true) {
            try {
                initActiveMQ();
                break;
            } catch (Exception e) {
                LOGGER.error(e.getMessage());
                try {
                    Thread.sleep(10 * 1000);
                } catch (InterruptedException e1) {
                    LOGGER.error(e1.getMessage());
                }
            }
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    @Override
    public void onMessage(Message message) {
        System.out.println("Receiver.onMessage(): ActiveMQ message listener......");
        try {
            if (isTopic) {
                // Topic messages are buffered and handled by run()
                queue.put(message);
            } else {
                // Queue messages are handled directly on the listener thread
                doSomethingWork(message);
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Thread#run()
     */
    @Override
    public void run() {
        System.out.println("Receiver.run(): Runnable worker listening......"
                + Thread.currentThread().getName());
        while (true) {
            Message message = null;
            try {
                message = queue.take();
                doSomethingWork(message);
            } catch (Exception e) {
                LOGGER.error(e.getMessage());
            }
        }
    }

    // The actual task
    public void doSomethingWork(Message message) {
        System.out.println("Receiver.doSomethingWork(): starting the real work......");
        doSomething.doSomeThing01(message);
        doSomething.doSomeThing02(message);
    }
}
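For topic messages, Receiver does not process anything inside onMessage(); it only puts the message on a bounded queue, and the run() loop takes messages off that queue and does the work. The following is a minimal sketch of that hand-off using plain strings instead of JMS messages. The class HandOffSketch and the "handled:" prefix are hypothetical, and unlike Receiver.run() the worker here stops when it is interrupted so the example can terminate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the listener-to-worker hand-off; no broker involved.
class HandOffSketch {

    static List<String> process(List<String> incoming) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100);
        final List<String> handled = new CopyOnWriteArrayList<String>();
        final CountDownLatch done = new CountDownLatch(incoming.size());

        // Plays the role of Receiver.run(): take a message, then do the work.
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        String message = queue.take();
                        handled.add("handled:" + message);
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop the loop
                }
            }
        });
        worker.setDaemon(true);
        worker.start();

        try {
            // Plays the role of onMessage() for a topic: enqueue and return quickly.
            for (String message : incoming) {
                queue.put(message);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.interrupt();
        }
        return new ArrayList<String>(handled);
    }

    public static void main(String[] args) {
        List<String> in = new ArrayList<String>();
        in.add("m1");
        in.add("m2");
        System.out.println(process(in)); // [handled:m1, handled:m2]
    }
}
```

The point of the design is that the thread delivering messages is never held up by slow processing; it only blocks if the buffer (max_caches in the demo) is full.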
9. ActiveMQUtils.java, a wrapper class for creating senders and receivers:
package com.andy.demo.activeMQ;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.andy.demo.activeMQ.work.IDoSomething;

/**
 * @package :com.andy.demo.activeMQ<br>
 * @file :ActiveMQUtils.java<br>
 * @describe :<br>
 * @author :wanglongjie<br>
 * @createDate :2015-11-06 13:22:13<br>
 * @updater :<br>
 * @updateDate :<br>
 * @updateContent :<br>
 */
public class ActiveMQUtils {
    private static final Logger LOGGER = LoggerFactory
            .getLogger(ActiveMQUtils.class);

    public static final String jms_url = null;
    public static final String jms_name = null;
    public static final String jms_filter = null;
    public static final String jms_type = "topic";
    public static final String test_key = null;
    public static final String persist = "persist";

    /**
     *
     * @method :getReceiverBean<br>
     * @describe :Get a message receiver instance<br>
     * @author :wanglongjie<br>
     * @createDate :2015-11-06 16:03:31 <br>
     * @param properties classpath location of the properties file
     * @param doSomething the work to run for each message
     * @return Receiver
     * @throws JMSException
     *
     */
    public static Receiver getReceiverBean(String properties,
            IDoSomething doSomething) throws JMSException {
        Properties p = loadProperties(properties);
        String url = p.getProperty("jsm_url");
        String jmsname = p.getProperty("jms_name");
        boolean isTopic = p.getProperty("jsm_type", "topic").equals("topic");
        String filter = p.getProperty("fliter");
        int maxcaches = Integer.parseInt(p.getProperty("max_caches", "1000"));

        Receiver receiver = Receiver.getReceiverCase(url, jmsname, isTopic,
                filter, doSomething, maxcaches);
        return receiver;
    }

    /**
     *
     * @method :getSenderCase<br>
     * @describe :Get a message sender instance<br>
     * @author :wanglongjie<br>
     * @createDate :2015-11-06 16:03:48 <br>
     * @param properties classpath location of the properties file
     * @return Sender
     */
    public static Sender getSenderCase(String properties) {
        Properties p = loadProperties(properties);
        String url = p.getProperty("jsm_url");
        String jmsname = p.getProperty("jms_name");
        boolean isTopic = p.getProperty("jsm_type", "topic").equals("topic");
        boolean isPersist = p.getProperty("persist", "persist").equals(
                "persist");
        int maxcaches = Integer.parseInt(p.getProperty("max_caches", "1000"));

        Sender sender = Sender.getSenderCase(url, jmsname, isTopic, isPersist,
                maxcaches);
        return sender;
    }

    /**
     *
     * @method :loadProperties<br>
     * @describe :Load a properties file from the classpath<br>
     * @author :wanglongjie<br>
     * @createDate :2015-11-06 13:36:44 <br>
     * @param properties classpath location of the properties file
     * @return Properties
     */
    private static Properties loadProperties(String properties) {
        InputStream in = ActiveMQUtils.class.getResourceAsStream(properties);
        // Fail early with a clear message instead of returning null to callers
        if (in == null) {
            throw new IllegalArgumentException(
                    "Properties file not found on classpath: " + properties);
        }
        try {
            Properties p = new Properties();
            p.load(in);
            return p;
        } catch (IOException e) {
            LOGGER.error(e.getMessage());
            throw new IllegalStateException("Failed to load " + properties, e);
        } finally {
            try {
                in.close();
            } catch (IOException e2) {
                LOGGER.error(e2.toString());
            }
        }
    }
}
10. Test classes, in two parts: one for sending messages and one for receiving them.
(1) The sender test class SenderAPPTest.java:
package com.andy.demo.activeMQ.test;

import java.util.Date;

import javax.jms.JMSException;

import com.andy.demo.activeMQ.ActiveMQUtils;
import com.andy.demo.activeMQ.Sender;

// Test class for sending messages
public class SenderAPPTest {

    public static void main(String[] args) {
        String properties = "/send.properties";
        Sender sender = ActiveMQUtils.getSenderCase(properties);

        // Send 5 messages, pausing 10 seconds between them
        int sum = 5;
        for (int i = 0; i < sum; i++) {
            String msg = new Date(System.currentTimeMillis())
                    + " ; Hello, I am andy and this is an ActiveMQ test["
                    + (i + 1) + "]!";
            try {
                sender.sendMessage(msg);
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
(2) The receiver test class ReceiverAPPTest.java:
package com.andy.demo.activeMQ.test;

import javax.jms.JMSException;

import com.andy.demo.activeMQ.ActiveMQUtils;
import com.andy.demo.activeMQ.Receiver;
import com.andy.demo.activeMQ.work.impl.DoSomethingImpl;

// Test class for receiving messages
public class ReceiverAPPTest {

    public static void main(String[] args) {
        String properties = "/receive.properties";
        DoSomethingImpl doSomething = new DoSomethingImpl();

        // Start 5 receivers that share the same handler
        int num = 5;
        Receiver[] receivers = new Receiver[num];
        try {
            for (int i = 0; i < num; i++) {
                receivers[i] = ActiveMQUtils.getReceiverBean(properties,
                        doSomething);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
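11. That is all of the source code for this example. Finally, a note on the difference between a topic and a queue in JMS:
(1) topic:
In JMS, a Topic implements publish/subscribe semantics. When a message is published, it goes to every interested subscriber, so zero or more subscribers each receive a copy of the message. However, only subscribers that have an active subscription at the moment the message broker receives the message get a copy.
So when testing with the jsm_type property in the configuration files set to topic, you must start the receiver, ReceiverAPPTest.java, first and only then start the sender, SenderAPPTest.java. In other words, with a topic the receiver must already exist when the sender sends; otherwise it will not receive the message.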
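The start-order rule for topics can be illustrated with a small in-memory model. This is only a sketch of the semantics described above, not how ActiveMQ is implemented: the class TopicSketch and its methods are hypothetical, and it models plain (non-durable) subscriptions like the ones Receiver creates with session.createConsumer(destination).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of publish/subscribe delivery.
class TopicSketch {
    // One inbox per currently active subscriber, in subscription order.
    private final Map<String, List<String>> inboxes =
            new LinkedHashMap<String, List<String>>();

    void subscribe(String name) {
        inboxes.put(name, new ArrayList<String>());
    }

    // Every active subscriber gets its own copy; nothing is stored for later.
    void publish(String message) {
        for (List<String> inbox : inboxes.values()) {
            inbox.add(message);
        }
    }

    List<String> received(String name) {
        return inboxes.get(name);
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        topic.publish("m1");   // no subscribers yet, so m1 is lost
        topic.subscribe("A");
        topic.subscribe("B");
        topic.publish("m2");   // both A and B get a copy
        System.out.println("A=" + topic.received("A") + " B=" + topic.received("B"));
        // prints: A=[m2] B=[m2]
    }
}
```

This matches what the demo shows when run against a topic: each of the five receivers started by ReceiverAPPTest prints every message, but only messages sent after the receivers were started.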
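(2) queue:
A JMS Queue implements load-balancer semantics. A message is received by only one consumer. If no consumer is available when the message is sent, the message is kept until a consumer that can process it becomes available. If a consumer receives a message but does not acknowledge it, the message is passed on to another consumer. A Queue can have many consumers, and messages are load-balanced across the available ones.
So when testing with the jsm_type property in the configuration files set to queue, either the sender or the receiver can be started first.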
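The queue behaviour can be modelled the same way. The sketch below is a hypothetical in-memory model (QueueSketch is not part of the demo): it keeps messages until a consumer exists and hands each message to exactly one consumer. Strict round-robin is an assumption made for simplicity; how a real broker spreads messages across consumers depends on its dispatch and prefetch settings, and acknowledgement and redelivery are not modelled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of point-to-point (queue) delivery.
class QueueSketch {
    private final Deque<String> pending = new ArrayDeque<String>();
    private final List<String> consumers = new ArrayList<String>();
    private final Map<String, List<String>> inboxes =
            new LinkedHashMap<String, List<String>>();
    private int next = 0; // index used for round-robin selection

    void send(String message) {
        pending.add(message); // stored even if nobody is listening yet
        dispatch();
    }

    void addConsumer(String name) {
        consumers.add(name);
        inboxes.put(name, new ArrayList<String>());
        dispatch(); // deliver anything that was waiting
    }

    // Hands each pending message to exactly one consumer, in turn.
    private void dispatch() {
        while (!pending.isEmpty() && !consumers.isEmpty()) {
            String target = consumers.get(next % consumers.size());
            next++;
            inboxes.get(target).add(pending.poll());
        }
    }

    List<String> received(String name) {
        return inboxes.get(name);
    }

    public static void main(String[] args) {
        QueueSketch queue = new QueueSketch();
        queue.send("m1");        // kept: no consumer yet
        queue.send("m2");        // kept: no consumer yet
        queue.addConsumer("A");  // A receives the backlog m1, m2
        queue.addConsumer("B");
        queue.send("m3");        // goes to A
        queue.send("m4");        // goes to B
        System.out.println("A=" + queue.received("A") + " B=" + queue.received("B"));
        // prints: A=[m1, m2, m3] B=[m4]
    }
}
```

Compared with the topic case, no message is lost when the sender starts first, and no message is delivered twice when several receivers are running.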