emqtt 2 (我要连服务器)

这一篇,主要分析下,client 是怎么 connect server的,以及成功connect server 之后,会做哪些事情,session是怎么 start的。

由protocol 开始

之前分析过message 流向,接受到TCP 数据之后,经过parser 的解析,会交由protocol 进行packet 级别的数据处理,并根据不同的packet type 进行不同的后续处理。

connect 类型的packet 的处理是这样开始的:

 1 process(Packet = ?CONNECT_PACKET(Var), State0) ->
 2     %% 参数提取
 3     ...
 4     %% check proto 以及clientid
 5     case validate_connect(Var, State1) of
 6         ?CONNACK_ACCEPT ->
 7             %% auth
 8             ...
 9             case mqttd_sm:start_session(CleanSess, clientid(State2)) of
10                 ok ->
11                     %% 把client 相关信息,写到一张ets table 里
12                     ...
13                     %% 开始keepalive,后续再分析
14                     start_keepalive(KeepAlive)
15                 _ ->
16                     error
17             end;
18         _ ->
19             error
20     end,
21     %% connack

1、提取必要的参数,并进行参数验证

参数验证会检查proto 的version 以及name,并且进行必要的auth check,都通过验证之后,才会继续处理。否则,只好error了。

2、创建session

这个是connect 流程处理的重头戏。

3、记录额外信息,并开始keepalive

记录额外信息,能够进行额外的stat look,dashboard 会用到的。keepalive 就是一个心跳检查。

P1

创建session

0、准备一下

从上一个小节,可以看到,其实connect 流程的处理,主要就是『创建session』。

client 的session 是由session manager 进程创建的,session manager 进程是一个pool,pool size 和scheduler 的数量相关。而这些session managers 是在application start 的时候start的,挂在emqttd_sm_sup supervisor 进程下面(该进程挂在主sup 进程下面)。

而,session 的存储,主要用到的是 『session』mnesia table,其table 字段如下:

-record(mqtt_session,
        {client_id   :: binary(),
         sess_pid    :: pid(),
         persistent  :: boolean()
        }).

表为set 类型的表,client_id的key,sess_pid 是client_id 对应的session process id .

1、start_seesion

%% @doc Start a session
-spec(start_session(boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}).
start_session(CleanSess, ClientId) ->
    SM = gproc_pool:pick_worker(?POOL, ClientId),
    call(SM, {start_session, {CleanSess, ClientId, self()}}).

start_session/2 是一个接口函数,其实现部分,是在某一个session manager process 的handle_call callback 实现的。start_session/2 的第一个参数表示是否要清理掉旧的session,第二个参数就是clientid,从pool 里面选取一个session manager process,就是根据clientid(类似于hash)。

1.1 callback 的整体逻辑

 1 handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) ->
 2     %% 查 session
 3     case lookup_session(ClientId) of
 4         undefined ->
 5             %% Create session locally
 6             %% 增 session
 7             create_session(Client, State);
 8         Session ->
 9             %% 恢复 session
10             case resume_session(Session, ClientPid) of
11                 {ok, SessPid} ->
12                     {reply, {ok, SessPid, true}, State};
13                 {error, Erorr} ->
14                     {reply, {error, Erorr}, State}
15              end
16     end;
17 %% Transient Session
18 handle_call({start_session, Client = {true, ClientId, _ClientPid}}, _From, State) ->
19     case lookup_session(ClientId) of
20         undefined ->
21             create_session(Client, State);
22         Session ->
23             %% 删 session
24             case destroy_session(Session) of
25                 ok ->
26                     create_session(Client, State);
27                 {error, Error} ->
28                     {reply, {error, Error}, State}
29             end
30     end;

在handle_call的callback 中,基本的逻辑是这样的

1、首先根据clientid 查找是否存在session

  2、如果不存在,那么就create 一个全新的session

  3、如果存在,则

    4、要么恢复session

    5、要么先销毁session 之后,再重新create 一个全新的session

P2

1.2 查 session

查找session,是根据『session mnesia table』的key(clientid)进行 dirty_read

1.3 create session

create session 的思路是:

  1. spawn 一个新的进程,并且挂在emqttd_session_sup supervisor进程下面,返回sessionpid
  2. 利用transaction 将{clientid, sessionpid, true|false} 写入到『session mnesia table』
  3. session manager process monitor 一下 sessionpid(崩了之后,清理session 信息)

1.4 恢复session

恢复session 会先根据旧的session记录,判断 old session pid 是否存活,如果存活,则进行恢复操作,否则,error。

1.5 销毁session

首先shutdown sessionpid,然后清理session table即可。

总结

好像也不需要?(resume session 的处理流程 后补)

时间: 2024-07-30 18:05:40

emqtt 2 (我要连服务器)的相关文章

运用MQTT-JMeter插件测试MQTT服务器性能

