ActiveMQ Apollo之MQTT

Apollo是apache旗下的基金项目,它是以Apache ActiveMQ5.x为基础,采用全新的线程和消息调度架构重新实现的消息中间件,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。apollo与ActiveQQ一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍MQTT协议的使用。 
关于ActiveMQ5请参考:http://activemq.apache.org,本文只介绍Apollo在windows下安装和应用,Apollo的详细文档请参考官网:http://activemq.apache.org/apollo/documentation/user-manual.html.

Apollo的下载和安装

1.下载并安装

进入http://activemq.apache.org/apollo/download.html,下载windows版本的压缩包,并解压到自己工作目录(如:E:\apache-apollo-1.7),并创建环境变量APOLLO_HOME=E:\apache-apollo-1.7。如果操作是系统是Windows Vista或更高版本,则需要安装Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。

2.创建broker实例并启动服务

进入E:\apache-apollo-1.7之下的bin目录,打开cmd窗口,执行命令:apollo create E:\apollo_broker,命令执行成功后,在E盘下会有apollo_broker目录,在其下有个bin目录,其中有两个文件:apollo-broker.cmd和apollo-broker-service.exe,第一个是通过cmd命令启动apollo服务的,第二个是创建window服务的。 
cmd命令启动:apollo-broker run,启动成功可以在浏览器中查看运行情况(http://127.0.0.1:61680/,默认用户名/密码:admin/password); 
windows服务启动:执行apollo-broker-service.exe,创建windows服务,就可以以windows服务的方式启动apollo服务。

3.MQTT协议的应用

MQTT协议有众多客户端实现,相关请参考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。 
本文采用eclipse的paho客户端实现(https://eclipse.org/paho/)。

a.javascript客户端:https://eclipse.org/paho/clients/js/

将javascript客户端项目下载下来,并在其项目根目录下执行mvn命令,进行编译,生成target目录,其下生成mqttws31.js、mqttws31-min.js两个js文件,将其拷贝到自己项目相关目录下,并在页面中引用,即可实现javascript客户端的消息订阅和发布,demo代码如下: 
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); 
// 61623是ws连接的默认端口,可以在apollo中间件中进行配置(关于apollo的配置请参考:http://activemq.apache.org/apollo/documentation/user-manual.html) 
// set callback handlers 
client.onConnectionLost = onConnectionLost; 
client.onMessageArrived = onMessageArrived; 
// connect the client 
client.connect({userName:‘admin‘,password:‘password‘,onSuccess:onConnect}); 
// called when the client connects 
function onConnect() { // 连接成功后的处理 
// Once a connection has been made, make a subscription and send a message. 
console.log("onConnect"); 
client.subscribe("/topic/event"); // 订阅消息的主题 
var message = new Paho.MQTT.Message("Hello,this is a test"); 
message.destinationName = "/topic/event"; 
client.send(message); // 发送消息 

// called when the client loses its connection 
function onConnectionLost(responseObject) { // 连接丢失后的处理 
if (responseObject.errorCode !== 0) { 
console.log("onConnectionLost:"+responseObject.errorMessage); 


// called when a message arrives 
function onMessageArrived(message) { // 消息接收成功后的处理 
console.log("onMessageArrived:"+message.payloadString); 
}

b. java客户端实现

paho目前只支持J2SE和安卓,下载地址:https://eclipse.org/paho/clients/java/,我们采用maven方式。 
maven库地址: 
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases 
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots 
maven dependency: 
<dependency> 
<groupId>org.eclipse.paho</groupId> 
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
<version>1.0.1</version> 
</dependency>
 
说明:版本为1.0.0或0.9.0时,其jar包根本加载不进来,最后搜到1.0.1版本才可以正常使用。 
java端实现: 
public interface IMessage { 
String getHost(); 
Integer getPort(); 
Integer getQos(); 
String getTopic(); 
String getClientId(); 
String getContent(); 
byte[] getContentBytes(); 


Map<String,Object> getOption(); 
Object getSender(); 
Date getSendTime(); 
}
 
public final class MessageProcessingCenter { 
protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class); 
protected static final String BROKER_PREFIX="tcp://"; 
protected static final String BROKER_HOST="localhost"; 
protected static final int PORT=61613; 
protected static final int QOS=2; 
protected static final String TOPIC="/topic/event"; 
protected static final String CLIENT_ID="clientId"; 
protected static final String MQ_USER="admin"; 
protected static final String MQ_PASSWORD="password"; 
public static void send(IMessage message){ 
String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic(); 
int qos=null == message.getQos()?QOS: message.getQos(); 
String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost()); 
int port=null == message.getPort()?PORT:message.getPort(); 
broker+=":"+port; 
String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId(); 
Map<String,Object> opts=message.getOption(); 
String user=MQ_USER; 
String password=MQ_PASSWORD; 
if(null != opts){ 
if(null != opts.get("userName")){ 
user=opts.get("userName").toString(); 

if(null != opts.get("password")){ 
password=opts.get("password").toString(); 


MemoryPersistence persistence = new MemoryPersistence(); 
try { 
MqttClient sampleClient = new MqttClient(broker, clientId, persistence); 
MqttConnectOptions connOpts = new MqttConnectOptions(); 
connOpts.setUserName(user); 
connOpts.setPassword(password.toCharArray()); 
connOpts.setCleanSession(true); 
sampleClient.connect(connOpts); 
MqttMessage mqm = new MqttMessage(message.getContentBytes()); 
mqm.setQos(qos); 
sampleClient.publish(topic, mqm); 
sampleClient.disconnect(); 
} catch(MqttException me) { 
logger.info("********************* send message exception :"); 
logger.info("********************* reason : " + me.getReasonCode()); 
logger.info("********************* msg : " + me.getMessage()); 
logger.info("********************* loc : " + me.getLocalizedMessage()); 
logger.info("********************* cause : " + me.getCause()); 
logger.info("********************* excep : " + me); 
me.printStackTrace(); 


public static void send(Set<IMessage> set){ 
for(IMessage message:set){ 
send(message); 


}

小结

至此,MQTT协议已部署完毕,java端可以发布消息,而javascript端则可以订阅并接收到java端发布的信息。 
本文只是依照官网手册而实现的简单应用,讲解不一定十分准确,有什么不对的地方还请多多指点,更详细的应用请参考官网文档: 
apollo:http://activemq.apache.org/apollo/documentation/user-manual.html 
eclipse paho:https://eclipse.org/paho/

时间: 2024-10-09 02:55:07

ActiveMQ Apollo之MQTT的相关文章

30.apollo源代码在mac下编译

一. Apollo简介 Apache Apollo是一个代理服务器,其是在ActiveMQ基础上发展而来的,可以支持STOMP, AMQP, MQTT, Openwire, SSL, and WebSockets 等多种协议. 官网链接 自己使用Apollo主要是用它做mqtt服务器. 二. 源代码下载 代码下载编译官方指导 使用svn checkout代码: svn co http://svn.apache.org/repos/asf/activemq/activemq-apollo/trun

转 【MQTT】在Windows下搭建MQTT服务器

MQTT简介 MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放.简单.轻量.易于实现.这些特点使它适用于受限环境.该协议的特点有: 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合. 对负载内容屏蔽的消息传输. 使用 TCP/IP 提供网络连接. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制. 有三种消息发布服务质量:

ActiveMQ 读书笔记

ActiveMQ 简介 消息可以用在不同应用程序相互通知,或者相互操作.企业级别的消息软件出来的很早,但是开源的消息解决方案出来就晚很多.其中ActiveMQ 就是其中一种,提供了异步的松散的应用程序间的通讯解决方案. ActiveMQ 是一个开源的JMS MOM(message-oriented middleware),采用的是Apache License.当然这个东西成也Apache败也Apache.Apache已经 推出了ActiveMQ Apollo来提供更好更稳定的消息服务.世面上也有

朴人的码农乐园

民以食为天,代码是程序员的食粮,优秀的开源项目和源码就如同美味的大餐,是码农茁壮成长的有力补给: 自己写代码是亲自下厨秀手艺,阅读优秀源码是品尝大厨的杰作,对饥渴的码农而言,都是快感: 还有哪个行业或领域有如开源软件这样的高度分享与奉献?向开源软件的贡献者致敬!希望有朝一日也能添砖加瓦,献绵薄之力: 作为一名JavaEE/Android码农,下面列出工作中涉及到的相关开源软件或开源库(比较乱,待整理完善): 时不时钻进去摸索摸索,即使不能深入挖掘,也求能混个熟悉:坚持,加油! Linux/Ker

常用消息队列对比

作为中间件,消息队列是分布式应用间交换信息的重要组件.消息队列可驻留在内存或磁盘上, 队列可以存储消息直到它们被应用程序读走.通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息.所以消息队列可以解决应用解耦.异步消息.流量削锋等问题,是实现高性能.高可用.可伸缩和最终一致性架构中不可以或缺的一环.下面对消息队列就直接使用MQ表示. 现在比较常见的MQ产品主要是ActiveMQ.RabbitMQ.ZeroMQ.Kafka.MetaMQ.Rocket

消息队列汇总(转)

add by zhj: 文中不仅列出了最常用的MQ,而且还列出了一些文章,分享使用的经验,值得一看 原文:http://queues.io/ About There are many queueing systems out there. Each one of them is different and was created for solving certain problems. This page tries to collect the libraries that are wide

STOMP Over WebSocket原文

What is STOMP? STOMP is a simple text-orientated messaging protocol. It defines an interoperable wire format so that any of the available STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperab

开源 VS 商业,消息中间件你不知道的那些事

11月23日,新炬网络中间件技术专家刘拓老师在DBA+社群中间件用户组进行了一次主题为“开源 VS 商业,消息中间件你不知道的那些事”的线上分享.小编特别整理出其中精华内容,供大家学习交流. 嘉宾简介 新炬网络中间件技术专家 曾任职于IBM华南GTS 4年,IBM WebSphere.MQ.CICS产品线技术专家 5年移动运营商(广东移动.浙江移动)运维经验,3年JAVA开发及售后经验 演讲实录 随着云计算的兴起,Docker.微服务的流行,分布式消息队列技术成为云计算平台中不可或缺的组件.今天

STOMP Over WebSocket

Show Table of Contents What is STOMP? STOMP is a simple text-orientated messaging protocol. It defines an interoperable wire format so that any of the available STOMP clients can communicate with any STOMP message broker to provide easy and widesprea