MQTT结构分析

  

MQTT,是:

  • 轻量级的消息订阅和发布(publish/subscribe)协议
  • 建立在TCP/IP协议之上

IoT,internet of things,物联网,MQTT在这方面应用较多。

官方网站:http://mqtt.org/

MQTT协议是针对如下情况设计的:

  • M2M(Machine to Machine) communication,机器端到端通信,比如传感器之间的数据通讯
  • 因为是Machine to Machine,需要考虑:
    • Machine,或者叫设备,比如温度传感器,硬件能力很弱,协议要考虑尽量小的资源消耗,比如计算能力和存储等
    • M2M可能是无线连接,网络不稳定,带宽也比较小

MQTT协议的架构,用一个示例说明。比如有1个温度传感器(1个Machine),2个小的显示屏(2个Machine),显示屏要显示温度传感器的温度值。

可通过MQTT V3.1 Protocol Specification查阅详细规范的细节。

显示器需要先通过MQTT协议subscribe(订阅)一个比如叫temperature的topic(主题):

当温度传感器publish(发布)温度数据,显示器就可以收到了:

注:以上两张图,取自MQTT and CoAP, IoT Protocols

协议里还有2个主要的角色:

  • client,客户端
  • broker,服务器端

它们是通过TCP/IP协议连接的。

因为MQTT是协议,所以不能拿来直接用的,就好比HTTP协议一样。需要找实现这个协议的库或者服务器来运行。

这里是官方的Server support

我服务器端使用nodejs开发,因此选择了:

  • MQTT.js:MQTT协议的底层实现库,服务器端很简易,需要自己编写代码才可使用
  • Mosca:在MQTT.js基础上完善的服务器端

MQTT.js最基本使用

安装是很简单的:

npm install mqtt

MQTT.js实现的服务器端

代码如下:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66


var mqtt = require(‘mqtt‘);

//{‘topicName‘:[clientObj,clientObj ..]}

var subscribeTopics={};

//创建服务器对象

var server = mqtt.createServer(function(client) {

//建立连接时触发

client.on(‘connect‘, function(packet) {

client.connack({returnCode: 0});

});

//客户端发布主题时触发

client.on(‘publish‘, function(packet) {

var topic=packet.topic;

var payload=packet.payload;

//如果没有创建空的主题对应的client数组

if(subscribeTopics[topic]==null){

subscribeTopics[topic]=[];

}else{

//遍历该主题下全部client,并逐一发送消息

for(var i in subscribeTopics[topic]){

var client=subscribeTopics[topic][i];

client.publish({

topic: topic,

payload: payload

});

}

}

});

//当客户端订阅时触发

client.on(‘subscribe‘, function(packet) {

var topic=packet.subscriptions[0].topic;

//如没有,创建空的主题对应的client数组

if(subscribeTopics[topic]==null){

subscribeTopics[topic]=[];

}

//如果client数组中没有当前client,加入

if(subscribeTopics[topic].indexOf(client)==-1){

subscribeTopics[topic].push(client);

}

});

client.on(‘pingreq‘, function(packet) {

client.pingresp();

});

client.on(‘disconnect‘, function(packet) {

//遍历所有主题,检查对应的数组中是否有当前client,从数组中删除

for (var topic in subscribeTopics){

var index=subscribeTopics[topic].indexOf(client);

if(index>-1){

subscribeTopics[topic].splice(index,1);

}

}

});

});

//监听端口

server.listen(1883);

这是一个最基本的服务器端,消息的存储和查询都需要自己编程处理。

比如你如果需要用redis保存和触发数据,可参考这篇中文文章:node mqtt server (redis pub/sub)

MQTT.js实现的客户端

代码:


1

2

3

4

5

6

7

8

9

10

11

12


var mqtt = require(‘mqtt‘);

client = mqtt.createClient(1883, ‘localhost‘);

client.subscribe(‘testMessage‘);

client.publish(‘testMessage‘, ‘发布测试信息‘);

client.on(‘message‘, function (topic, message) {

console.log(message);

client.end();

});

写的很简易,订阅了主题,然后向相同主题发布消息,接收到消息后client停止。

使用Mosca

MQTT.js只是实现了最基础的MQTT协议部分,对于服务器端的处理需要自己完成。

