Spring下ActiveMQ实战

MessageQueue是分布式的系统里经常要用到的组件,一般来说,当需要把消息跨网段、跨集群的分发出去,就可以用这个。一些典型的示例就是:

    1、集群A中的消息需要发送给多个机器共享;

    2、集群A中消息需要主动推送,但彼此的网络不是互通的(如集群A只有过HA才能被外界访问);

    

当然上面的几个点,除了用MQ还有其它实现方式,但是MQ无疑是非常适合用来做这些事的。众多MQ中,ActiveMQ是比较有名气也很稳定的,它发送消息的成本非常廉价,支持Queue与Topic两种消息机制。本文主要就是讲如何在Spring环境下配置此MQ:

1、场景假设

现有机器两台Server、Worker需要进行异步通信,另有一台ActiveMQ机器,关于MQ的配置信息存放在Zookeeper中,Zookeeper的节点有:

- /mq/activemq/ip:mq的机器ip

-/mq/activemq/port:这是mq的机器端口

2、Server的Spring XML配置

Server主要的工作就是接受Worker消息,并发送消息给Worker。主要是定义了连接MQ的连接池接受Worker消息的队列worker,发送消息给Worker的队列server:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="
 3         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
 4         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd">
 5
 6     <!-- ActiveMQ连接池 -->
 7     <bean id="conFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
 8         <property name="connectionFactory">
 9             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
10                 <property name="brokerURL">
11                     <bean class="lekko.mq.util.MQPropertiesFactory" factory-method="getUrl" />
12                 </property>
13                 <property name="closeTimeout" value="60000" />
14                 <!-- <property name="userName" value="admin" /> -->
15                 <!-- <property name="password" value="admin" /> -->
16                 <!-- <property name="optimizeAcknowledge" value="true" /> -->
17                 <property name="optimizedAckScheduledAckInterval" value="10000" />
18             </bean>
19         </property>
20     </bean>
21
22
23     <!-- Worker任务消息 -->
24     <bean id="taskWorkerTopic" class="org.apache.activemq.command.ActiveMQTopic">
25         <constructor-arg value="worker_topic" />
26     </bean>
27     <!-- 任务监听容器 -->
28     <bean id="taskWorkerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
29         <property name="connectionFactory" ref="conFactory" />
30         <property name="destination" ref="taskWorkerTopic" />
31         <property name="messageListener">
32             <bean class="lekko.mq.task.TaskWorkerListener" />
33         </property>
34         <property name="pubSubDomain" value="true" />
35     </bean>
36
37
38     <!-- Server任务消息 -->
39     <bean id="taskServerTopic" class="org.apache.activemq.command.ActiveMQTopic">
40         <constructor-arg value="server_topic" />
41     </bean>
42     <!-- 任务消息发送模板 -->
43     <bean id="taskServerTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="conFactory" p:defaultDestination-ref="taskServerTopic" />
44
45 </beans>

一段一段地分析,ActiveMQ连接池这里,定义了连接的bean为“conFactory”,其中broberURL属性是通过后台Java代码的静态方法来设置的,方便线上环境通过Java代码动态地切换,稍后会介绍这块代码,你现在需要知道的是,它实际上返回的就是一个字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,如果不要用后台来管理连接信息,直接改成“<property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:port">”也是OK的。

接下来,便是Worker消息队列的定义,这里定义为“taskWorkerTopic”,类型是org.apache.activemq.command.ActiveMQTopic,(订阅模式)它表示一个消息可以被多个机器收到并处理,其它的还有org.apache.activemq.command.ActiveMQQueue,(点对点模式)表示一个消息只能被一台机器收到,当收到后消息就出队列了,其它机器无法处理。它们都有一个构造参数constructor-arg,指定了消息队列的名称,一个MQ中一个消息队列的名字是唯一的。

Worker的消息队列定义好了之后,就是接受Worker的里消息了,这里定义了“taskWorkerContainer”,其属性分别定义了连接池、目标队列、消息处理器(我们自己的Java类,后面再讲),参数pubSubDomain用于指定是使用订阅模式还是使用点对点模式,如果是ActiveMQTopic则要设置为true,默认是false。

好了,Server现在已经可以通过自己定义的“lekko.mq.task.TaskWorkerListener”类接受并处理taskWorkerTopic的消息了。

如法炮制,定义一个专门用于往Worker里发消息的队列“taskServerTopic”,并定义发送消息的模板“taskServerTemplate”备用。

3、Server端的接收类与发送类

lekko.mq.task.TaskWorkerListener便是一个接收类示例:

 1 package lekko.mq.task;
 2
 3 import javax.jms.Message;
 4 import javax.jms.MessageListener;
 5
 6 import org.apache.activemq.command.ActiveMQObjectMessage;
 7 import org.apache.log4j.Logger;
 8 import org.springframework.stereotype.Service;
 9 import lekko.mq.model.MessageModel;
