MQTT协议笔记之mqtt.io项目TCP协议支持

前言

MQTT定义了物联网传输协议,其标准倾向于原始TCP实现。构建于TCP的上层协议堆栈,诸如HTTP等,在空间上多了一些处理路径,稍微耗费了CPU和内存,虽看似微乎其微,但对很多处理能力不足的嵌入式设备而言,选择原始的TCP却是最好的选择。

但单纯TCP不是所有物件联网的最佳选择,提供构建与TCP基础之上的传统的HTTP通信支持,尤其是浏览器、性能富裕的桌面涉及领域,还是企业最 可信赖、最可控的传输方式之一。支持多种多样的连接通道,让目前所有一切皆可联网,除了原始TCP Socket,还要支持构建于其之上的HTTP、HTML5 Websocket,就很有必要。

mqtt.io,Pub/Sub中间件,也可以称之为推送服务器,涵盖所有主流桌面系统、浏览器平台,并且倾斜 于移动互联网,以及物联网的广阔适应天地。使用一句英文概括可能更为合适:"Make everything connect”,让所有物件都可连接。其业务目标,可用下图概括:

mqtt.io致力于做下一代支持所有主流桌面平台、所有主流浏览器、所有可联网物件都可以联网的PUB/SUB消息推送系统。

构建此系统,在于降低传统企业各自分散的推送系统,统一运营,统一管理,节省人员、运维开支。

注意事项

  1. mqtt.io是一个项目名称,没有官网,http://www.mqtt.io,和这个项目没有一毛钱关系。
  2. 项目地址:https://github.com/yongboy/mqtt.io
  3. 项目名称启发于 http://socket.io http://netty.io 等知名framework。
  4. 目前只实现QoS 0基本特性,实现概览,后期会根据反馈,做出一些调整

依赖

  1. netty 4,目前JAVA IO界明星
  2. mqtt-library 二进制和MQTT对象的转换,这种苦活累活都是它来做,真心让人喜欢。

数据流转

解码器

用于转换二进制流到JAVA对象的过程:

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
package io.mqtt.handler.coder;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToMessageDecoder;

import java.io.ByteArrayInputStream;

import java.util.List;

import org.meqantt.message.Message;

import org.meqantt.message.MessageInputStream;

public class MqttMessageNewDecoder extends MessageToMessageDecoder<ByteBuf> {

@Override

public void decode(ChannelHandlerContext ctx, ByteBuf buf,

List<Object> out) throws Exception {

if (buf.readableBytes() < 2) {

return;

}

buf.markReaderIndex();

buf.readByte(); // read away header

int msgLength = 0;

int multiplier = 1;

int digit;

int lengthSize = 0;

do {

lengthSize++;

digit = buf.readByte();

msgLength += (digit & 0x7f) * multiplier;

multiplier *= 128;

if ((digit & 0x80) > 0 && !buf.isReadable()) {

buf.resetReaderIndex();

return;

}

} while ((digit & 0x80) > 0);

if (buf.readableBytes() < msgLength) {

buf.resetReaderIndex();

return;

}

byte[] data = new byte[1 + lengthSize + msgLength];

buf.resetReaderIndex();

buf.readBytes(data);

MessageInputStream mis = new MessageInputStream(

new ByteArrayInputStream(data));

Message msg = mis.readMessage();

mis.close();

out.add(msg);

}

}

view rawMqttMessageNewDecoder.java hosted with ? by GitHub

编码器

对所有要写入网卡缓冲区的JAVA对象转换成二进制:

12345678910111213141516171819202122232425
package io.mqtt.handler.coder;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelHandler.Sharable;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToMessageEncoder;

import java.util.List;

import org.meqantt.message.Message;

@Sharable

public class MqttMessageNewEncoder extends MessageToMessageEncoder<Object> {

@Override

protected void encode(ChannelHandlerContext ctx, Object msg,

List<Object> out) throws Exception {

if (!(msg instanceof Message)) {

return;

}

byte[] data = ((Message) msg).toBytes();

out.add(Unpooled.wrappedBuffer(data));

}

}

view rawMqttMessageNewEncoder.java hosted with ? by GitHub

借助于mqtt-library项目,编解码不复杂。