有关MQTT.js是否实现了MQTT server,详细的说明,可参见MQTT Server: MQTT.js or Mosca?

正好,Mosca在MQTT基础上实现了这些,它可以:

  • 作为独立运行的MQTT服务器运行
  • 集成到nodejs程序里使用

安装很简单:

npm install mosca bunyan -g

作为独立服务器运行

运行:

mosca -v | bunyan

然后,还可以用我上文的客户端代码运行测试。

集成在自己程序中使用

我考虑的后端持久化,是用MongoDB。Mosca另外几个选项:

  • Redis,缺点是更注重作为缓存,而不适合可靠持久化
  • LevelUp,头一次听说,不打算做技术准备了,是用nodejs的包装起来的LevelDB
  • Memory,使用内存,估计默认的就是这个,不适合我使用的情况

首先要安装mosca的库:

npm install mosca

然后,在本机将mongodb运行起来,应该就可以执行下面的代码了:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24


var mosca = require(‘mosca‘)

var settings = {

port: 1883,

backend:{

type: ‘mongo‘,

url: ‘mongodb://localhost:27017/mqtt‘,

pubsubCollection: ‘ascoltatori‘,

mongo: {}

},

persistence:{

factory: mosca.persistence.Mongo,

url: "mongodb://localhost:27017/mosca"

}

};

var server = new mosca.Server(settings);

server.on(‘ready‘, function(){

console.log(‘Mosca server is up and running‘);

});

server.on(‘published‘, function(packet, client) {

console.log(‘Published‘, packet.payload);

});

直接运行作者文档中的代码会在多次运行客户端后出现错误,我是参考了他2天前加上的示例代码

作者Matteo Collina生活在意大利的博洛尼亚,写代码很勤奋,这个项目更新很快,是不是说明这个方向(mqtt)很活跃呢?

作者也写了个幻灯片,MQTT and Node.js

MQTT高级问题

keepalive和PING

从这篇文章MQTT协议笔记之连接和心跳

心跳时间(Keep Alive timer)

以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的 心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。 一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端 没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。 16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。一般设为几分钟,比如微信心跳周期为300秒。

下面的代码中我设置的是10秒:


1

2

3

4

5

6

7

8

9

10

11


var mqtt = require(‘mqtt‘);

var settings = {

keepalive: 10,

protocolId: ‘MQIsdp‘,

protocolVersion: 3,

clientId: ‘client-b‘,

clean: false

}

client = mqtt.createClient(1883, ‘localhost‘,settings);

可以使用MQTT.js编写简单的服务器代码,观察到服务器端接收到PING请求,并发回PING响应:


1

2

3

4


client.on(‘pingreq‘, function(packet) {

client.pingresp();

console.log(‘pingreq & resp‘);

});

完整代码上面已经贴过,另见Gist

QoS

QoS在MQTT中有(摘自MQ 遥测传输 (MQTT) V3.1 协议规范):

  • “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  • “至少一次”,确保消息到达,但消息重复可能会发生。
  • “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

MQTT.js只是支持了MQTT协议,并没有支持QoS,也就是说,只支持最低级别的“至多一次”(QoS0)。

Mosca支持QoS0和1,但不支持2,见Add support QOS 2

接收离线消息

我在应用中的一个主要场景是,使用MQTT.js+Mosca做聊天服务器。

默认Mosca是不支持离线消息的,表现的现象是,如果是有人(client-a)先在主题上发布了消息:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19


var mqtt = require(‘mqtt‘);

var settings = {

keepalive: 10,

protocolId: ‘MQIsdp‘,

protocolVersion: 3,

clientId: ‘client-a‘

}

client = mqtt.createClient(1883, ‘localhost‘,settings);

client.publish(‘testMessage‘, ‘发布new测试信息0‘,{qos:1,retain: true});

client.publish(‘testMessage‘, ‘发布new测试信息1‘,{qos:1,retain: true});

client.publish(‘testMessage‘, ‘发布new测试信息2‘,{qos:1,retain: true});

client.publish(‘testMessage‘, ‘发布new测试信息3‘,{qos:1,retain: true});

setTimeout(function(){

client.end();

},1000);

那么另外一个人(client-b),随后订阅,仅能看到最后一条消息:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18


