emqtt 3 (我要subscribe 这个topic)

这一次,主要分析client subscribe 某个topic 的处理流程。

由protocol开始

是的,还是要从protocol开始,至于为什么,之前就说过了。

subscribe 类型的packet的处理是:

 1 %% 直接过滤掉topic 为空的情况
 2 process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
 3     send(?SUBACK_PACKET(PacketId, []), State);
 4
 5 process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
 6     %% 组装client 信息
 7     Client = client(State),
 8     %% 检查ACL
 9     ...
10     %% session 为clientid 对应的session pid
11     %% TopicTable 为 [{TopicName, QoS}]
12     emqttd_session:subscribe(Session, PacketId, TopicTable)
13     ...
14     ;
15     

1、过滤掉topictable 为空的情况

2、组装必要的client 信息,完成ACL检查

3、获取clientid 对应的session pid,并调用emqttd_session:subscribe/3 函数

emqttd_session 模块处理

emqttd_session:subscribe/3 只是一个接口函数,实际的处理逻辑是在emqttd_session 模块的handle_cast callback 中实现。

-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok).
subscribe(SessPid, PacketId, TopicTable) ->
    From   = self(), %%这里的self 是client process id
    AckFun = fun(GrantedQos) ->
               From ! {suback, PacketId, GrantedQos}
             end,    gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).

接口函数的定义如上。

handle_cast callback 的实现如下:

handle_cast({subscribe, TopicTable0, AckFun},
            Session = #session{client_id     = ClientId,                   %% subscription 是dict
                               subscriptions = Subscriptions}) ->
    %% rewrite topic name 对topic name 做一些处理
    Subscriptions1 = lists:foldl(
        fun({Topic, Qos}, SubDict) ->
            case dict:find(Topic, SubDict) of
                {ok, Qos} ->
                    %% 已经存在,并且QoS 未更新,所以什么都不需要做
                    SubDict;
                {ok, OldQos} ->
                    %% 已经存在,但是QoS 更新,所以,需要更新一下
                    emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos),
                    dict:store(Topic, Qos, SubDict);
                error ->
                    %% 不存在,直接添加
                    emqttd:subscribe(ClientId, Topic, Qos),
                    dict:store(Topic, Qos, SubDict)
            end
        end, Subscriptions, TopicTable),

更新subscribe

更新subscribe,也就是调用emqttd_server:update_subscription/4 。

emqttd_server 也是由pool 组织的gen_server进程,主要作用是subscription 的增删改查,subscription 信息是保存在 subscription mnesia table 中的,subscription mnesia table的字段信息如下:

-record(mqtt_subscription,
        {subid   :: binary() | atom(),
         topic   :: binary(),
         qos = 0 :: 0 | 1 | 2
        }).

其中,subid 即为subscriber id,也就是clientid,topic 即为topic的名称。

而,update subscription 的逻辑:

 1 %% 外部接口
 2 update_subscription(ClientId, Topic, OldQos, NewQos) ->
 3     call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}).
 4
 5 handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) ->
 6     if_subsciption(State, fun() ->
 7         OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos},
 8         NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos},
 9         %% 使用事物
10         mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]),
11         set_subscription_stats()
12     end), ok(State);
13
14 update_subscription_(OldSub, NewSub) ->
15     %% 删除旧的 subscription
16     mnesia:delete_object(subscription, OldSub, write),
17     %% 写入新的 subscription
18     mnesia:write(subscription, NewSub, write).

因为 subscription mnesia table的类型为bag,也就是一个clientid 可能会和多个topic 相对应,所以,不能依据key 进行delete,必须使用delete_object的方式。

创建subscribe

create subscribe的处理略微有些绕,不知道是作者有意而为之,还是其他什么原因。

首先,create subscribe的入口函数在emqttd module中,

-spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}).
subscribe(ClientId, Topic, Qos) ->
    emqttd_server:subscribe(ClientId, Topic, Qos).

在此调用emqttd_server:subscribe/3 函数,并请求emqttd_server 进程,emqttd_server 进程调用handle_call callback 函数,处理请求。

 1 %% 外部接口
 2 -spec(subscribe(binary(), binary(), mqtt_qos()) -> ok).
 3 subscribe(ClientId, Topic, Qos) ->
 4     %% 这里的self 是emqttd_session 进程,这个调用是在emqttd_session
 5     %% module 中的 handle_cast callback 发起的
 6     From = self(),
 7     call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
 8
 9 handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
10     %% call pubsub process
11     pubsub_subscribe_(SubPid, Topic),
12     if_subsciption(State, fun() ->
13         %% 将subscription 信息写入到 subscription mnesia table 中
14         add_subscription_(ClientId, Topic, Qos),
15         set_subscription_stats()
16     end),
17     %% monitor session pid,当起DOWN 之后,去掉subscribe 并移除相关信息
18     ok(monitor_subscriber_(ClientId, SubPid, State));
19
20 %% @private
21 %% @doc Call pubsub to subscribe
22 pubsub_subscribe_(SubPid, Topic) ->
23     case ets:match(subscribed, {SubPid, Topic}) of
24         [] ->
25             emqttd_pubsub:async_subscribe(Topic, SubPid),
26             ets:insert(subscribed, {SubPid, Topic});
27         [_] ->
28             false
29     end.

L26处,用了subscribed ets table,记录session pid subscribe 的所有topic,这样在 session pid DOWN的时候,就可以移除所有的topic 中session pid 相关的信息了。

