emqtt 4 (我要publish消息了)

这次,分析处理publish msg的流程。

由protocol开始

publish 类型的packet的处理是:

 1 process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
 2     %% ACL check
 3     ...
 4     publish(Packet, State);
 5     ...
 6 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
 7         #proto_state{client_id = ClientId, username = Username, session = Session}) ->
 8     %% 处理packet 获得msg
 9     Msg = emqttd_message:from_packet(Username, ClientId, Packet),
10     %% 调用emqttd_session module的publish/2 函数
11     %% subscribe的时候,也是由protocol 进入的emqttd_session module
12     emqttd_session:publish(Session, Msg);

1、ACL 检查

2、处理packet 获得msg

3、调用session module进行处理

emqttd_session 模块处理

和subscribe的处理流程类似,emqttd_session:publish/2 也只是一个接口函数,该函数要根据QoS的不同,决定是

  1. 自己调用后续函数完成处理
  2. call session process 完成后续处理
 1 %% @doc Publish message
 2 -spec(publish(pid(), mqtt_message()) -> ok | {error, any()}).
 3 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
 4     %% publish qos0 directly
 5     emqttd:publish(Msg);
 6 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
 7     %% publish qos1 directly, and client will puback automatically
 8     emqttd:publish(Msg);
 9 publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
10     %% publish qos2 by session
11     gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT). 

直接处理

如果是自己调用后续函数完成处理的话,则继续调用emqttd:publish/2,则在emqttd module 中继续调用emqttd_server:publish/1:

%% @doc Publish a Message
-spec(publish(Msg :: mqtt_message()) -> any()).
publish(Msg = #mqtt_message{from = From}) ->
    ...
    %% 处理topic
    ...
    %% pulish
    emqttd_pubsub:publish(Topic, Msg2),
    ...

还是subscribe处理套路:

emqttd_protocol ---> emqttd_session ---> emqttd ---> emqttd_server ---> emqttd_pubsub

在emqttd_pubsub module中的处理是:

 1 %% @doc Publish message to Topic.
 2 -spec(publish(binary(), any()) -> any()).
 3 publish(Topic, Msg) ->
 4     lists:foreach(
 5         fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
 6             ?MODULE:dispatch(To, Msg);
 7            (#mqtt_route{topic = To, node = Node}) ->
 8             rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
 9         end, emqttd_router:lookup(Topic)).
10
11 dispatch(Topic, Msg) ->
12     case subscribers(Topic) of
13         [] ->
14             dropped(Topic);
15         [SubPid] ->
16             SubPid ! {dispatch, Topic, Msg};
17         SubPids ->
18             lists:foreach(fun(SubPid) ->
19                 SubPid ! {dispatch, Topic, Msg}
20             end, SubPids)
21     end.
22 %% @private
23 %% @doc Find all subscribers
24 subscribers(Topic) ->
25     case ets:member(subscriber, Topic) of
26         true -> %% faster then lookup?
27             try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end;
28         false ->
29             []
30     end.

至此,msg 就已经以{dispatch, Topic, Msg}的形式发送给 clientid 对应的session process了。

那么,就需要在emqttd_session module中的handle_info callback 函数处进行处理:

 1 %% Dispatch Message
 2 handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
 3     when is_record(Msg, mqtt_message) ->
 4     dispatch(tune_qos(Topic, Msg, Subscriptions), Session);
 5
 6 %% Deliver qos0 message directly to client
 7 dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
 8     ClientPid ! {deliver, Msg},
 9     hibernate(Session);
10 dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
11     when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
12     case check_inflight(Session) of
13         true  ->
14             noreply(deliver(Msg, Session));
15         false ->
16             hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
17     end.

继而,将信息发送给socket controlling process,然后根据QoS的不同,判断是否需要等待ack。

总结

(流程示意图待补)

时间: 2024-10-28 12:22:24

emqtt 4 (我要publish消息了)的相关文章

(转)RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ消息队列(九):Publisher的消息确认机制

在前面的文章中提到了queue和consumer之间的消息确认机制:通过设置ack.那么Publisher能不到知道他post的Message有没有到达queue,甚至更近一步,是否被某个Consumer处理呢?毕竟对于一些非常重要的数据,可能Publisher需要确认某个消息已经被正确处理. 在我们的系统中,我们没有是实现这种确认,也就是说,不管Message是否被Consume了,Publisher不会去care.他只是将自己的状态publish给上层,由上层的逻辑去处理.如果Message

RabbitMQ之消息确认机制(事务+Confirm)

概述 在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎

MQTT是IBM开发的一个即时通讯协议,构建于TCP/IP协议上,是物联网IoT的订阅协议,借助消息推送功能,可以更好地实现远程控制

最近一直做物联网方面的开发,以下内容关于使用MQTT过程中遇到问题的记录以及需要掌握的机制原理,主要讲解理论. 背景 MQTT是IBM开发的一个即时通讯协议.MQTT构建于TCP/IP协议上,面向M2M和物联网IoT的连接协议,采用轻量级发布和订阅消息传输机制.Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的短消息通信简单易用. 基本概念 [MQTT协议特点]——相比于RESTful架构的物联网系统,MQ

Redis基础---消息通信模式

Redis发送订阅通信模式 Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息. Redis 发布订阅(pub/sub)实现了消息系统,发送者(在redis术语中称为发布者)在接收者(订阅者)接收消息时发送消息.传送消息的链路称为信道. 在Redis中,客户端可以订阅任意数量的信道. 首先,订阅者-客户端subscribe消息 localhost:6379> SUBSCRIBE redisChat Reading messages... (

MQTT协议笔记之消息流

前言 前面的笔记已把所有消息类型都过了一遍,这里从消息流的角度尝试解读一下. 网络故障 在任何网络环境下,都会出现一方连接失败,比如离开公司大门那一刻没有了WIFI信号.但持续连接的另一端-服务器可能不能立即知道对方已断开.类似网络异常情况,都有可能在消息发送的过程中出现,消息发送出去,就丢失了. MQTT协议假定客户端和服务器端稳定情况一般,彼此之通信管道不可靠,一旦客户端网络断开,情况就会很严重,很难恢复原状. 但别忘记,很多客户端会有永久性存储设备支持,比如闪存ROM.存储卡等,在通信出现

MQTT-SN协议乱翻之消息格式

前言 紧接着上篇初步介绍,本文为第二篇,主要梳理MQTT-SN 1.2协议中定义的消息格式. 通用消息格式 消息头 其它可变部分 2/4字节表示 N字节组成 消息头部 长度 消息类型 1或3个字节 1个字节 长度要么是1个字节,要么3个字节表示,并且自身也会包含在其内.一个字节可表示256长度,一般情况下,完全够用了. 只需要判断第一个字节是否为 0x01,若是那么长度为3个字节表示,剩下的两个字节会表示真正的消息长度,最大长度为65535 否则长度就是一个字节表示,256个长度,大部分消息长度

mqtt协议-broker之moqutte源码研究三之PUBLISH报文处理

先简单说明一下,对于mqtt是个双向通信的过程,也就是说,他既允许client向broker发布消息,同时也允许broker向client发布消息 public void processPublish(Channel channel, MqttPublishMessage msg) { final MqttQoS qos = msg.fixedHeader().qosLevel(); final String clientId = NettyUtils.clientID(channel); LO

EMQ 学习---订阅$SYS主题,捕获客户端上下线消息

acl.config文件定义了可订阅$SYS主题的权限. {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}. %%%{deny, all, subscribe, ["$SYS/#", {eq, "#"}