10
11
12 /**
13  * Task消息监听类
14  * @author lekko
15  */
16 @Service
17 public class TaskWorkerListener implements MessageListener {
18
19     private Logger _logger = Logger.getLogger(TaskWorkerListener.class);
20
21     @Override
22     public void onMessage(Message message) {
23         if (message instanceof ActiveMQObjectMessage) {
24             ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
25             try {
26                 onMessage((MessageModel) aMsg.getObject());
27             } catch (Exception e) {
28                 _logger.warn("Message:${} is not a instance of MessageModel.", e);
29             }
30         } else {
31             _logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");
32         }
33     }
34
35     /**
36      * 处理消息
37      * @param message 自定义消息实体
38      */
39     public void onMessage(MessageModel message) { ... }
40
41 }

这里给大家演示的并不是最基础的知识,处理的消息是一个自定义的类“lekko.mq.model.MessageModel”,这个类怎么写可以随便整,反正就是一些你要传递的数据字段,但是记得要实现Serializable接口。如果你需要传递的仅仅是纯字符串,那么直接在代码的23行片,把message.toString()即可。这个类通过前面XML配置会处理来自“worker_topic”队列中的消息。

再就是发送类,实际上就是把前面的taskServiceTemplate拿来用就行了:

 1 package lekko.mq.task;
 2
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.beans.factory.annotation.Qualifier;
 5 import org.springframework.jms.core.JmsTemplate;
 6 import org.springframework.stereotype.Service;
 7 import lekko.mq.model.MessageModel;
 8
 9
10 /**
11  * 服务器任务消息分发
12  * @author lekko
13  */
14 @Service
15 public class TaskServerSender {
16
17     @Autowired
18     @Qualifier("taskServerTemplate")
19     private JmsTemplate jmsTemplate;
20
21     /**
22      * 发送消息
23      */
24     public void sendMessage(MessageModel msg) {
25         jmsTemplate.convertAndSend(msg);
26     }
27
28 }

把这个类TaskServerSender注入到任意需要用到的地方,调用sendMessage方法即可。它会往前面定义的“server_topic”中塞消息,等Worker来取。

4、关于Zookeeper配置MQ连接信息

Worker端的配置我这里不再阐述,因为它跟在Server端的配置太相像,区别就在于Server端是从worker_topic中取消息,往server_topic中写消息;而Worker端的代码则是反过来,往worker_topic中写消息,从server_topic中取消息。

那么如何使用Java代码来控制ActiveMQ的配置消息呢:

 1 package lekko.mq.util;
 2
 3 import org.apache.zookeeper.ZooKeeper;
 4 import org.apache.zookeeper.data.Stat;
 5
 6 /**
 7  * 获取MQ配置
 8  * @author lekkoli
 9  */
10 public class MQPropertiesFactory {
11
12     private static boolean isLoaded = false;
13     private static String ZOOKEEPER_CLUST = "xxx.xxx.xxx.xxx:2181";
14     private static ZooKeeper _zk;
15     private static String _ip;
16     private static String _port;
17
18     private static String getProperty(String path) throws Exception {
19         if (_zk == null) {
20             if (ZOOKEEPER_CLUST == null) {
21                 throw new Exception("Zookeeper, Host \"" + ZOOKEEPER_CLUST + "\" is null!");
22             }
23             _zk = new ZooKeeper(ZOOKEEPER_CLUST, 90000, null);
24         }
25         Stat s = _zk.exists(path, false);
26         if (s != null)
27             return new String(_zk.getData(path, false, s));
28         throw new Exception("Zookeeper, Path \"" + path + "\" is not exist!");
29     }
30
31     private static void load() throws Exception {
32         if (!isLoaded) {
33             _ip = getProperty("/mq/activemq/ip");
34             _port = getProperty("/mq/activemq/port");
35             isLoaded = true;
36         }
37     }
38
39     public static String getUrl() throws Exception {
40         load();
41         StringBuilder failover = new StringBuilder();
42         String[] ips = _ip.split(";"), ports = _port.split(";");
43         for (int i = 0; i < ips.length; ++i) {
44             failover.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");
45         }
46         failover.setLength(failover.length() - 1);
47         String failovers = failover.toString();
48         if (ips.length > 1) {
49             failovers = "failover:(" + failovers + ")";
50         }
51         return failovers;
52     }
53 }