var mqtt = require(‘mqtt‘);

var settings = {

keepalive: 10,

protocolId: ‘MQIsdp‘,

protocolVersion: 3,

clientId: ‘client-b‘

}

client = mqtt.createClient(1883, ‘localhost‘,settings);

client.subscribe(‘testMessage‘,{qos:1},function(){

console.log(‘subscribe ok.‘);

});

client.on("message", function(topic, payload) {

console.log(‘message: ‘+payload);

});

运行结果类似这样:

subscribe ok.
message: 发布new测试信息3

离线消息,需要以下几点:

  • 客户端订阅设置QoS=1
  • 客户端连接属性clean: false,作用是断开连接重连的时候服务器端帮助恢复session,不需要再次订阅

用代码说明以下,先运行这段代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16


var mqtt = require(‘mqtt‘);

var settings = {

keepalive: 10,

protocolId: ‘MQIsdp‘,

protocolVersion: 3,

clientId: ‘client-b‘,

clean: false

}

client = mqtt.createClient(1883, ‘localhost‘,settings);

client.subscribe(‘testMessage‘,{qos:1},function(){

console.log(‘subscribe ok.‘);

client.end();

});

然后执行刚才发布多条消息的代码。再执行下面的代码:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15


var mqtt = require(‘mqtt‘);

var settings = {

keepalive: 10,

protocolId: ‘MQIsdp‘,

protocolVersion: 3,

clientId: ‘client-b‘,

clean: false

}

client = mqtt.createClient(1883, ‘localhost‘,settings);

client.on("message", function(topic, payload) {

console.log(‘message: ‘+payload);

});

运行结果类似这样:

message: 发布new测试信息1
message: 发布new测试信息3
message: 发布new测试信息2
message: 发布new测试信息0

收到消息的顺序是乱的,为什么会这样,其实很好理解,为了小型受限设备以及网络不稳定的情况,消息是不好保证顺序的。

解决办法是发送的消息带时间戳,接收后再做排序。

另外,担心客户端没有做client.end()而非正常退出,那么再次连接是否能恢复session,测试了一下,注释client.end(),没有问题,正常收到多条离线消息。

SSL连接

Mosca支持SSL连接,可根据Nodejs TLS创建公钥私钥。

然后类似这样启动:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23


var mosca = require(‘mosca‘)

var SECURE_KEY = __dirname + ‘/../../test/secure/tls-key.pem‘;

var SECURE_CERT = __dirname + ‘/../../test/secure/tls-cert.pem‘;

var settings = {

port: 8443,

logger: {

name: "secureExample",

level: 40,

},

secure : {

keyPath: SECURE_KEY,

certPath: SECURE_CERT,

}

};

var server = new mosca.Server(settings);

server.on(‘ready‘, setup);

// fired when the mqtt server is ready

function setup() {

console.log(‘Mosca server is up and running‘)

}

这部分我没有测试,直接转自Mosca Encryption Support

认证和授权

Mosca Authentication提供了个简易的命令行,可创建账号用于认证并授权。

但是它不适合我的需求场景,我需要自己编写认证和授权的逻辑。

虽然在作者官方网站上未找到,但在问题管理记录中提交了这方面的支持:Authentication & Authorization

有下面两条支持,应该可以写出自己的回调,并集成到Mosca中:

  • add a callback to authorize a publish.
  • add a callback to authorize a subscribe.

不过这块没有写代码,只是大致能确定。

性能问题

MQTT.js并不是完整解决方案,不需要考虑它的性能问题。

说一下Mosca,有一个这方面问题作者的答复,what about mosca’s performance,问问题的还是个中国人,我前面还引用了他的文章。作者基本意思是:

It basically depends on the RAM. On an AWS large instance it can reach
10k concurrent connections, with roughly 10k messages/second.
时间: 2024-08-13 22:02:31

MQTT结构分析的相关文章

【物联网云端对接-2】通过MQTT协议与阿里云物联网套件进行云端通信

 在<程序员>杂志2017.4刊上,曾写过一篇<微软百度阿里三大物联网平台探析>,上面曾介绍了阿里云物联网套件的一些内容,在写该篇文章的时候,凌霄物联网网关还无法对接到此平台(TLS必须1.1版本以上).但是随着阿里云物联网套件的不断发展,目前设备除了支持HTTPS认证外,也支持MQTT客户端域名直连认证(可以是TCP直连模式,也可以是TLS直联模式). 最近有幸参与了阿里的飞凤物联网平台计划,可以更为深入的去研究阿里云物联网套件,除了用直接编写代码连接云外,也尝试用组态的方式去对

