emqtt 1 (初初初初稿)

第一篇,先简单分析一下整个emqtt 的大致结构,包括两个部分:

1、message packet 类型

2、message 流向

message packet 类型

P1:mqtt_packet 的基本结构,其中header 中的type 与variable 的mqtt_packet_* 一一对应。

emqtt 的packet 定义如下:

1 -record(mqtt_packet,
2         {header    :: #mqtt_packet_header{},
3          variable  :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
4                     | #mqtt_packet_publish{} | #mqtt_packet_puback{}
5                     | #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
6                     | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
7                     | mqtt_packet_id() | undefined,
8          payload   :: binary() | undefined }).

包括包头、包的类型、实际的数据消息。

在emqtt 中,message packet 的类型主要分为一下几类(这些类型,基本上定义在emqttd_protocol.hrl 文件中):

1、connect/connack

P2:connect packet的基本结构

用于客户端向server端建立链接以及server 端向client 确定链接。

 1 -record(mqtt_packet_connect,
 2         {client_id   = <<>>              :: mqtt_client_id(),
 3          proto_ver   = ?MQTT_PROTO_V311  :: mqtt_vsn(),
 4          proto_name  = <<"MQTT">>        :: binary(),
 5          will_retain = false             :: boolean(),
 6          will_qos    = ?QOS_0            :: mqtt_qos(),
 7          will_flag   = false             :: boolean(),
 8          clean_sess  = false             :: boolean(),
 9          keep_alive  = 60                :: non_neg_integer(),
10          will_topic  = undefined         :: undefined | binary(),
11          will_msg    = undefined         :: undefined | binary(),
12          username    = undefined         :: undefined | binary(),
13          password    = undefined         :: undefined | binary()}).
14
15 -record(mqtt_packet_connack,
16         {ack_flags = ?RESERVED   :: 0 | 1,
17          return_code             :: mqtt_connack() }).

其中,主要的几个字段,client_id、proto_ver、proto_name以及keep_alive。

client_id在username undefined的情况下,充当username的角色,是客户端的唯一标识,在接下来的session manage以及 subscription 管理中,具有非常重要的作用。

2、subscribe/suback

P3:subscribe packet的基本结构

用于处理客户端 subscribe 某个topic,已经server端向客户端的确认。

1 -record(mqtt_packet_subscribe,
2         {packet_id   :: mqtt_packet_id(),
3          topic_table :: list({binary(), mqtt_qos()}) }).
4 -record(mqtt_packet_suback,
5         {packet_id   :: mqtt_packet_id(),
6          qos_table   :: list(mqtt_qos() | 128) }).

topic_table 字段表示的是所要subscribe的topic 以及对应的QoS。

3、unsubscribe/unsuback

subscribe的反向操作

4、publish/puback

P4: publish packet的基本结构

1 -record(mqtt_packet_publish,
2         {topic_name  :: binary(),
3          packet_id   :: mqtt_packet_id() }).
4
5 -record(mqtt_packet_puback,
6         {packet_id   :: mqtt_packet_id() }).

topic_name指定了将要publish到的topic的名字。

在emqtt中,用Erlanger中record数据类型,定义了以上一种message packet的类型,而且,这些类型的packet的作用都显而易见。

message 流向

首先,mqtt是基于TCP协议的,因此,emqtt本身也是架构在TCP server上的service。其底层,基于esockd(主要借鉴RabbitMQ networking的实现)。socket controlling 进程的执行逻辑是emqttd_client module。emqttd_client module主要与esockd application 中的esockd_connection module 相关联,并存在必要的callback 关系。

由于socket controlling 进程的入口以及主要执行逻辑由emqttd_client module实现。

因此:

  • message进入到server后,首先由emqttd_client module进行处理

进入到emqtt server的message,主要是二进制socket数据,想要转换成Erlang的内部数据结构,就必要进行必要的数据解析。

1、socket 数据包接收

