mqtt协议-broker之moqutte源码研究三之PUBLISH报文处理

先简单说明一下,对于mqtt是个双向通信的过程,也就是说,他既允许client向broker发布消息,同时也允许broker向client发布消息

public void processPublish(Channel channel, MqttPublishMessage msg) {
    final MqttQoS qos = msg.fixedHeader().qosLevel();
    final String clientId = NettyUtils.clientID(channel);
    LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
            msg.variableHeader().topicName(), msg.variableHeader().packetId(), qos);
    switch (qos) {
        case AT_MOST_ONCE:
            this.qos0PublishHandler.receivedPublishQos0(channel, msg);
            break;
        case AT_LEAST_ONCE:
            this.qos1PublishHandler.receivedPublishQos1(channel, msg);
            break;
        case EXACTLY_ONCE:
            this.qos2PublishHandler.receivedPublishQos2(channel, msg);
            break;
        default:
            LOG.error("Unknown QoS-Type:{}", qos);
            break;
    }
}

根据发布的消息的qos,用不同的QosPublishHandler来处理,QosPublishHandler有三个具体实现,分别是Qos0PublishHandler,Qos1PublishHandler,Qos2PublishHandler.
这里面先讲解Qos1PublishHandler的处理

void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
    // verify if topic can be write
    final Topic topic = new Topic(msg.variableHeader().topicName());
    String clientID = NettyUtils.clientID(channel);
    String username = NettyUtils.userName(channel);
    if (!m_authorizator.canWrite(topic, username, clientID)) {
        LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
        return;
    }

    final int messageID = msg.variableHeader().packetId();

    // route message to subscribers
    IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
    toStoreMsg.setClientID(clientID);

    this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);

    sendPubAck(clientID, messageID);

    if (msg.fixedHeader().isRetain()) {
        if (!msg.payload().isReadable()) {
            m_messagesStore.cleanRetained(topic);
        } else {
            // before wasn‘t stored
            m_messagesStore.storeRetained(topic, toStoreMsg);
        }
    }

    // 修改publish消息,slice出的ByteBuf对象,原文中存在内存泄漏
    MoquetteMessage moquetteMessage = new MoquetteMessage(msg.fixedHeader(), msg.variableHeader(), msg.content());
    m_interceptor.notifyTopicPublished(moquetteMessage, clientID, username);
    msg.content().release();
}

    1.鉴权,该client下的username是否有对该topic发布消息(对topic写)的权限。
    2.创建一个IMessagesStore.StoredMessage,同时把消息推送给所有该对该消息的订阅者。

        if (LOG.isTraceEnabled()) {
        LOG.trace("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}, payload={}, " +
                "subscriptionTree={}", pubMsg.getClientID(), topic, messageID, DebugUtils.payload2Str(pubMsg.getPayload()),
            subscriptions.dumpTree());
    } else if(LOG.isInfoEnabled()){
        LOG.info("Sending publish message to subscribers. ClientId={}, topic={}, messageId={}", pubMsg.getClientID(), topic,
            messageID);
    }
    publish2Subscribers(pubMsg, topic);

