MQTT协议实现Eclipse Paho学习总结

转载自:http://xiaoxinzhou.blog.163.com/blog/static/20704538620145411306821/

一、概述

遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:

  • 网络代价昂贵,带宽低、不可靠。
  • 在嵌入设备中运行,处理器和内存资源有限。

该协议的特点有:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
  • 对负载内容屏蔽的消息传输。
  • 使用 TCP/IP 提供网络连接。
  • 有三种消息发布服务质量:
    • “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    • “至少一次”,确保消息到达,但消息重复可能会发生。
    • “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
  • 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

因为MQTT是轻量级的发布/订阅的消息传输协议,因此很多应用都可以借用MQTT的思想, 比如Facebook的的Messager据说就是按照MQTT的协议编写的。如果需要了解这个协议,简单的读一下其协议的主要内容其实是不能深刻理解其 中的意思的,就像你看了XMPP的协议之后,不读smack很快就会遗忘掉这个协议的样子一样,程序员对代码的热爱程度会远远大多文档(初级码农),于是 乎读了一下MQTT的实现Eclipse Paho,一下是一些简单的总结。

二、MQTT协议实现Eclipse Paho

MQTT有不同语言,不同版本的诸多的实现,详细信息见http://mqtt.org/software,其中Eclipse Paho只是诸多Java实现中的一个,关于Eclipse Paho的介绍如下http://www.eclipse.org/proposals/technology.paho/,具体下载地址http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.java.git/

因为MQTT是轻量级的发布/订阅的消息传输协议,其实现Eclipse Paho,也是非常的轻量级,相比smack代码真是小巫见大巫了,看过smack之后,再看Eclipse Paho你心里会豁然开朗,原来代码这么少啊!其主要实现包如下:

其中主要的代码集中在画框的三个包内。

org.eclipse.paho.client.mqttv3: 主要用于对外提供服务,即整个Eclipse Paho对外的窗口,当你的程序需要调用Eclipse Paho时,直接调用org.eclipse.paho.client.mqttv3包内的类就能实现Eclipse Paho所提供的整个功能。当然你也可以调用其他包内的类,这要看你对整个代码的了解程度了。

org.eclipse.paho.client.mqttv3.internal:看看单词internal你可能就猜到了,没错,这就是第一个包的主要功能实现,这个包有承上启下的功能,首先对第一包提供功能的实现,其次调用剩下包中的类以实现MQTT协议的规定。

org.eclipse.paho.client.mqttv3.internal.nls:主要是国际化相关的文件,打开这个包之后,你会欣喜的看到messages_zh_CN.properties,有中文实现!

org.eclipse.paho.client.mqttv3.internal.security:当然是跟安全相关,其中包含了MQTT协议所规定的实现的TLS协议实现,当然在Java中tls的实现当然是SSLSocket。

org.eclipse.paho.client.mqttv3.internal.wire:主要是信息的载体,也就是socket之上传输的心跳包,订阅,发布信息等报文信息。

org.eclipse.paho.client.mqttv3.logging:日志。

org.eclipse.paho.client.mqttv3.persist:
主要用于保存已经发送的数据包。从这里可以看出,MQTT协议最初的面向目标即传感器之间信息的传输,其实现采用了,将数据包保存的文件当中的方式
(MqttDefaultFilePersistence)保证了数据肯定能够发送到服务器,不管程序崩溃不崩溃,网络好不好,只要发送的数据包没有收到
确认,这个数据包就一直保存在文件当中,直到其发送出去为止。

org.eclipse.paho.client.mqttv3.util:工具类。

这些包中,最主要的包就是上图中包含在框中的包,这三个包中,最主要的就是org.eclipse.paho.client.mqttv3.internal这个包,因此只要你看懂了这个包中的主要的类,那么你就拿下了MQTT协议的实现Eclipse Paho!!

三、MQTT协议的报文类别

3.1 MQTT协议规定报文

1.连接请求(CONNECT)
 当一个从客户端到服务器的TCP/IP套接字连接被建立时,必须用一个连接流来创建一个协议级别的会话。