使用JavaScript和MQTT如何开发物联网应用?

如果说Java和C#哪个是最好的开发语言,无疑会挑起程序员之间的相互怒怼,那如果说JavaScript是动态性最好的语言,相信大家都不会有太大的争议.随着越来越多的硬件平台和开发板开始支持JavaScript,JavaScript在硬件端以及物联网开发领域有了新的机会. IoT应用开发的数据链路 图1是一个智能家居物联平台的数据链路. 图1 智能家居物联平台的数据链路 一般来说,可以把IoT应用分为如图所示的四层. .client层:指的是IoT设备,可以是冰箱.空调,也可以是一些温湿度传感器.

mqtt client python example

This is a simple example showing how to use the [Paho MQTT Python client](https://eclipse.org/paho/clients/python/) to send data to Azure IoT Hub. You need to assemble the rights credentials and configure TLS and the MQTT protocol version appropriate

工业物联网的云端协议将以MQTT+SSL/TLS为主,协议格式以JSON为主

工业物联网是什么? 简单来说,就是物联网在工业控制上的具体应用. SSL/TLS是什么? SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议.TLS与SSL在传输层对网络连接进行加密.大部分互联网登录都是用的SSL/TLS,可以去网易邮箱http://WWW.126.COM看下,右下角上面"正使用SSL登录"的标识. MQTT是什么? MQTT(M

MQTT与Mosquitto服务器搭建以及Android推送(一)MQTT简介

文章钢要: 对MQTT协议有一定认识 对MQTT运行原理有一定了解 一.什么是MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议. 国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议.其中Sohu,Cmstop手机客户端中均有使用到MQTT作为消息推送消息. MQTT由于开放源代码,耗电量小等特点,将会在移动消息推送领域会有更多的贡献,在物联网领域,传感器与服务器的通信,信息

物联网MQTT协议分析和开源Mosquitto部署验证

在<物联网核心协议—消息推送技术演进>一文中已向读者介绍了多种消息推送技术的情况,包括HTTP单向通信.Ajax轮询.Websocket.MQTT.CoAP等,其中MQTT协议为IBM制定并力推,其具有开放.简单.轻量级以及易于实现的特点使得其即便在资源受限的环境中也能得到很好的使用,比如运行在资源紧缺型的嵌入式系统中或网络带宽非常昂贵的环境中,除此之外,它也被广泛用于遥感勘测.智能家居.能源监测和医疗应用程序等各个领域,是物联网的重要组成部分,将来可能会成为物联网的事实标准. 本篇文章将帮助

TCP/IP WebSocket MQTT

http://www.cnblogs.com/shanyou/p/4085802.html TCP/IP, WebSocket 和 MQTT

MQTT协议简记

 一.定义 MQTT - MQ Telemetry Transport 轻量级的 machine-to-machine 通信协议. publish/subscribe模式. 基于TCP/IP. 支持QoS. 适合于低带宽.不可靠连接.嵌入式设备.CPU内存资源紧张. 是一种比较不错的Android消息推送方案. FacebookMessenger采用了MQTT. MQTT有可能成为物联网的重要协议. MQTT是轻量级基于代理的发布/订阅的消息传输协议,它可以通过很少的代码和带宽和远程设备连接.例

[9] MQTT,mosquitto,Eclipse Paho---MQTT消息格式之SUBACK(消息订阅应答)消息分析

0.前言 在上一节中(MQTT消息格式之SUBSCRIBE(消息订阅)消息分析),客户端发送了订阅的消息,这个时候,服务器端收到订阅主题的MQTT消息之后,肯定需要给一个应答,这个应答信息就是SUBACK(消息订阅应答).消息订阅应答相对来说比较简单. 1.准备步骤 (1) 首先打开WireShark软件,并启动监听 (2) 在Eclipse里面运行下面的代码 [java] view plaincopy import org.eclipse.paho.client.mqttv3.MqttClie