一、概述
遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:
- 网络代价昂贵,带宽低、不可靠。
- 在嵌入设备中运行,处理器和内存资源有限。
该协议的特点有:
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
- 对负载内容屏蔽的消息传输。
- 使用 TCP/IP 提供网络连接。
- 有三种消息发布服务质量:
- “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- “至少一次”,确保消息到达,但消息重复可能会发生。
- “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
- 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
- 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
因为MQTT是轻量级的发布/订阅的消息传输协议,因此很多应用都可以借用MQTT的思想, 比如Facebook的的Messager据说就是按照MQTT的协议编写的。如果需要了解这个协议,简单的读一下其协议的主要内容其实是不能深刻理解其 中的意思的,就像你看了XMPP的协议之后,不读smack很快就会遗忘掉这个协议的样子一样,程序员对代码的热爱程度会远远大多文档(初级码农),于是 乎读了一下MQTT的实现Eclipse Paho,一下是一些简单的总结。
二、MQTT协议实现Eclipse Paho
MQTT有不同语言,不同版本的诸多的实现,详细信息见http://mqtt.org/software,其中Eclipse Paho只是诸多Java实现中的一个,关于Eclipse Paho的介绍如下http://www.eclipse.org/proposals/technology.paho/,具体下载地址http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.java.git/。
因为MQTT是轻量级的发布/订阅的消息传输协议,其实现Eclipse Paho,也是非常的轻量级,相比smack代码真是小巫见大巫了,看过smack之后,再看Eclipse Paho你心里会豁然开朗,原来代码这么少啊!其主要实现包如下:
其中主要的代码集中在画框的三个包内。
org.eclipse.paho.client.mqttv3: 主要用于对外提供服务,即整个Eclipse Paho对外的窗口,当你的程序需要调用Eclipse Paho时,直接调用org.eclipse.paho.client.mqttv3包内的类就能实现Eclipse Paho所提供的整个功能。当然你也可以调用其他包内的类,这要看你对整个代码的了解程度了。
org.eclipse.paho.client.mqttv3.internal:看看单词internal你可能就猜到了,没错,这就是第一个包的主要功能实现,这个包有承上启下的功能,首先对第一包提供功能的实现,其次调用剩下包中的类以实现MQTT协议的规定。
org.eclipse.paho.client.mqttv3.internal.nls:主要是国际化相关的文件,打开这个包之后,你会欣喜的看到messages_zh_CN.properties,有中文实现!
org.eclipse.paho.client.mqttv3.internal.security:当然是跟安全相关,其中包含了MQTT协议所规定的实现的TLS协议实现,当然在Java中tls的实现当然是SSLSocket。
org.eclipse.paho.client.mqttv3.internal.wire:主要是信息的载体,也就是socket之上传输的心跳包,订阅,发布信息等报文信息。
org.eclipse.paho.client.mqttv3.logging:日志。
org.eclipse.paho.client.mqttv3.persist:
主要用于保存已经发送的数据包。从这里可以看出,MQTT协议最初的面向目标即传感器之间信息的传输,其实现采用了,将数据包保存的文件当中的方式
(MqttDefaultFilePersistence)保证了数据肯定能够发送到服务器,不管程序崩溃不崩溃,网络好不好,只要发送的数据包没有收到
确认,这个数据包就一直保存在文件当中,直到其发送出去为止。
org.eclipse.paho.client.mqttv3.util:工具类。
这些包中,最主要的包就是上图中包含在框中的包,这三个包中,最主要的就是org.eclipse.paho.client.mqttv3.internal这个包,因此只要你看懂了这个包中的主要的类,那么你就拿下了MQTT协议的实现Eclipse Paho!!
三、MQTT协议的报文类别
3.1 MQTT协议规定报文
1.连接请求(CONNECT)
当一个从客户端到服务器的TCP/IP套接字连接被建立时,必须用一个连接流来创建一个协议级别的会话。
2.连接请求确认(CONNECTACK)
连接请求确认报文(CONNECTACK)是服务器发给客户端,用以确认客户端的连接请求
3.发布报文(PUBLISH)
客户端发布报文到服务器端,用来提供给有着不同需求的订阅者们。每个发布的报文都有一个主题,这是一个分层的命名空间,他定义了报文来源分类,方便订阅者订阅他们需要的主题。订阅者们可以注册自己的需要的报文类别。
4.发布确认报文(PUBACK)
发布确认报文(PUBACK)是对服务质量级别为1的发布报文的应答。他可以是服务器对发布报文的客户端的报文确认,也可以是报文订阅者对发布报文的服务器的应答。
5.发布确认报文(PUBREC)
PUBREC报文是对服务质量级别为2的发布报文的应答。这是服务质量级别为2的协议流的第二个报文。PUBREC是由服务器端对发布报文的客户端的应答,或者是报文订阅者对发布报文的服务器的应答。
6.发布确认报文(PUBREL)
PUBREL是报文发布者对来自服务器的PUBREC报文的确认,或者是服务器对来自报文订阅者的PUBREC报文的确认。它是服务质量级别为2的协议流的第三个报文。
7.确定发布完成(PUBCOMP)
PUBCOMP报文是服务器对报文发布者的PUBREL报文的应答,或者是报文订阅者对服务器的PUBREL报文的应答。它是服务质量级别为2的协议流的第四个也是最后一个报文。
8.订阅命名的主题(SUBSCRIBE)
订阅报文(SUBSCRIBE)允许一个客户端在服务器上注册一个或多个感兴趣的主题名字。发布给这些主题的报文作为发布报文从服务器端交付给客户端。订阅报文也描述了订阅者想要收到的发布报文的服务质量等级。
9. 订阅报文确认(SUBACK)
当服务器收到客户端发来的订阅报文时,将发送订阅报文的确认报文给客户端。一个这样的确认报文包含一列被授予的服务质量等级。被授予的服务质量等级次序和对应的订阅报文中的主题名称的次序相符。
10. 退订命名的主题(UNSUBSCRIBE)
退订主题的报文是从客户端发往服务器端,用以退订命名的主题。
11. 退订确认(UNSUBACK)
退订确认报文是从服务器发往客户端,用以确认客户端发来的退订请求报文。
12. Ping请求(PINGREQ)
Ping请求报文是从连接的客户端发往服务器端,用来询问服务器端是否还存在。
13. Ping应答(PINGRESP)
Ping应答报文是从服务器端发往Ping请求的客户端,对客户端的Ping请求进行确认。
14. 断开通知(DISCONNECT)
断开通知报文是从客户端发往服务器端用来指明将要关闭它的TCP/IP连接,他允许彻底地断开,而非只是下线。如果客户端已经和干净会话标志集联系,那么所有先前关于客户端维护的信息将被丢弃。一个服务器在收到断开报文之后,不能依赖客户端关闭TCP/IP连接。
3.2 Eclipse Paho的对报文的实现
Eclipse Paho对MQTT协议报文的实现,主要在org.eclipse.paho.client.mqttv3.internal.wire包下,
其下包含了对MQTT协议14中报文的主要实现如下:
从以上看,其发送一个数据包后,服务器端必须回复一个确认包,这为传输数据包的鲁棒性,降低丢包率,提高准确性提供了很好实现。不同于IM协议MXPP,没有对数据的确认。
3.3 心跳包
还有一个重要一点就是对其对心跳包的设定,看心跳包,主要是要看public class MqttPingReq extends MqttWireMessage 这个类!
[java] view plaincopy
- public class MqttPingReq extends MqttWireMessage {
- public MqttPingReq() {
- super(MqttWireMessage.MESSAGE_TYPE_PINGREQ);
- }
- /**
- * Returns <code>false</code> as message IDs are not required for MQTT
- * PINGREQ messages.
- */
- public boolean isMessageIdRequired() {
- return false;
- }
- protected byte[] getVariableHeader() throws MqttException {
- return new byte[0];
- }
- protected byte getMessageInfo() {
- return 0;
- }
- public String getKey() {
- return new String("Ping");
- }
- }
当然只看这个类,也无法知道其心跳包的内容,这时候,我们需要从其发送的内容当中逆向推出其心跳包的内容。
我
们先看其发送的的模块:找到public class CommsSender implements Runnable
类,看到其有一个private MqttOutputStream
out;私有字段,一看这个方法,我们就能判断,这个字段就是输出流,然后,我们顺藤摸瓜,看public class
MqttOutputStream extends OutputStream这个类,你会看到这样一个方法:
[java] view plaincopy
- /**
- * Writes an <code>MqttWireMessage</code> to the stream.
- */
- public void write(MqttWireMessage message) throws IOException, MqttException {
- byte[] bytes = message.getHeader();
- byte[] pl = message.getPayload();
- // out.write(message.getHeader());
- // out.write(message.getPayload());
- out.write(bytes,0,bytes.length);
- out.write(pl,0,pl.length);
- }
哦,这下好了,原来,其发送的是header和payload,然后,我们就可以看心跳包的header和payload是什么。
public class MqttPingReq extends MqttWireMessage心跳包下有
[java] view plaincopy
- protected byte[] getVariableHeader() throws MqttException {
- return new byte[0];
- }
这个方法,我们就知道了,这个肯定是父类MqttWireMessage中getHeader调用的方法,然后再回到MqttWireMessage,果真getHeader方法如下:
[java] view plaincopy
- public byte[] getHeader() throws MqttException {
- if (encodedHeader == null) {
- try {
- int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
- byte[] varHeader = getVariableHeader();
- int remLen = varHeader.length + getPayload().length;
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- dos.writeByte(first);//1个字节
- dos.write(encodeMBI(remLen));//1个字节
- dos.write(varHeader);//0个字节
- dos.flush();
- encodedHeader = baos.toByteArray();
- } catch(IOException ioe) {
- throw new MqttException(ioe);
- }
- }
- return encodedHeader;
- }
而MqttWireMessage中还有一个getPayload方法,这个方法MqttPingReq 没有重写,也就是说,默认调用这个方法。
[java] view plaincopy
- /**
- * Sub-classes should override this method to supply the payload bytes.
- */
- public byte[] getPayload() throws MqttException {
- return new byte[0];//0个字节
- }
也就是说MQTT的心跳包只有2个字节!