2.连接请求确认(CONNECTACK)
 连接请求确认报文(CONNECTACK)是服务器发给客户端,用以确认客户端的连接请求

3.发布报文(PUBLISH)
客户端发布报文到服务器端,用来提供给有着不同需求的订阅者们。每个发布的报文都有一个主题,这是一个分层的命名空间,他定义了报文来源分类,方便订阅者订阅他们需要的主题。订阅者们可以注册自己的需要的报文类别。

4.发布确认报文(PUBACK)
发布确认报文(PUBACK)是对服务质量级别为1的发布报文的应答。他可以是服务器对发布报文的客户端的报文确认,也可以是报文订阅者对发布报文的服务器的应答。

5.发布确认报文(PUBREC)
PUBREC报文是对服务质量级别为2的发布报文的应答。这是服务质量级别为2的协议流的第二个报文。PUBREC是由服务器端对发布报文的客户端的应答,或者是报文订阅者对发布报文的服务器的应答。

6.发布确认报文(PUBREL)
PUBREL是报文发布者对来自服务器的PUBREC报文的确认,或者是服务器对来自报文订阅者的PUBREC报文的确认。它是服务质量级别为2的协议流的第三个报文。

7.确定发布完成(PUBCOMP)
PUBCOMP报文是服务器对报文发布者的PUBREL报文的应答,或者是报文订阅者对服务器的PUBREL报文的应答。它是服务质量级别为2的协议流的第四个也是最后一个报文。

8.订阅命名的主题(SUBSCRIBE)
订阅报文(SUBSCRIBE)允许一个客户端在服务器上注册一个或多个感兴趣的主题名字。发布给这些主题的报文作为发布报文从服务器端交付给客户端。订阅报文也描述了订阅者想要收到的发布报文的服务质量等级。

9. 订阅报文确认(SUBACK)
当服务器收到客户端发来的订阅报文时,将发送订阅报文的确认报文给客户端。一个这样的确认报文包含一列被授予的服务质量等级。被授予的服务质量等级次序和对应的订阅报文中的主题名称的次序相符。

10. 退订命名的主题(UNSUBSCRIBE)
退订主题的报文是从客户端发往服务器端,用以退订命名的主题。

11. 退订确认(UNSUBACK)
退订确认报文是从服务器发往客户端,用以确认客户端发来的退订请求报文。

12. Ping请求(PINGREQ)
Ping请求报文是从连接的客户端发往服务器端,用来询问服务器端是否还存在。

13. Ping应答(PINGRESP)
Ping应答报文是从服务器端发往Ping请求的客户端,对客户端的Ping请求进行确认。

14. 断开通知(DISCONNECT)
断开通知报文是从客户端发往服务器端用来指明将要关闭它的TCP/IP连接,他允许彻底地断开,而非只是下线。如果客户端已经和干净会话标志集联系,那么所有先前关于客户端维护的信息将被丢弃。一个服务器在收到断开报文之后,不能依赖客户端关闭TCP/IP连接。

3.2 Eclipse Paho的对报文的实现

Eclipse Paho对MQTT协议报文的实现,主要在org.eclipse.paho.client.mqttv3.internal.wire包下,

其下包含了对MQTT协议14中报文的主要实现如下:

从以上看,其发送一个数据包后,服务器端必须回复一个确认包,这为传输数据包的鲁棒性,降低丢包率,提高准确性提供了很好实现。不同于IM协议MXPP,没有对数据的确认。

3.3 心跳包

还有一个重要一点就是对其对心跳包的设定,看心跳包,主要是要看public class MqttPingReq extends MqttWireMessage 这个类!