判断是否是跟踪模式,如果是的话,会把当前所有的订阅关系打印到日志,由于这个需要遍历topic树,消耗比较大,所以是可配置的,在moquette.cof里面配置。
核心的处理逻辑在下面,接着往下看

    void publish2Subscribers(IMessagesStore.StoredMessage pubMsg, Topic topic) {
    List<Subscription> topicMatchingSubscriptions = subscriptions.matches(topic);
    final String topic1 = pubMsg.getTopic();
    final MqttQoS publishingQos = pubMsg.getQos();
    final ByteBuf origPayload = pubMsg.getPayload();

    for (final Subscription sub : topicMatchingSubscriptions) {
        MqttQoS qos = lowerQosToTheSubscriptionDesired(sub, publishingQos);
        ClientSession targetSession = m_sessionsStore.sessionForClient(sub.getClientId());

        boolean targetIsActive = this.connectionDescriptors.isConnected(sub.getClientId());
//TODO move all this logic into messageSender, which puts into the flightZone only the messages that pull out of the queue.
        if (targetIsActive) {
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending PUBLISH message to active subscriber. CId={}, topicFilter={}, qos={}",
                    sub.getClientId(), sub.getTopicFilter(), qos);
            }
            // we need to retain because duplicate only copy r/w indexes and don‘t retain() causing
            // refCnt = 0
            ByteBuf payload = origPayload.retainedDuplicate();
            MqttPublishMessage publishMsg;
            if (qos != MqttQoS.AT_MOST_ONCE) {
                // QoS 1 or 2
                int messageId = targetSession.inFlightAckWaiting(pubMsg);
                // set the PacketIdentifier only for QoS > 0
                publishMsg = notRetainedPublishWithMessageId(topic1, qos, payload, messageId);
            } else {
                publishMsg = notRetainedPublish(topic1, qos, payload);
            }
            this.messageSender.sendPublish(targetSession, publishMsg);
        } else {
            if (!targetSession.isCleanSession()) {
                if(LOG.isDebugEnabled()){
                    LOG.debug("Storing pending PUBLISH inactive message. CId={}, topicFilter={}, qos={}",
                        sub.getClientId(), sub.getTopicFilter(), qos);
                }
                // store the message in targetSession queue to deliver
                targetSession.enqueue(pubMsg);
            }
        }
    }
}

大概分为以下几步
2.1.根据topic找出匹配的订阅集合list,这里面由于涉及到比较大的计算,所以单独讲解

         public List<Subscription> matches(Topic topic) {
    Queue<Token> tokenQueue = new LinkedBlockingDeque<>(topic.getTokens());
    List<ClientTopicCouple> matchingSubs = new ArrayList<>();
    subscriptions.get().matches(tokenQueue, matchingSubs);

  // 客户端使用带通配符的主题过滤器请求订阅时,客户端的订阅可能会重复,因此发布的消息可能会匹配多个过滤器。对于这种情
        //况,服务端必须将消息分发给所有订阅匹配的QoS等级最高的客户端。服务端之后可以按照订阅的QoS等级,分发消息的副本给每
        //一个匹配的订阅者。
    Map<String, Subscription> subsForClient = new HashMap<>();
    for (ClientTopicCouple matchingCouple : matchingSubs) {
        Subscription existingSub = subsForClient.get(matchingCouple.clientID);
        Subscription sub = this.subscriptionsStore.getSubscription(matchingCouple);
        if (sub == null) {
            // if the m_sessionStore hasn‘t the sub because the client disconnected
            continue;
        }
        // update the selected subscriptions if not present or if has a greater qos
        if (existingSub == null || existingSub.getRequestedQos().value() < sub.getRequestedQos().value()) {
            subsForClient.put(matchingCouple.clientID, sub);//注意这里最终存入的从session里面获取的订阅,而不是从topic目录里面获取的,是因为,有可能client当时并不在线或者该订阅的qos等级变化了
        }
    }
    return new ArrayList<>(subsForClient.values());
}
可以看的出来,会先创建一个队列,存储topic的层级比如/a/b/c,队列里面就会有三个运输[c,b,a]   这里面之所以要用到队列而不是,list就是因为后面进行匹配的时候需要确保先从第一个层级开始匹配,而不是最后一个
void matches(Queue<Token> tokens, List<ClientTopicCouple> matchingSubs) {
    Token t = tokens.poll();

    // check if t is null <=> tokens finished
    if (t == null) {
        matchingSubs.addAll(m_subscriptions);
        // check if it has got a MULTI child and add its subscriptions
        for (TreeNode n : m_children) {
            if (n.getToken() == Token.MULTI || n.getToken() == Token.SINGLE) {
                matchingSubs.addAll(n.subscriptions());
            }
        }

        return;
    }
    // we are on MULTI, than add subscriptions and return
    if (m_token == Token.MULTI) {
        matchingSubs.addAll(m_subscriptions);
        return;
    }
    for (TreeNode n : m_children) {
        if (n.getToken().match(t)) {
            // Create a copy of token, else if navigate 2 sibling it
            // consumes 2 elements on the queue instead of one
            n.matches(new LinkedBlockingQueue<>(tokens), matchingSubs);
            // TODO don‘t create a copy n.matches(tokens, matchingSubs);
        }
    }
}