而,emqttd_pubsub 同样是由pool 组织的gen_server 进程。

 1 %% 外部接口,发起请求
 2 -spec(async_subscribe(binary(), pid()) -> ok).
 3 async_subscribe(Topic, SubPid) when is_binary(Topic) ->
 4     cast(pick(Topic), {subscribe, Topic, SubPid}).
 5
 6 handle_cast({subscribe, Topic, SubPid}, State) ->
 7     %% 实际的处理函数
 8     add_subscriber_(Topic, SubPid),
 9     {noreply, setstats(State)};
10
11 add_subscriber_(Topic, SubPid) ->
12     %% 检查该Topic 是否已经存在
13     %% 若不存在,则先增加{Topic,Node}信息,为多node 场景服务
14     ...
15     ets:insert(subscriber, {Topic, SubPid}). %% 这里的subscriber 是一张ets table,接下来的publish 主要就是用的这张表 ********

至此,subscribe 操作的处理逻辑就ok了。

总结

应该也不需要,只是这代码贴的有点多了。(示意图待补)

时间: 2024-10-29 18:33:49

emqtt 3 (我要subscribe 这个topic)的相关文章

emqtt 1 (初初初初稿)

第一篇,先简单分析一下整个emqtt 的大致结构,包括两个部分: 1.message packet 类型 2.message 流向 message packet 类型 P1:mqtt_packet 的基本结构,其中header 中的type 与variable 的mqtt_packet_* 一一对应. emqtt 的packet 定义如下: 1 -record(mqtt_packet, 2 {header :: #mqtt_packet_header{}, 3 variable :: #mqtt

javascript设计模式——Publish/Subscribe

我们先引出问题的所在,这里使用一个订单系统: 在一个电子商务系统中,现在我们有订单模块,和信息模块两个主要模块,当下单成功时,我们用要发送订单信息的给客户. 下面是一个简单的解决办法,我们在一个类中调用另一个类的方法. // 订单类,存储订单的所有变量和函数 function Order( goods ){ this.goods = goods; } Order.prototype = { done: function(){ // 订单完成代码 this.sendSuccessMsg(); },

【RocketMQ】同一个项目中,同一个topic,可以通过不同的tag来订阅消息吗?

一.问题答案 是不可以的 而且后注册的会替换前注册的,MqConsumer2会替换MqConsumer,并且只结束tag-2的消息 /** * @date 2019/05/28 */ @Component @Slf4j public class MqConsumer implements MessageConsumer { @Override @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUI

mqtt client python example

This is a simple example showing how to use the [Paho MQTT Python client](https://eclipse.org/paho/clients/python/) to send data to Azure IoT Hub. You need to assemble the rights credentials and configure TLS and the MQTT protocol version appropriate

简单的订阅发布机制实现(Golang)

Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象. 练习实现简单的 订阅/取消订阅/发布信息 功能,足够了. Server.go Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的Client. 这个与Redis中不太一样,Redis中的Dict用Channel的名字作为map的key,value则是其对应的Client列表.而Client中则保 存

iOS 即时通讯,从入门到 “放弃”?

原文链接:http://www.jianshu.com/p/2dbb360886a8 本文会用实例的方式,将 iOS 各种 IM 的方案都简单的实现一遍.并且提供一些选型.实现细节以及优化的建议. —— 由宇朋Look分享 前言 本文会用实例的方式,将iOS各种IM的方案都简单的实现一遍.并且提供一些选型.实现细节以及优化的建议. 注:文中的所有的代码示例,在github中都有demo:iOS即时通讯,从入门到“放弃”?(demo)可以打开项目先预览效果,对照着进行阅读. 言归正传,首先我们来总

innerHTML与IE浏览器内存泄露问题

使用 sIEve 扫描和筛选 如果大量使用 JavaScript 和 Ajax 技术开发 Web 2.0 应用程序,您很有可能会遇到浏览器的内存泄漏问题.如果您有一个单页应用程序或者一个页面要处理很多 UI 操作,问题可能比较严重.在本文中,学习如何使用 sIEve 工具检测并解决内存泄漏问题,本文也包含内存泄漏问题的应用示例以及解决方案. 简介 一般来说,浏览器的内存泄漏对于 web 应用程序来说并不是什么问题.用户在页面之间切换,每个页面切换都会引起浏览器刷新.即使页面上有内存泄漏,在页面切

观察者模式和订阅/发布者模式(转)

在翻阅资料的时候,有人把观察者(Observer)模式等同于发布(Publish)/订阅(Subscribe)模式,也有人认为这两种模式还是存在差异,而我认为确实是存在差异的,本质上的区别是调度的地方不同. 观察者模式 比较概念的解释是,目标和观察者是基类,目标提供维护观察者的一系列方法,观察者提供更新接口.具体观察者和具体目标继承各自的基类,然后具体观察者把自己注册到具体目标里,在具体目标发生变化时候,调度观察者的更新方法. 比如有个“天气中心”的具体目标A,专门监听天气变化,而有个显示天气的

js 之观察者模式

观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己. 使用观察者模式的好处: 支持简单的广播通信,自动通知所有已经订阅过的对象. 页面载入后目标对象很容易与观察者存在一种动态关联,增加了灵活性. 目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用. JS里对观察者模式的实现是通过回调来实现的,我们来先定义一个pubsub对象,其内部包