消息mqtt服务运用

### 摘要
>Apollo是apache旗下的基金项目,它是以Apache ActiveMQ5.x为基础,采用全新的线程和消息调度架构重新实现的消息中间件,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。apollo与ActiveQQ一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍MQTT协议的使用。
关于ActiveMQ5请参考:http://activemq.apache.org,本文只介绍Apollo在windows下安装和应用,Apollo的详细文档请参考官网:http://activemq.apache.org/apollo/documentation/user-manual.html.

### Apollo的下载和安装

#### 1.下载并安装

>进入http://activemq.apache.org/apollo/download.html,下载windows版本的压缩包,并解压到自己工作目录(如:E:\apache-apollo-1.7),并创建环境变量APOLLO_HOME=E:\apache-apollo-1.7。如果操作是系统是Windows Vista或更高版本,则需要安装Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。

#### 2.创建broker实例并启动服务
>进入E:\apache-apollo-1.7之下的bin目录,打开cmd窗口,执行命令:apollo create E:\apollo_broker,命令执行成功后,在E盘下会有apollo_broker目录,在其下有个bin目录,其中有两个文件:apollo-broker.cmd和apollo-broker-service.exe,第一个是通过cmd命令启动apollo服务的,第二个是创建window服务的。
> ###### cmd命令启动
在cmd命令行执行apollo-broker run;
等启动成功可以在浏览器中查看运行情况(http://127.0.0.1:61680/,默认用户名/密码:admin/password)。
#### 3.MQTT协议的应用
>MQTT协议有众多客户端实现,相关请参考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。
本文采用eclipse的paho客户端实现(https://eclipse.org/paho/)。
> ##### a.javascript客户端:https://eclipse.org/paho/clients/js/
将javascript客户端项目下载下来,并在其项目根目录下执行mvn命令,进行编译,生成target目录,其下生成mqttws31.js、mqttws31-min.js两个js文件,将其拷贝到自己项目相关目录下,并在页面中引用,即可实现javascript客户端的消息订阅和发布,demo代码如下:
`var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); `
`// 61623是ws连接的默认端口,可以在apollo中间件中进行配置(关于apollo的配置请参考: http://activemq.apache.org/apollo/documentation/user-manual.html)`
`// set callback handlers`
`client.onConnectionLost = onConnectionLost;`
`client.onMessageArrived = onMessageArrived;`
`// connect the client`
`client.connect({userName:‘admin‘,password:‘password‘,onSuccess:onConnect});`
`// called when the client connects`
`function onConnect() { // 连接成功后的处理·`
`// Once a connection has been made, make a subscription and send a message.`
`console.log("onConnect");`
`client.subscribe("/topic/event"); // 订阅消息的主题`
`var message = new Paho.MQTT.Message("Hello,this is a test");`
`message.destinationName = "/topic/event";`
`client.send(message); // 发送消息`
`}`
`// called when the client loses its connection`
`function onConnectionLost(responseObject) { // 连接丢失后的处理`
`if (responseObject.errorCode !== 0) {`
` console.log("onConnectionLost:"+responseObject.errorMessage);`
` }`
` }`
` // called when a message arrives`
` function onMessageArrived(message) { // 消息接收成功后的处理`
` console.log("onMessageArrived:"+message.payloadString);`
` }`
>##### b. java客户端实现
paho目前只支持J2SE和安卓,下载地址:https://eclipse.org/paho/clients/java/,我们采用maven方式。
**maven库地址:**
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots
**maven dependency:**
`<dependency>`
` <groupId>org.eclipse.paho</groupId>`
` <artifactId>org.eclipse.paho.client.mqttv3</artifactId>`
` <version>1.0.1</version>`
` </dependency>`
说明:版本为1.0.0或0.9.0时,其jar包根本加载不进来,最后搜到1.0.1版本才可以正常使用。
**java端实现:**
`public interface IMessage {`
` String getHost();`
` Integer getPort();`
` Integer getQos();`
`String getTopic();`
`String getClientId();`
`String getContent();`
`byte[] getContentBytes();`
`Map<String,Object> getOption();`
`Object getSender();`
`Date getSendTime();`
`}`
`public final class MessageProcessingCenter {`
`protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class);`
`protected static final String BROKER_PREFIX="tcp://";`
`protected static final String BROKER_HOST="localhost";`
`protected static final int PORT=61613;`
`protected static final int QOS=2;`
`protected static final String TOPIC="/topic/event";`
`protected static final String CLIENT_ID="easypm";`
`protected static final String MQ_USER="admin";`
`protected static final String MQ_PASSWORD="password";`
`public static void send(IMessage message){`
`String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic();`
`int qos=null == message.getQos()?QOS: message.getQos();`
`String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost());`
`int port=null == message.getPort()?PORT:message.getPort();`
`broker+=":"+port;`
`String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId();`
`Map<String,Object> opts=message.getOption();`
`String user=MQ_USER;`
`String password=MQ_PASSWORD;`
`if(null != opts){`
`if(null != opts.get("userName")){`
`user=opts.get("userName").toString();`
`}`
`if(null != opts.get("password")){`
`password=opts.get("password").toString();`
`}`
`}`
`MemoryPersistence persistence = new MemoryPersistence();`
`try {`
`MqttClient sampleClient = new MqttClient(broker, clientId, persistence);`
`MqttConnectOptions connOpts = new MqttConnectOptions();`
`connOpts.setUserName(user);`
`connOpts.setPassword(password.toCharArray());`
`connOpts.setCleanSession(true);`
`sampleClient.connect(connOpts);`
`MqttMessage mqm = new MqttMessage(message.getContentBytes());`
`mqm.setQos(qos);`
`sampleClient.publish(topic, mqm);`
`sampleClient.disconnect();`
`} catch(MqttException me) {`
`logger.info("********************* send message exception :");`
`logger.info("********************* reason : " + me.getReasonCode());`
`logger.info("********************* msg : " + me.getMessage());`
`logger.info("********************* loc : " + me.getLocalizedMessage());`
`logger.info("********************* cause : " + me.getCause());`
`logger.info("********************* excep : " + me);`
`me.printStackTrace();`
`}`
`}`
`public static void send(Set<IMessage> set){`
`for(IMessage message:set){`
`send(message);`
`}`
` }`
`}`

### 小结
>至此,MQTT协议已部署完毕,java端可以发布消息,而javascript端则可以订阅并接收到java端发布的信息。
本文只是依照官网手册而实现的简单应用,讲解不一定十分准确,有什么不对的地方还请多多指点,更详细的应用请参考官网文档:
**apollo:**http://activemq.apache.org/apollo/documentation/user-manual.html
**eclipse paho:**https://eclipse.org/paho/

时间: 2024-11-08 19:36:53

消息mqtt服务运用的相关文章

Java连接MQTT服务-wss方式

特别提示:本人博客部分有参考网络其他博客,但均是本人亲手编写过并验证通过.如发现博客有错误,请及时提出以免误导其他人,谢谢!欢迎转载,但记得标明文章出处:http://www.cnblogs.com/mao2080/ 说明:前面介绍的tcp.ws方式适合Java程序在局域网内使用,不涉及到安全问题.但由于Android手机APP需要通过websocket方式来连接,就必须考虑安全性问题了,这时候就采用了wss+CA证书方式进行认证,而且在数据传输中也是加密的.大致与ws方式相同,只不过是加了证书

玩转OneNET物联网平台之MQTT服务④ —— 远程控制LED(设备自注册)+ Android App控制

授人以鱼不如授人以渔,目的不是为了教会你具体项目开发,而是学会学习的能力.希望大家分享给你周边需要的朋友或者同学,说不定大神成长之路有博哥的奠基石... QQ技术互动交流群:ESP8266&32 物联网开发 群号622368884,不喜勿喷 一.你如果想学基于Arduino的ESP8266开发技术 一.基础篇 ESP8266开发之旅 基础篇① 走进ESP8266的世界 ESP8266开发之旅 基础篇② 如何安装ESP8266的Arduino开发环境 ESP8266开发之旅 基础篇③ ESP826

ros wiki翻译之创建消息和服务

描述:本教程介绍如何创建和构建msg和srv文件以及rosmsg,rossrv和roscp命令行工具. 1 msg和srv简介 msg:msg文件是描述ROS消息字段的简单文本文件.它们用于为不同语言(c++或者python等)的消息生成源代码. srv:srv文件用来描述服务.它由两部分组成:请求(request)和响应(response). msg文件存储在包的msg目录中,而srv文件存储在srv目录中. msg只是简单的文本文件,每行有一个字段类型和字段名称.您可以使用的字段类型有(如同

2.OpenStack-安装消息队列服务

安装消息队列服务(安装在控制器上) yum install rabbitmq-server -y systemctl start mariadb.service 配置消息队列服务 systemctl enable rabbitmq-server.service systemctl restart rabbitmq-server.service 修改密码 rabbitmqctl change_password guest Abcd1234 Creating user "openstack"

C#中使用消息队列服务

C#中使用Windows消息队列服务 http://www.cnblogs.com/xinhaijulan/archive/2010/08/22/1805768.html http://h2appy.blog.51cto.com/609721/184323 http://www.cnblogs.com/isdavid/archive/2012/08/16/2642867.html http://www.cnblogs.com/beniao/archive/2008/06/26/1229934.h

浅析腾讯云分布式高可靠消息队列服务CMQ架构

在分布式大行其道的今天,我们在系统内部.平台之间广泛运用消息中间件进行数据交换及解耦.CMQ是腾讯云内部自研基于的高可靠.强一致.可扩展分布式消息队列,在腾讯内部包括微信手机QQ业务红包.腾讯话费充值.广告订单等都有广泛使用.目前已上线腾讯云对外开放,本文对腾讯云CMQ核心技术原理进行分享介绍. CMQ消息队列主要适用于金融.交易.订单等对可靠性.可用性有较高要求的业务场景. 以腾讯充值系统为例,该充值系统通过CMQ 对交易模块.发货部分.结算系统进行异步解耦.削峰填谷,一方面大大降低了模块间耦

NoSQL初探之人人都爱Redis:(3)使用Redis作为消息队列服务场景应用案例

一.消息队列场景简介 “消息”是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中,“消息队列”是在消息的传输过程中保存消息的容器. 在目前广泛的Web应用中,都会出现一种场景:在某一个时刻,网站会迎来一个用户请求的高峰期(比如:淘宝的双十一购物狂欢节,12306的春运抢票节等),一般的设计中,用户的请求都会被直接写入数据库或文件中,在高并发的情形下会对数据库服务器或文件服务器造成巨大的压力,同时呢,也使响应延迟加剧.这也说明

[Python]webservice学习(2) --自己写soap消息请求服务

上文中webservice学习(1) ,使用soaplib建立了一个超简单的webservice服务,也是用suds调用成功了,那如果想使用http包自己组成一个soap消息来调用接口怎么办呢? 这个时候我们就想到使用wsdl这个文件了,我看了些wsdl的文档,也参照这其他人使用java,php等语言实现的soap消息调用的格式来写,但是怎么调试都没成功.. 就是说他总是会返回500或者是405各种错误,就是下面代码中的old_soap_body 变量中的消息格式. #coding: utf-8

简单消息队列服务 HTTPSQS

HTTPSQS(HTTP?Simple?Queue?Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储. 队列(Queue)又称先进先出表(First In First Out),即先进入队列的元素,先从队列中取出.加入元素的一头叫"队头",取出元素的一头叫"队尾".利用消息队列可以很好地异步处理数据传送和存储,当你频繁地向数据库