1 handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
2     Size = size(Data),
3     ?LOG(debug, "RECV ~p", [Data], State),
4     emqttd_metrics:inc(‘bytes/received‘, Size),
5     received(Data, rate_limit(Size, State#client_state{await_recv = false}));

L2,L3,L4这三行代码主要用于调试和metric。

2、socket 数据包解析

socket 二进制数据包的解析,主要是由emqttd_parser module进行处理,包括二进制协议的解析以及socket 粘包的处理。

  • message 由emqttd_parser module进行二进制数据协议的解析

解析成为mqtt_pakcet record 类型的数据结构。

3、mqtt packet 处理

emqtt中,mqtt packet的处理是由emqtt_protocol module完成,其入口函数为received/2:

 1 %% A Client can only send the CONNECT Packet once over a Network Connection.
 2 -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
 3 received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
 4     process(Packet, State#proto_state{connected = true});
 5
 6 received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
 7     {error, protocol_bad_connect, State};
 8
 9 %% Received other packets when CONNECT not arrived.
10 received(_Packet, State = #proto_state{connected = false}) ->
11     {error, protocol_not_connected, State};
12
13 received(Packet = ?PACKET(_Type), State) ->
14     trace(recv, Packet, State),
15     case validate_packet(Packet) of
16         ok ->
17             process(Packet, State);
18         {error, Reason} ->
19             {error, Reason, State}
20     end.

前三个子函数,主要处理"是否已经login"相关的packet,而最后一个子函数则是在login 之后,处理正常的packet。

在emqttd_protocol module 中,由入口函数received 做简单的处理之后,则交由本module 中的process/2 函数进行处理,并最后交由后续的实际业务module 进行处理。

  • mqtt packet 由emqttd_protocol module进行处理,并交由后续的module

综上,在emqtt中,message 基本的流向为:

  1. message进入到server后,首先由emqttd_client module进行处理
  2. message 由emqttd_parser module进行二进制数据协议的解析
  3. mqtt packet 由emqttd_protocol module进行处理,并交由后续的module
  4. 后续根据不同类型的packet,再做不同的处理

图示如下:

P5: message基本流向示意图

总结

需要?

时间: 2024-10-09 21:34:30

emqtt 1 (初初初初稿)的相关文章

2016年总结:教师路的开启,爱情味的初尝 (上)

哎!2016年终于结束了,感觉这是自己二十五年生命中最漫长的一年,发生的事情真的太多太多.有毕业母校.同学.老师和朋友的留念,毕竟在帝都一待六年,还是有太多的不舍与情怀:有找工作.做毕业设计以及帮助二十多个陌生人完成毕业设计的艰辛,这些人大多数都是从未谋面的过客,只是自己刚好会写点代码,多做点挺好:有为了自己从小的教师梦,来回奔波京黔,最终放弃互联网.离开北京,虽然遗憾,但从未后悔,有的只是享受:还有讲台前给学生分享自己的大学生活,以及他们带给我的无数感动,数不清的加班夜和凌晨三点的财大:更有年

PHP协程初体验

PHP协程初体验 By warezhou 2014.11.24 上次通过C扩展为PHP添加coroutine尝试失败之后,由于短期内啃下Zend可能性几乎为零,只能打语言原生能力的主意了.Google之后发现,PHP5.5引入了Generator和Coroutine新特性,于是才有了本文的诞生. 背景阅读 <当C/C++后台开发遇上Coroutine> http://blog.csdn.net/cszhouwei/article/details/14230529 <一次失败的PHP扩展开

初笋科技:天阔资本穆延飞:只有你擅长的才是真正的风口

初笋科技:天阔资本穆延飞:只有你擅长的才是真正的风口 这个已过了不惑之年的西北汉子,军人出身的他和现在的资本家------天阔资本首席执行官,似乎八竿子打不到一块儿.但是,在投资市场上的战绩却历历在目.算起来,天阔资本是从06年开始,在投资行业涉入的时间还是比较早,一开始主要是实体投资,主要与政府间用BOT合作,今年还斥资9800万做了一个BOT项目.而互联网的投资主要在大热的时候开始,是从14年开始,就做到了新三板.也许,多年的军队行政管理,专业技术,机关参谋多岗位工作经历,养就了穆延飞对待投

cocos2d-x ios游戏开发初认识(五) CCsprite精灵类

这次写一下精灵创建的几种类型: 一.通过文件创建: 在原有的基础上添加如下代码: //一.通过文件创建精灵 CCSprite *bg =CCSprite::create("map.png"); CCSize winSize  =CCDirector::sharedDirector()->getWinSize(); //得到屏幕的尺寸 bg->setPosition(ccp(winSize.width/2, winSize.height/2)); this->addCh

erlang 初体验

最近测试了一下 erlang的坑... 如不出意外.... 大家第一眼看到这语法... 心里第一句一定是"我擦.这TM都是啥!!!!!" 没有变量!!! 没有结构体!!! 没有循环!!! 好吧,至少我是这样想的. 找了半天..连个if也不知道怎么写.. 这记录一些基本常识.. -module(module_name)  %%定义模块 括号内的要和文件名相同. -export([fun1/1 fun2/2]) %%这里是导出2个函数对外使用  函数名/参数名. 一个简单的函数定义如下 f

linux初体验

第一次听到linux这个'词语'是在一次偶然的朋友聊天中朋友提到的,之前压根没听到过'这个东西',所以我可以说是个linux的新新手,菜鸟都不算. 截至到目前,我已经开始linux系统运维学习有差不多10天时间了.在没接触linux之前,我对它的认识仅仅是:它是个计算机系统.决定学习linux系统运维之前,自我以为运维应该是对系统的一些日常维护之类的,不会很难的东西,我更希望运维是个不难的东西,我个人很笨,对难的东西可能接受的很慢,所以我愿意认为运维是很简单的,这样我就可以轻轻松松的掌握运维相关

初尝Mcafee之在ePO中进行策略和客户端任务设置【06】

一.策略和客户端任务概述 在ePO中点击"菜单",可以看到一个策略的大分类:ePO就是通过分配策略和客户端任务给客户端代理,然后代理将这些策略和客户端任务分配给本地相应的Mcafee杀毒防护软件进行执行: 策略是针对软件的内在参数和计划任务的配置,例如VirusScan是否扫描压缩文件,VirusScan的扫描计划的设置: 客户端任务是针对软件的外在交互,例如安装,部署,更新,信息统计等: 二.策略和客户端任务的分配结构: 策略和客户端任务的分配结构有点跟Windows Server的

wxFormBuilder初体验

第一步 打开wxFormBuilder 修改工程信息并保存工程 Name: 工程名 File: 生成代码(.py)文件名 Code_generation: 生成代码类型 第二步 创建窗体 切换至forms页 选择Form按钮创建框架(或Dialog按钮创建对话框) 修改窗体信息 name:窗体类名 title:窗体标题 第三步 创建布局 切换至Layout页 选择BoxSizer按钮创建单行/列布局 第四步 创建面板容器 切换至Containers页 选择Panel按钮创建面板 并取消边框选项

初到博客园

因为一份新的工作,开始接触到软件开发的知识,虽说只需要掌握一些基础的linux,mongodb的知识,可是对于我来说,也是个不小的挑战. 已经接触linux的基础命令,知晓一些基础的命令行,参考鸟哥,从一开始的小白,安装软件,到现在基本的一些常识都在慢慢的积累,希望自己不要跑偏,朝着目前的工作需求慢慢前行. 也许正是因为工作需求,我自己才能从头开始学起一门新的知识,毕业才两年,似乎很多时候,已经很难静下心去学习新的东西,此次正好换城市,换工作,换行业,而且,领导给我时间让我自己学习,这是多么仁慈