前言
MQTT协议专注于网络、资源受限环境,建立之初不曾考虑WEB环境,倒也正常。虽然如此,但不代表它不适合HTML5环境。
HTML5 Websocket是建立在TCP基础上的双通道通信,和TCP通信方式很类似,适用于WEB浏览器环境。虽然MQTT基因层面选择了TCP作为通信通道,但我们添加个编解码方式,MQTT Over Websocket也可以的。
这样做的好处,MQTT的使用范畴被扩展到HTML5、桌面端浏览器、移动端WebApp、Hybrid等,多了一些想像空间。这样看来,无论是移动端,还是WEB端,MQTT都会有自己的使用空间。
浏览器支持
话说,现代化浏览器都已经支持Websocket,这里有一个所有浏览器支持列表:
更详细列表,请直接访问:http://caniuse.com/websockets
毫无疑问,火狐和谷歌浏览器带动了现代浏览器的发展,对HTML5标准的支持也是如此。支持Websocket的浏览器单纯从上面数字来讲,73.88%的支持率。但实际上还得参考浏览器市场占有率:
上图数据,来源于: 2014年4月份全球主流浏览器市场份额排行榜
超过60%用户机器上浏览器的支持Websocket,数据很可观。
移动hybrid型应用会很欢迎Websocket
- 内置浏览器支持HTML5,Javascript操作Websocket连接MQTT
- 借助于原生TCP socket通道连接MQTT服务器,暴露JavaScript接口,间接使用
不支持Websocket的桌面浏览器,可以考虑Flash socket来帮忙!
针对不支持websocker的部分历史浏览器,可以考虑一下Flash socket,虽然使用Flashsocket用以模拟Websocket就很容易理解,但条件如下: - 需要单独占用一个端口专用于安全跨域访问策略 - 需要浏览器支持二进制Blob 支持二进制操作的浏览器现状:
比较一下支持Websocket和XHR2的桌面浏览器,重叠率很高,使用Flash Socket用以模拟Websocket必要性不大,在类似于IE平台上,不如直接使用Flash版本的
https://github.com/yangboz/as3MQTT/tree/master/MQTTClient_AS3
不支持Websocket浏览器怎么办
不是所有浏览器都支持Websocket,尤其是阻碍历史发展的IE6/IE7/IE8/IE9。MQTT协议为二进制协议压根和HTTP纯文本不兼容,尤其浏览器端JavaScript处理文本很合适,但二进制就显得笨手笨角,除非支持XHR2。
- 单独使用Flash MQTT Client,这这方面见解不深,实践很少,不便多说,您要是很了解,不妨告知一二。
- HTTP纯文本方式进行二进制对接
这部分后面专门会讲到。
服务器端支持
现有一些解决方案可能是后面为MQTT Broker,前面是添加一层代理。比如:例如 mod_websocket ,对应在线示范:http://test.mosquitto.org/ws.html
表面上看着很解藕的,实际上模仿的还是传统型的短连接反向代理架构:Nginx/Apache +Java/PHP/Python/Ruby。
客户端建立一条连接,服务器端需要使用到至少两个文件句柄,中间多了一层路径。优雅的解决方案,可以向socket.io看起。一套服务端程序,同时提供若干种协议供终端选择。其实,一台MQTT Broker中间件服务器,可以绑定多个端口,一个面向纯TCP的1883端口,一个面向Websocket的80/8080端口,共享基础逻辑,面向不同协议。
Websocket协议适配
服务器添加对Websocket支持,基本不用做多大改动。对比Tcp的附加到单个Channel的处理器列表:
1234567891011 |
public class TcpChannelInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( new MqttMessageNewEncoder(), new MqttMessageNewDecoder(), new MqttMessageHandler()); } } |
view rawTcpChannelInitializer.java hosted with ? by GitHub
Websocket对应单个Channel的处理器列表:
12345678910111213141516171819202122232425262728293031323334 |
import io.mqtt.handler.HttpRequestHandler; import io.mqtt.handler.MqttMessageHandler; import io.mqtt.handler.coder.MqttMessageWebSocketFrameDecoder; import io.mqtt.handler.coder.MqttMessageWebSocketFrameEncoder; import io.mqtt.handler.http.HttpJsonpTransport; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; public class WebsocketChannelInitializer extends ChannelInitializer<SocketChannel> { private final static String websocketUri = "/websocket"; private HttpRequestHandler httpRequestHandler = new HttpRequestHandler( websocketUri); static { HttpJsonpTransport httpJsonpTransport = new HttpJsonpTransport(); HttpRequestHandler.registerTransport(httpJsonpTransport); } @Override public void initChannel(final SocketChannel ch) throws Exception { ch.pipeline().addLast( new HttpServerCodec(), new MqttMessageWebSocketFrameEncoder(), new HttpObjectAggregator(65536), httpRequestHandler, new WebSocketServerProtocolHandler(websocketUri), new MqttMessageWebSocketFrameDecoder(), new MqttMessageHandler()); } } |
view rawWebsocketChannelInitializer.java hosted with ? by GitHub
为了支持Websocket协议,仅仅额外增加了:
1234567891011121314 |
@Sharable public class MqttMessageWebSocketFrameEncoder extends MessageToMessageEncoder<Message> { @Override protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception { if (msg == null) return; byte[] data = ((Message) msg).toBytes(); out.add(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(data))); } } |
view rawMqttMessageWebSocketFrameEncoder.java hosted with ? by GitHub
1234567891011121314151617 |
public class MqttMessageWebSocketFrameDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> { private MqttMessageNewDecoder messageNewDecoder; public MqttMessageWebSocketFrameDecoder() { messageNewDecoder = new MqttMessageNewDecoder(); } @Override protected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame wsFrame, List<Object> out) throws Exception { ByteBuf buf = wsFrame.content(); this.messageNewDecoder.decode(ctx, buf, out); } } |
view rawMqttMessageWebSocketFrameDecoder.java hosted with ? by GitHub
小结
啰啰嗦嗦的讲了一大通Websocket,总之对Websocket的支持还算容易。后面有时间写写如何使用HTTP协议达到MQTT OVER HTTP的效果。
原文 http://www.blogjava.net/yongboy/archive/2014/05/26/414130.html