这段代码不好理解,涉及到迭代topic树io.moquette.spi.impl.subscriptions.TreeNode,下面以一个图说明
另外对topic匹配规则不熟悉的同学可以看一下这里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/04-OperationalBehavior.md
假如有A,B,C,D, E五个client,其中A订阅了/test/#,B订阅了/test/hello/#,C 订阅了/test/hello/beijing,
D订阅了/test/+/hello,现在E向topic-name为/test/hello/shanghai发布了一条消息请问哪几个client应该收到这条消息。
向画出topic树如下(请原谅我手画)

先分析 E发布消息的整个过程:
2.1.1./test/hello/shanghai被放入queue,取出来的顺序依次为test,hello,shanghai
2.1.2.第一轮先匹配test,test不为空,RootNode(其实是null)不为#,执行到遍历RootNode下的子节点,RootNode先的子节点只有一个,test,test.equals(test),然后当前treenode变为test,
2.1.3.从queue里面取出hello,hello不为空,test不为#,遍历test这个treenode的子节点,test有三个子节点,分别是(#,+,hello)
2.1. 3.1 子节点是#,# .mathcs(hello),当前节点是#,然后从队列里面取出shanghai,shanghai不为空,#为#,当前迭代终止,节点A匹配放入匹配的list,
2.1.3.2 子节点是+,+ .mathcs(hello)当前节点是+,然后从队列里面取出shanghai(这里可能有的同学有疑问,为什么还能取出shanghai呢,因为进行下一级迭代的时候是new的新的queue),上海不为空,+不为#所以不匹配,接着匹配+这个treenode的子节点,+只有一个子节点
2.1. 3.2.1 当前节点是hello,hello.mathcs(shanghai)不成立,迭代终止,所以D不会放入匹配的list
1.3.3 子节点是hello,hello.equals(hello),当前节点变成hello,然后从队列里面取出shanghai,shanghai不为空,hello不为#,遍历hello这个treenode下的子节点,hello先有两个子节点,分别是(#,beijing),
2.1.3.3.1 子节点是#,# .mathcs(hello),当前节点是#,从队列里面取出的是空,所以直接会走第一个if分支,并且把B放入匹配的list,并且退出方法。
2.1. 3.3.2 子节点是beijing,beijing.equals(shanghai)不成立,退出迭代
最终能够匹配成功的只有A,B这两个client
也就说说能够被成功匹配,要么/test/hello/shanghai的每一层级都能成功匹配,“+”能够够任意的单个层级,或者某一个层级是“#”(分别对应上面的两个if分支)
到此位置,有topic-name匹配由topic-filters组成的topic树的整个过程分析完成了。下面接着回到上面的publish2Subscribers的这个方法讲解匹配出client之后的动作