[java] view plaincopy

  1. public class MqttPingReq extends MqttWireMessage {
  2. public MqttPingReq() {
  3. super(MqttWireMessage.MESSAGE_TYPE_PINGREQ);
  4. }
  5. /**
  6. * Returns <code>false</code> as message IDs are not required for MQTT
  7. * PINGREQ messages.
  8. */
  9. public boolean isMessageIdRequired() {
  10. return false;
  11. }
  12. protected byte[] getVariableHeader() throws MqttException {
  13. return new byte[0];
  14. }
  15. protected byte getMessageInfo() {
  16. return 0;
  17. }
  18. public String getKey() {
  19. return new String("Ping");
  20. }
  21. }

当然只看这个类,也无法知道其心跳包的内容,这时候,我们需要从其发送的内容当中逆向推出其心跳包的内容。


们先看其发送的的模块:找到public class CommsSender implements Runnable
类,看到其有一个private MqttOutputStream
out;私有字段,一看这个方法,我们就能判断,这个字段就是输出流,然后,我们顺藤摸瓜,看public class
MqttOutputStream extends OutputStream这个类,你会看到这样一个方法:

[java] view plaincopy

  1. /**
  2. * Writes an <code>MqttWireMessage</code> to the stream.
  3. */
  4. public void write(MqttWireMessage message) throws IOException, MqttException {
  5. byte[] bytes = message.getHeader();
  6. byte[] pl = message.getPayload();
  7. //      out.write(message.getHeader());
  8. //      out.write(message.getPayload());
  9. out.write(bytes,0,bytes.length);
  10. out.write(pl,0,pl.length);
  11. }

哦,这下好了,原来,其发送的是header和payload,然后,我们就可以看心跳包的header和payload是什么。

public class MqttPingReq extends MqttWireMessage心跳包下有

[java] view plaincopy

  1. protected byte[] getVariableHeader() throws MqttException {
  2. return new byte[0];
  3. }

这个方法,我们就知道了,这个肯定是父类MqttWireMessage中getHeader调用的方法,然后再回到MqttWireMessage,果真getHeader方法如下:

[java] view plaincopy

  1. public byte[] getHeader() throws MqttException {
  2. if (encodedHeader == null) {
  3. try {
  4. int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
  5. byte[] varHeader = getVariableHeader();
  6. int remLen = varHeader.length + getPayload().length;
  7. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  8. DataOutputStream dos = new DataOutputStream(baos);
  9. dos.writeByte(first);//1个字节
  10. dos.write(encodeMBI(remLen));//1个字节
  11. dos.write(varHeader);//0个字节
  12. dos.flush();
  13. encodedHeader = baos.toByteArray();
  14. } catch(IOException ioe) {
  15. throw new MqttException(ioe);
  16. }
  17. }
  18. return encodedHeader;
  19. }

而MqttWireMessage中还有一个getPayload方法,这个方法MqttPingReq 没有重写,也就是说,默认调用这个方法。

[java] view plaincopy

  1. /**
  2. * Sub-classes should override this method to supply the payload bytes.
  3. */
  4. public byte[] getPayload() throws MqttException {
  5. return new byte[0];//0个字节
  6. }

也就是说MQTT的心跳包只有2个字节!

时间: 2024-08-29 17:56:46

MQTT协议实现Eclipse Paho学习总结的相关文章

MQTT协议实现Eclipse Paho学习总结二

一.概述 前一篇博客(MQTT协议实现Eclipse Paho学习总结一) 写了一些MQTT协议相关的一些概述和其实现Eclipse Paho的报文类别,同时对心跳包进行了分析.这篇文章,在不涉及MQTT逻辑实现的基础之上分析一下Eclipse Paho中Socket通信的实现,这里我们主要阐述其采用Java同步技术将同步的Socket通信异步化的过程. 二.上菜 先看一下在org.eclipse.paho.client.mqttv3.internal有两个类,CommsSender,Comms

[3] MQTT,mosquitto,Eclipse Paho---如何使用 Eclipse Paho MQTT工具来发送订阅MQTT消息?

在上两节,笔者主要介绍了 MQTT,mosquitto,Eclipse Paho的基本概念已经如何安装mosquitto. 在这个章节我们就来看看如何用 Eclipse Paho MQTT工具来发送接收MQTT消息.Eclipse Paho MQTT工具是一个基于Java的Eclipse桌面客户端程序,其底层的和MQTT服务器进行的交互的java类库就是Eclipse Paho java库.假设我们在本机(127.0.0.1)已经启动了一个mosquitto MQTT服务器,其端口为1883.如