今天我们介绍XMeter团队带来的新版MQTT-JMeter插件,您可以更为方便地添加MQTT连接.发布.订阅取样器,构造组合的应用场景,例如背景连接.多发少收.少发多收,计算消息转发时延等.利用该插件,我们为EMQ成功实施了包括百万级并发连接在内的一系列测试场景,这里有详细的测试报告. 该插件发布在github,欢迎下载.使用并告诉我们您的意见建议. 下面介绍一下插件的使用方法. 安装 从github下载您需要的release版本, 解压mqtt–xmeter-jar-with-depende

2emq服务器压力测试(无用)

https://blog.csdn.net/frankcheng5143/article/details/52117057 1登阿里云,进入服务控制界面 https://account.aliyun.com/login/login.htm?oauth_callback=https%3A%2F%2Fecs.console.aliyun.com%2F%3Fspm%3D5176.2020520001.0.0.6B1Uov#/home 账号dongdongmqtt 密码**********4******

树莓派编译安装 EMQ 服务器

前言 EMQ 是一款开源的物联网 MQTT 消息服务器,使用 Erlang/OTP 语言平台设计,在 DIY 智能家居时可以作为网关,前几天摸索了一下在树莓派中安装 EMQ 的方法,记录一下. 步骤 安装 Erlang sudo apt-get install erlang-dev erlang-edoc erlang-eunit erlang-reltool erlang-crypto erlang-eldap erlang-public-key erlang-runtime-tools er

转:EMQTT测试--安装与测试 (windows)

官网 我下载的是windows版 安装 参考 http://emqtt.com/docs/install.html 将下载的压缩包解压,我解压到了D盘 命令行窗口,cd到程序目录 控制台模式启动: .\bin\emqttd console 报错如下 无法启动此程序,因为计算机中丢失 MSVCR120.dll.尝试重新安装该程序以解决次问题. 找一个MSVCR120.dll文件,可以去网上下载,也可以在自己电脑上找 HBuilder下有 mqttfx下也有 XMind下也有 IDEA下也有 我拷贝

emqtt在centos6下的安装

1 emqtt下载地址 http://www.emqtt.com/downloads 右键 复制链接 http://www.emqtt.com/downloads/3011/centos6 2 打开服务器 下载 # 下载MQTT安装包 wget http://www.emqtt.com/downloads/3011/centos6 #解压安装包 unzip centos6 #进入文件夹 cd emqx #开始运行 ./bin/emqx start 其他命令 /bin/emqx stop./bin

使用EMQ搭建MQTT服务器

前言寒假的时候开始搭建mqtt服务器,一开始使用的是RabbitMQ,基于Erlang语言.但是RabbitMQ的本职工作是AMQP,MQTT只是他的一个插件功能,似乎有些大材小用,很多MQTT的功能也没有集成.这次我打算使用EMQ来重新部署我的MQTT服务器.EMQ也是基于 Erlang/OTP 语言平台开发.他是支持大规模连接和分布式集群,发布订阅模式的开源 MQTT 消息服务器.支持的输入协议不仅仅是MQTT,还包括WebSocket,以及物联网同样著名的与MQTT基于TCP传输协议不同的

配置MQTT服务器

第一步:下载一个Xshell 链接:https://pan.baidu.com/s/16oDa5aPw3G6RIQSwaV8vqw 提取码:zsb4 打开Xshell 前往MQTT服务器软件下载地址:http://emqtt.com/downloads ,复制链接.注意对应的版本! 主机下载MQTT软件,把刚刚拷贝的链接复制过来!因为不能直接拷贝,所以我们选择"复制命令输入",最终在控制台输入: wget http://emqtt.com/static/brokers/emqttd-c

EMQTT测试--安装与测试 (windows)

我下载的是windows版 安装 参考http://emqtt.com/docs/install.html 将下载的压缩包解压,我解压到了D盘 命令行窗口,cd到程序目录 控制台模式启动: .\bin\emqttd console 报错如下 无法启动此程序,因为计算机中丢失 MSVCR120.dll.尝试重新安装该程序以解决次问题. 找一个MSVCR120.dll文件,可以去网上下载,也可以在自己电脑上找 HBuilder下有mqttfx下也有XMind下也有IDEA下也有 我拷贝一个mqttf

emqtt emq 的主题访问控制 acl.conf

访问控制(ACL) EMQ 消息服务器通过 ACL(Access Control List) 实现 MQTT 客户端访问控制. ACL 访问控制规则定义: 允许(Allow)|拒绝(Deny) 谁(Who) 订阅(Subscribe)|发布(Publish) 主题列表(Topics) MQTT 客户端发起订阅/发布请求时,EMQ 消息服务器的访问控制模块,会逐条匹配 ACL 规则,直到匹配成功为止: --------- --------- --------- Client -> | Rule1