2.2 遍历找出的匹配的client,确定qos,qos取订阅请求要求的qos和发布的消息自身的qos的最小值,这个是mqtt协议自身规定的,之所以这样规定是因为,订阅的时候其基本单位是某个topic,订阅者只能订阅一个topic,而不能订阅一个消息,而发布消息的基本单位是一个消息,一个topic下可以有很多个消息,不同的消息可以有不同的qos,所以这里面,真正的qos是由订阅方和发布方共同决定的,出于性能的考虑,去最小的qos
2.3 根据连接描述符,判断是否client依然在线,如果不在线,且客户端要求保留会话,则把消息保存到该client的session的BlockingQueue<StoredMessage>,以待client再次上线之后,再次发送给该client,这个对应着在建立连接的时候有一个republish的动作,具体看http://blog.51cto.com/13579730/2073630的 第10步
2.3.如果在线,根据qos做不同的处理,如果qos是0,比较简单,之间发送,qos是1或者2,则会先把消息放入outboundFlightZone,产生一个messageId,再通过PersistentQueueMessageSender进行发送
io.moquette.spi.impl.PersistentQueueMessageSender#sendPublish,具体的分发逻辑比较简单,这里不详细讲解。类之间的关系是
ProtocolProcessor--》qos1PublishHandler--》MessagesPublisher--》PersistentQueueMessageSender,基本的逻辑就是通过ConnectionDescriptorStore进行发送,对于发送失败的要求保存会话的qos1或者2消息,将会继续保留,知道重复成功

到此publish2Subscribers方法即发送的核心逻辑讲解完了,让我们回到io.moquette.spi.impl.Qos1PublishHandler#receivedPublishQos1这个方法,

3.发送PUBACK消息,
4.如果是retain消息,但是有没有paylod,将该topic下的retain消息清除掉,可以理解成是客户端主动要求清除的,因为它发送了一个空的消息,如果有payload,则存储retain消息,对于保留消息,详细看这里https://github.com/mcxiaoke/mqtt/blob/master/mqtt/0303-PUBLISH.md 我简单总结一下,1.每个topic下永远只会存储一条retain消息,如果发送了两条,那么后面的将会将前面的覆盖;2.如果客户端发送了一个零字节的retain消息,broker将会清理调该topic下的retain消息,因为broker不会存储零字节的retain消息;3.服务端的保留消息不是会话状态的组成部分。服务端应该保留那种消息直到客户端删除它。4.当client收到broker发送的一个retain消息是,可以理解成这是client新建立的一个订阅的第一条消息。
5.唤醒拦截器

到此moquette对PUBLISH报文的处理讲解完了,这里面只讲解了qos1的处理,是因为qos0处理比较简单,而qos2我们没什么应用场景。另外这里说明一下比较容易混淆的概念cleanSession与retain消息
1.retain消息并不是session的一部分,它不与client挂钩,而是与topic-filter挂钩。也就是说当发布这个retain消息的clientsession不存在了,但是retain消息依然存在,除非有client主动删除它,
2.对于要求保留会话的client,会存在一个broker主动重新发送消息的过程,这个动作实在client重新建立连接的时候,具体看这里http://blog.51cto.com/13579730/2073630 的第10步,这是因为broker有责任对要求保留会话的client重新发送qos1与qos2消息
3.对于client发布订阅的时候,broker也会有一个主动发送消息的过程,具体看这里http://blog.51cto.com/13579730/2073914 的第8步
4.qos1消息和qos2是在消息发送失败或者,client不在线的时候,存储到clientsession里面的一个BlockingQueue<StoredMessage>里面的,而retain消息是在,broker收到的时候直接存储到IMessagesStore进行存储的,其底层是个Map<Topic, StoredMessage>

原文地址:http://blog.51cto.com/13579730/2074290

时间: 2024-11-06 09:35:28

mqtt协议-broker之moqutte源码研究三之PUBLISH报文处理的相关文章

mqtt协议-broker之moqutte源码研究二之Connect报文处理

先上一个图,大概说明一下moquette 的类之间的关系 一.ProtocolProcessor类该类是moquette里面的最终要的类,负责所有报文的处理,持有所有各模块功能的实现对象的引用, 下面详细介绍 protected ConnectionDescriptorStore connectionDescriptors;//所有的连接描述符文存储,即clientId与通道之间的映射集合 protected ConcurrentMap<RunningSubscription, Subscrip

mqtt协议-broker之moqutte源码研究二之SUBSCRIBE报文处理