MQTT的消息处理

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
package io.mqtt.handler;

import io.mqtt.processer.ConnectProcesser;

import io.mqtt.processer.DisConnectProcesser;

import io.mqtt.processer.PingReqProcesser;

import io.mqtt.processer.Processer;

import io.mqtt.processer.PublishProcesser;

import io.mqtt.processer.SubscribeProcesser;

import io.mqtt.processer.UnsubscribeProcesser;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.handler.timeout.ReadTimeoutException;

import java.util.Collections;

import java.util.HashMap;

import java.util.Map;

import org.meqantt.message.ConnAckMessage;

import org.meqantt.message.ConnAckMessage.ConnectionStatus;

import org.meqantt.message.DisconnectMessage;

import org.meqantt.message.Message;

import org.meqantt.message.Message.Type;

import org.meqantt.message.PingRespMessage;

public class MqttMessageHandler extends ChannelInboundHandlerAdapter {

private static PingRespMessage PINGRESP = new PingRespMessage();

private static final Map<Message.Type, Processer> processers;

static {

Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>(

6);

map.put(Type.CONNECT, new ConnectProcesser());

map.put(Type.PUBLISH, new PublishProcesser());

map.put(Type.SUBSCRIBE, new SubscribeProcesser());

map.put(Type.UNSUBSCRIBE, new UnsubscribeProcesser());

map.put(Type.PINGREQ, new PingReqProcesser());

map.put(Type.DISCONNECT, new DisConnectProcesser());

processers = Collections.unmodifiableMap(map);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)

throws Exception {

try {

if (e.getCause() instanceof ReadTimeoutException) {

ctx.write(PINGRESP).addListener(

ChannelFutureListener.CLOSE_ON_FAILURE);

} else {

ctx.channel().close();

}

} catch (Throwable t) {

t.printStackTrace();

ctx.channel().close();

}

e.printStackTrace();

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object obj)

throws Exception {

Message msg = (Message) obj;

Processer p = processers.get(msg.getType());

if (p == null) {

return;

}

Message rmsg = p.proc(msg, ctx);

if (rmsg == null) {

return;

}

if (rmsg instanceof ConnAckMessage

&& ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) {

ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);

} else if (rmsg instanceof DisconnectMessage) {

ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);

} else {

ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

}

}

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

}

view rawMqttMessageHandler.java hosted with ? by GitHub

更具体的可以查看项目。

小结

简单介绍了一个简单的不能再简单的MQTT Server,只具有最基本的QoS 0类型的消息订阅等。

后面,对HTML 5 Websocket,会在现有基础代码之上,不做多大改动,增加对MQTT Over WebSocket的支持。

时间: 2024-10-14 12:32:38

MQTT协议笔记之mqtt.io项目TCP协议支持的相关文章

MQTT学习笔记——Yeelink MQTT服务 使用mqtt.js和paho-mqtt