MQTT学习笔记——MQTT协议体验 Mosquitto安装和使用

0 前言 MQTT是IBM开发的一个即时通讯协议.MQTT是面向M2M和物联网的连接协议,采用轻量级发布和订阅消息传输机制.Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的短消息通信简单易用. 若初次接触MQTT协议,可先理解以下概念: [MQTT协议特点]--相比于RESTful架构的物联网系统,MQTT协议借助消息推送功能,可以更好地实现远程控制. [MQTT协议角色]--在RESTful架构的物联网

org.eclipse.paho.client mqtt客户端连接超时

客户端订阅mqtt 的时候 显示超时.第二次重起服务可能就好了,但是可能会丢消息. 报错如下 org.eclipse.paho.client.mqttv3.MqttException: 等待来自服务器的响应时超时 at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31) ~[org.eclipse.paho.client.mqttv3-1.2.1.

物联网协议测评平台——吊兰使用指南-MQTT协议 Python Paho

在忙了一个周末之后,还是小有成果的.发布了一个简单的物联网平台: http://mqtt.phodal.com,简单地写一下使用指南. CoAP协议 使用Libcoap的话可以用 <code style="box-sizing: border-box; font-family: Consolas, Menlo, Monaco, 'Courier New', monospace; font-size: 1em; padding: 0px; color: inherit; border-top

MQTT协议学习及实践(Linux服务端,Android客户端的例子)

前言 MQTT(Message Queuing Telemetry Transport),是一个物联网传输协议,它被设计用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务.MQTT是专门针对物联网开发的轻量级传输协议.MQTT协议针对低带宽网络,低计算能力的设备,做了特殊的优化,使得其能适应各种物联网应用场景.本文旨在研究其在消息发布/订阅/接收场景下的应用. MQTT协议中的几个重要概念 服务端 是发送消息的客户端和请求订阅的客户端之间的中介,又称

MQTT 协议学习:001-有关概念入门

背景 MQTT 协议是物联网中常见的协议之一,"轻量级物联网消息推送协议" 概念 MQTT是机器对机器(M2M)/物联网(IoT)连接协议.它被设计为一个极其轻量级的发布/订阅消息传输协议.对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽.高延迟或不可靠的网络而设计.这些原则也使该协议成为新兴的"机器到机器"(M2M)或物联网(IoT)世界的连接设备,以及带宽和电池功率非常高的移动应用的理想选择.例如,它已被用于通过卫星链路与

MQTT 协议学习:Keep Alive 和连接保活

(2020-02-05 10:30) 我们提到过 Broker 需要知道 Client 是否非正常地断开了和它的连接,以发送遗愿消息.实际上 Client 也需要能够很快地检测到它失去了和 Broker 的连接,以便重新连接. MQTT 协议是基于 TCP 的一个应用层协议,理论上 TCP 协议在丢失连接时会通知上层应用,但是 TCP 有一个半打开连接的问题(half-open connection).这里我不打算深入分析 TCP 协议,需要记住的是,在这种状态下,一端的 TCP 连接已经失效,

物联网MQTT协议分析和开源Mosquitto部署验证

在<物联网核心协议—消息推送技术演进>一文中已向读者介绍了多种消息推送技术的情况,包括HTTP单向通信.Ajax轮询.Websocket.MQTT.CoAP等,其中MQTT协议为IBM制定并力推,其具有开放.简单.轻量级以及易于实现的特点使得其即便在资源受限的环境中也能得到很好的使用,比如运行在资源紧缺型的嵌入式系统中或网络带宽非常昂贵的环境中,除此之外,它也被广泛用于遥感勘测.智能家居.能源监测和医疗应用程序等各个领域,是物联网的重要组成部分,将来可能会成为物联网的事实标准. 本篇文章将帮助