这一篇开始讲解moqutte对SUBSCRIBE报文的处理 代码不复杂public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {String clientID = NettyUtils.clientID(channel);//从channel里面获取clientId,具体原理看下文int messageID = messageId(msg);LOG.info("Processing SUBSCRIBE mes

mqtt协议-broker之moqutte源码研究一

mqtt协议的broker有很多,但是java的支持集群的并不多,之前调研过一番,发现moqutte基本满足需求,就想着基于这个在自己做二次开发.后面会逐渐把自己对moqutte的研究发布出来,希望能给有相同需求的同学一定的参考意义.github地址:https://github.com/andsel/moquette一.将代码倒入idea找到启动类启动报错,是因为找不到moquette的配置文件跟踪源码moquette的配置文件地址是config/moquette.conf因为咱们是直接启动的

mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理

本文讲解moquette对UNSUBSCRIBE和DISCONNECT的处理 先说UNSUBSCRIBE,代码比较简单 public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) { List<String> topics = msg.payload().topics(); String clientID = NettyUtils.clientID(channel); LOG.info("Pr

mqtt协议-broker之moqutte源码研究六之集群

moquette的集群功能是通过Hazelcast来实现的,对Hazelcast不了解的同学可以自行Google以下.在讲解moquette的集群功能之前需要讲解一下moquette的拦截器,因为moquette对Hazelcast的集成本身就是通过拦截器来实现的. 一.拦截器io.moquette.spi.impl.ProtocolProcessor类里面有一个BrokerInterceptor类,这个类就是broker拦截器,这个对象,在processConnect,processPubAc

Android Launcher源码研究(三) 加载app流程2

接上次的. 首先Launcher实现了LauncherModel.Callbacks接口,APP信息数据加载成功后 ,回调接口把app信息显示到Launcher的 workspace界面上,这个过程代码里面称为bind. 下面是个类调用过程的时序图,不是很标准,不过能表达基本调用顺序帮助我们理解. 首先就是Launcher OnCreate中调用LauncherMode startLoader方法,这里只看异步的方式 就是当前的页面下标为-1,加载所有app信息 mWorkspace.getCu

Chrome自带恐龙小游戏的源码研究(四)

在上一篇<Chrome自带恐龙小游戏的源码研究(三)>中实现了让游戏昼夜交替,这一篇主要研究如何绘制障碍物. 障碍物有两种:仙人掌和翼龙.仙人掌有大小两种类型,可以同时并列多个:翼龙按高.中.低的随机飞行高度出现,不可并行.仙人掌和地面有着相同的速度向左移动,翼龙则快一些或慢一些,因为添加了随机的速度修正.我们使用一个障碍物列表管理它们,当它们移出屏幕外时则将其从列表中移除.同时再用一个列表记录它们的类型: 1 Obstacle.obstacles = []; //存储障碍物的数组 2 Obs

OAuth2学习及DotNetOpenAuth部分源码研究

OAuth2学习及DotNetOpenAuth部分源码研究 在上篇文章中我研究了OpenId及DotNetOpenAuth的相关应用,这一篇继续研究OAuth2. 一.什么是OAuth2 OAuth是一种开放认证协议,允许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用.数字2表示现在使用第2代协议. 二.OAuth2中的角色 OAuth2有四种角色 resource owner资源所有者:比如twitter用户,他在twi

Chrome自带恐龙小游戏的源码研究(完)

在上一篇<Chrome自带恐龙小游戏的源码研究(七)>中研究了恐龙与障碍物的碰撞检测,这一篇主要研究组成游戏的其它要素. 游戏分数记录 如图所示,分数及最高分记录显示在游戏界面的右上角,每达到100分就会出现闪烁特效,游戏第一次gameover时显示历史最高分.分数记录器由DistanceMeter构造函数实现,以下是它的全部代码: 1 DistanceMeter.dimensions = { 2 WIDTH: 10, //每个字符的宽度 3 HEIGHT: 13, //每个字符的高 4 DE