0 前言 2014年8月yeelink推出基于MQTT协议的开关类型设备控制API,相比于基于HTTP RESTful的轮训方式,通过订阅相关主题消息,可以远程控制类应用实时性更好.本文使用两种方式实现开关类型设备的远程控制,一种是基于nodeJS的MQTT.js扩展库,另一种是基于python的paho-mqtt扩展库. [相关博文--MQTT] [MQTT学习笔记--MQTT协议体验 Mosquitto安装和使用] [MQTT学习笔记--树莓派MQTT客户端 使用Mosquitto和paho

MQTT协议笔记之mqtt.io项目Websocket协议支持

前言 MQTT协议专注于网络.资源受限环境,建立之初不曾考虑WEB环境,倒也正常.虽然如此,但不代表它不适合HTML5环境. HTML5 Websocket是建立在TCP基础上的双通道通信,和TCP通信方式很类似,适用于WEB浏览器环境.虽然MQTT基因层面选择了TCP作为通信通道,但我们添加个编解码方式,MQTT Over Websocket也可以的. 这样做的好处,MQTT的使用范畴被扩展到HTML5.桌面端浏览器.移动端WebApp.Hybrid等,多了一些想像空间.这样看来,无论是移动端

MQTT学习笔记——树莓派MQTT客户端 使用Mosquitto和paho-python

0 前言 本文说明如何在树莓派上安装Mosquitto.本文通过两个简单的例子说明树莓派中如何使用MQTT协议实现消息订阅,这些例子包括Mosquitto_sub指令实现消息订阅和paho-python扩展库实现GPIO端口的远程控制.本文中使用了两个工具--Mosquitto paho-python,其中Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的消息通信简单易用:另外,paho-python是一个

学习笔记——网络编程3(基于TCP协议的网络编程)

TCP协议基础 IP协议是Internet上使用的一个关键协议,它的全称是Internet Protocol,即Internet协议,通常简称IP协议. 使用ServerSocket创建TCP服务器 在两个通信实体没有建立虚拟链路之前,必须有一个通信实体先做出“主动姿态”,主动接收来自其他通信实体的连接请求. Java中能接收其他通信实体连接请求的类是ServerSocket,ServerSocket对象用于监听来自客户端Socket连接,如果没有连接,它将一直处于等待状态. ServerSoc

TCP协议的学习(三)TCP协议三次握手及攻击

(一)三次握手 ACK : TCP协议规定,只有ACK=1时有效,也规定连接建立后所有发送的报文的ACK必须为1 SYN(SYNchronization) : 在连接建立时用来同步序号.当SYN=1而ACK=0时,表明这是一个连接请求报文.对方若同意建立连接,则应在响应报文中使SYN=1和ACK=1. 因此, SYN置1就表示这是一个连接请求或连接接受报文. 发送序列号:Sequence Number 确认序列号:Acknowledgment Number CLOSED: 初始状态. LISTE

MQTT协议笔记之mqtt.io项目HTTP协议支持

前言 MQTT协议诞生之初,就未曾考虑通过HTTP传输.这也正常,网络受限.不稳定网络不太适合HTTP(2G/3G网络大家使用WAP不也OK嘛).在网络较为充裕的桌面端而言,虽纯文本对比二进制而言没多大优势,受制于历史遗留和使用习惯,以及一大票传统基础设施方便控制事宜,传统互联网/企业型应用,HTTP协议都是默认最佳选择,安全可控,人机友好.选择HTTP也在情理之中. 虽桌面端日渐式微,但做统一的全平台化消息系统/消息中间件,也是趋势. MQTT OVER HTTP,为WEB环境提供HTTP通道

Java基础知识强化之网络编程笔记06:TCP之TCP协议发送数据 和 接收数据

1. TCP协议发送数据 和 接收数据 TCP协议接收数据:• 创建接收端的Socket对象• 监听客户端连接.返回一个对应的Socket对象• 获取输入流,读取数据显示在控制台• 释放资源 TCP协议发送数据: • 创建发送端的Socket对象• 这一步如果成功,就说明连接已经建立成功了.• 获取输出流,写数据• 释放资源 2. 代码实现: (1)发送端: 1 package cn.itcast_06; 2 3 import java.io.IOException; 4 import java

Java中的基于Tcp协议的网络编程

一:网络通信的三要素? IP地址     端口号     通信协议 IP地址:是网络中设备的通信地址.由于IP地址不易记忆,故可以使用主机名.本地环回地址,127.0.0.1   本地主机名localhost 端口号:发送端准备的数据要发送到指定的目的应用程序上,为了标识这些应用程序,所以用网络数字来标识这些不同的应用程序,这些数 字称为端口号.端口号是不同进程之间的标识.一般来说,有0~65535的端口可供使用,但是1~1024系统使用,或者称作保留端口. 通信协议:指定义的通信规则,这个规则

计算机网络(7)-----TCP协议概述

传输控制协议(Transmission Control Protocol) 概念 一种面向连接的.可靠的.基于字节流的传输层通信协议,由IETF的RFC 793定义.在简化的计算机网络OSI模型中,它完成第四层传输层所指定的功能,用户数据报协议(UDP)是同一层内另一个重要的传输协议. 主要特点 (1)TCP是面向连接的运输层协议.这就是说,应用程序在使用TCP协议之前,必须先建立TCP协议.在传送数据完毕后,必须释放已经建立的TCP连接.类似于“打电话”,需要先拨号建立连接,通话完之后要挂机释