上面的代码需要解释的地方跟MQ相关的不多,主要就是如果是mq集群,则格式是:failover:(tcp://192.168.1.117:1001,tcp://192.168.1.118:1001,tcp://xxx.xxx.xxx.xxx:port)。其它上面代码没有对Zookeeper集群都挂了的情况,做应急连接方案。当然,无论如何本节都不是全文的重点,但是多学一技何尝不可?

最近工作越来越忙,更新博客也是时有时无,但是我会坚持下去,还有许多工作中的点滴,在这里沉淀一下,也希望更进一步吧。

转载请注明原址:http://www.cnblogs.com/lekko/p/4940976.html

时间: 2024-08-10 00:07:42

Spring下ActiveMQ实战的相关文章

在Spring下集成ActiveMQ

1.参考文献 Spring集成ActiveMQ配置 Spring JMS异步发收消息 ActiveMQ 2.环境 在前面的一篇ActiveMQ入门实例中我们实现了消息的异步传送,这篇博文将如何在spring环境下集成ActiveMQ.如果要在spring下集成ActiveMQ,那么就需要将如下jar包导入项目: 本文有两篇参考文献,因此有两个实例,项目结构如下图所示: 3.实例1 信息发送者:HelloSender.java package edu.sjtu.erplab.springactiv

Java消息队列-Spring整合ActiveMq

1.概述 首先和大家一起回顾一下Java 消息服务,在我之前的博客<Java消息队列-JMS概述>中,我为大家分析了: 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现. 优势:异步.可靠 消息模型:点对点,发布/订阅 JMS中的对象  然后在另一篇博客<Java消息队列-ActiveMq实战>中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解: 多种语言和协议编写客户端.语言: Java

Spring Data MongoDB实战(上)

Spring Data MongoDB实战(上) 作者:chszs,版权所有,未经同意,不得转载.博主主页:http://blog.csdn.net/chszs 本文会详细展示Spring Data MongoDB是如何访问MongoDB数据库的.MongoDB是一个开源的文档型NoSQL数据库,而Spring Data MongoDB是Spring Data的模块之一,专用于访问MongoDB数据库.Spring Data MongoDB模块既提供了基于方法名的查询方式,也提供了基于注释的查询

Spring集成ActiveMQ配置 --转

转自:http://suhuanzheng7784877.iteye.com/blog/969865 集成环境 Spring采用2.5.6版本,ActiveMQ使用的是5.4.2,从apache站点可以下载.本文是将Spring集成ActiveMQ来发送和接收JMS消息. 集成步骤 将下载的ActiveMQ解压缩后文件夹如下 activemq-all-5.4.2.jar是activemq的所有的类jar包.lib下面是模块分解后的jar包.将lib下面的 Java代码 /lib/activati

Apache ActiveMQ实战(1)-基本安装配置与消息类型

ActiveMQ简介 ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的.可扩展的.稳定的和安全的企业级消息通信.ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改. ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件.ActiveMQ实现了JMS标准并提供了很多附加的特性.这些附加的特性包括,JMX管理(java Management Extensions,即java

Spring整合ActiveMQ

1.管理ActiveMQ 地址  http://localhost:8161/admin/ 默认用户和密码:admin=admin 运行发送者,eclipse控制台输出,如下图: 此时,我们先看一下ActiveMQ服务器,Queues内容如下: 我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,我们也可以通过Browse查看是哪些消息,如下图: 如果这些队列中的消息,被删除,消费者则无法消费. 我们继续运行一下消费者,eclipse控制台打印消息,如下: 

Spring整合ActiveMQ及多个Queue消息监听的配置

消息队列(MQ)越来越火,在java开发的项目也属于比较常见的技术,MQ的相关使用也成java开发人员必备的技能.笔者公司采用的MQ是ActiveMQ,且消息都是用的点对点的模式.本文记录了实现Spring整合ActivateMQ的全过程及如何使用MQ,便于后续查阅. 一.项目的搭建 采用maven构建项目,免去了copy jar包的麻烦.因此,我们创建了一个java类型的Maven Project (1)项目结构图 先把项目结构图看一下,便于对项目的理解. (2)pom.xml 我们需要加入以

Spring+Stomp+ActiveMq实现websocket长连接

stomp.js+spring+sockjs+activemq实现websocket长连接,使用java配置. pom.xml(只列出除了spring基本依赖意外的依赖,spring-version为4.3.3.RELEASE): <dependency> <groupId>javax.websocket</groupId> <artifactId>javax.websocket-api</artifactId> <version>1

彻底征服 Spring AOP 之 实战篇

接上一小节彻底征服 Spring AOP 之 理论篇 Spring AOP 实战 看了上面这么多的理论知识, 不知道大家有没有觉得枯燥哈. 不过不要急, 俗话说理论是实践的基础, 对 Spring AOP 有了基本的理论认识后, 我们来看一下下面几个具体的例子吧.下面的几个例子是我在工作中所遇见的比较常用的 Spring AOP 的使用场景, 我精简了很多有干扰我们学习的注意力的细枝末节, 以力求整个例子的简洁性. 下面几个 Demo 的源码都可以在我的 Github 上下载到. HTTP 接口