mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理

本文讲解moquette对UNSUBSCRIBE和DISCONNECT的处理

先说UNSUBSCRIBE,代码比较简单

    public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) {
    List<String> topics = msg.payload().topics();
    String clientID = NettyUtils.clientID(channel);

    LOG.info("Processing UNSUBSCRIBE message. CId={}, topics={}", clientID, topics);

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientID);
    for (String t : topics) {
        Topic topic = new Topic(t);
        boolean validTopic = topic.isValid();
        if (!validTopic) {
            // close the connection, not valid topicFilter is a protocol violation
            channel.close();
            LOG.error("Topic filter is not valid. CId={}, topics={}, badTopicFilter={}", clientID, topics, topic);
            return;
        }
        if(LOG.isDebugEnabled()){
            LOG.debug("Removing subscription. CId={}, topic={}", clientID, topic);
        }
        subscriptions.removeSubscription(topic, clientID);
        clientSession.unsubscribeFrom(topic);
        String username = NettyUtils.userName(channel);
        m_interceptor.notifyTopicUnsubscribed(topic.toString(), clientID, username);
    }

    // ack the client
    int messageID = msg.variableHeader().messageId();
    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, AT_LEAST_ONCE, false, 0);
    MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, from(messageID));

    LOG.info("Sending UNSUBACK message. CId={}, topics={}, messageId={}", clientID, topics, messageID);
    channel.writeAndFlush(ackMessage);
}

主要分为以下几步
1.从目录树下,移除该client的订阅,这个移除过程有点复杂,后面单独一篇专门讲解topic树
2.清除ClientSession里面的订阅,包括Set<Subscription> subscriptions,同时还得移除ISubscriptionsStore里面的Map<Topic, Subscription> subscriptions
3.唤醒拦截器
4.返回UNSUBACK ,这里注意UNSUBACK 是没有payload的。

再说DISCONNECT的处理

public void processDisconnect(Channel channel) throws InterruptedException {
    final String clientID = NettyUtils.clientID(channel);
    LOG.info("Processing DISCONNECT message. CId={}", clientID);
    channel.flush();
    final ConnectionDescriptor existingDescriptor = this.connectionDescriptors.getConnection(clientID);
    if (existingDescriptor == null) {
        // another client with same ID removed the descriptor, we must exit
        channel.close();
        return;
    }

    if (existingDescriptor.doesNotUseChannel(channel)) {
        // another client saved it‘s descriptor, exit
        LOG.warn("Another client is using the connection descriptor. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!removeSubscriptions(existingDescriptor, clientID)) {
        LOG.warn("Unable to remove subscriptions. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!dropStoredMessages(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop stored messages. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!cleanWillMessageAndNotifyInterceptor(existingDescriptor, clientID)) {
        LOG.warn("Unable to drop will message. Closing connection. CId={}", clientID);
        existingDescriptor.abort();
        return;
    }

    if (!existingDescriptor.close()) {
        LOG.info("The connection has been closed. CId={}", clientID);
        return;
    }

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    LOG.info("The DISCONNECT message has been processed. CId={}", clientID);
}

1.检查连接描述符是否还存在,如果不存在,说明之前已经有客户端删除了它,直接关闭通道
2.判断这个client的连接描述符是不是,是不是还是当前使用这个通道的client?作者要先防止这种情况呢?先卖个关子,后面的第6条会说明
3.清除订阅请求,这里面好像只清楚了不要求保存会话信息的clientsession里面的ISessionsStore里面的Map<Topic, Subscription> subscriptions,而并没有清除ClientSession里面的Set<Subscription> subscriptions和topic树里面的订阅,这能够解释http://blog.51cto.com/13579730/2073914 这篇文章结尾讨论的问题了,只有Map<Topic, Subscription> subscriptions的订阅才是最准确的。
4.丢弃存储的消息,这里面也只是会丢弃不要去保存会话信息的消息
5.清除遗愿消息,对于遗愿消息,这里稍微啰嗦一点,遗愿消息是在初次连接的存储到ProtocolProcessor的ConcurrentMap<String, WillMessage> m_willStore这里面的,那么什么时候发送给订阅者呢?看下面

    io.moquette.server.netty.NettyMQTTHandler#channelInactive
    @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    String clientID = NettyUtils.clientID(ctx.channel());
    if (clientID != null && !clientID.isEmpty()) {
        LOG.info("Notifying connection lost event. MqttClientId = {}.", clientID);
        m_processor.processConnectionLost(clientID, ctx.channel());
    }
    ctx.close();
}
    说明是当netty检测到通道不活跃的时候通知ProtocolProcessor处理ConnectionLost事件的。
    public void processConnectionLost(String clientID, Channel channel) {
    LOG.info("Processing connection lost event. CId={}", clientID);
    ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
    connectionDescriptors.removeConnection(oldConnDescr);//移除连接描述符
    // publish the Will message (if any) for the clientID
    if (m_willStore.containsKey(clientID)) {
        WillMessage will = m_willStore.get(clientID);
        forwardPublishWill(will, clientID);//发布遗愿消息
        m_willStore.remove(clientID);//移除遗愿消息存储
    }

    String username = NettyUtils.userName(channel);
    m_interceptor.notifyClientConnectionLost(clientID, username);//唤醒拦截器
}
    在以下这种情况下会发布遗愿消息
    遗嘱消息发布的条件,包括但不限于:
    服务端检测到了一个I/O错误或者网络故障。
    客户端在保持连接(Keep Alive)的时间内未能通讯。
    客户端没有先发送DISCONNECT报文直接关闭了网络连接。
    由于协议错误服务端关闭了网络连接。

    另外说明一下,遗愿消息是可以设置消息等级的,而且可以被设置成retain消息

6.连接描述符集合里面清除该通道对应的连接描述符,这里有一点很容易误解,强调一下

    boolean stillPresent = this.connectionDescriptors.removeConnection(existingDescriptor);
    if (!stillPresent) {
        // another descriptor was inserted
        LOG.warn("Another descriptor has been inserted. CId={}", clientID);
        return;
    }

    作者调用的是ConcurrentMap里面的boolean remove(Object key, Object value);这个方法要求key存在,且value 与预期的一样才会删除,也就说,是有可能存在的,key一样而value不一样的情况的,什么时候会出现?答案是client在两个设备上先后登陆,这个时候由于是存在一个map里面的所以后面的登陆所创建的连接描述符会覆盖前面的一个。当然这里面,也可以在覆盖之前强制断开之前那个连接,但是moquette并没有这么做,具体看源码io.moquette.server.ConnectionDescriptorStore#addConnection

也就说说moquette是允许存在一个账号多设备登陆的。将入client先后在A,B两个设备上建立连接,B连接会覆盖A连接,这个时候A连接虽然还在,但其实是永远也收不到消息的,因为发送消息的时候,会以ConnectionDescriptorStore里面存储的为准,具体看源码
io.moquette.server.ConnectionDescriptorStore#sendMessage,也就是说A连接会无谓的占用broker的资源,个人觉得这样并不好,也非常没有必要,大家可以自行改进。
现在大家就能够理解上面的第2步了,因为这个就是为双登陆的情况下,被覆盖的那个连接准备的。

原文地址:http://blog.51cto.com/13579730/2074573

时间: 2024-11-06 09:35:33

mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理的相关文章

mqtt协议-broker之moqutte源码研究一

mqtt协议的broker有很多,但是java的支持集群的并不多,之前调研过一番,发现moqutte基本满足需求,就想着基于这个在自己做二次开发.后面会逐渐把自己对moqutte的研究发布出来,希望能给有相同需求的同学一定的参考意义.github地址:https://github.com/andsel/moquette一.将代码倒入idea找到启动类启动报错,是因为找不到moquette的配置文件跟踪源码moquette的配置文件地址是config/moquette.conf因为咱们是直接启动的

mqtt协议-broker之moqutte源码研究二之Connect报文处理

先上一个图,大概说明一下moquette 的类之间的关系 一.ProtocolProcessor类该类是moquette里面的最终要的类,负责所有报文的处理,持有所有各模块功能的实现对象的引用, 下面详细介绍 protected ConnectionDescriptorStore connectionDescriptors;//所有的连接描述符文存储,即clientId与通道之间的映射集合 protected ConcurrentMap<RunningSubscription, Subscrip

mqtt协议-broker之moqutte源码研究三之PUBLISH报文处理

先简单说明一下,对于mqtt是个双向通信的过程,也就是说,他既允许client向broker发布消息,同时也允许broker向client发布消息 public void processPublish(Channel channel, MqttPublishMessage msg) { final MqttQoS qos = msg.fixedHeader().qosLevel(); final String clientId = NettyUtils.clientID(channel); LO

mqtt协议-broker之moqutte源码研究二之SUBSCRIBE报文处理

这一篇开始讲解moqutte对SUBSCRIBE报文的处理 代码不复杂public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {String clientID = NettyUtils.clientID(channel);//从channel里面获取clientId,具体原理看下文int messageID = messageId(msg);LOG.info("Processing SUBSCRIBE mes

mqtt协议-broker之moqutte源码研究六之集群

moquette的集群功能是通过Hazelcast来实现的,对Hazelcast不了解的同学可以自行Google以下.在讲解moquette的集群功能之前需要讲解一下moquette的拦截器,因为moquette对Hazelcast的集成本身就是通过拦截器来实现的. 一.拦截器io.moquette.spi.impl.ProtocolProcessor类里面有一个BrokerInterceptor类,这个类就是broker拦截器,这个对象,在processConnect,processPubAc

Chrome自带恐龙小游戏的源码研究(五)

在上一篇<Chrome自带恐龙小游戏的源码研究(四)>中实现了障碍物的绘制及移动,从这一篇开始主要研究恐龙的绘制及一系列键盘动作的实现. 会眨眼睛的恐龙 在游戏开始前的待机界面,如果仔细观察会发现恐龙会时不时地眨眼睛.这是通过交替绘制这两个图像实现的: 可以通过一张图片来了解这个过程: 为实现图片的切换,需要一个计时器timer,并且需要知道两张图片切换的时间间隔msPerFrame.当计时器timer的时间大于切换的时间间隔msPerFrame时,将图片切换到下一张,到达最后一张时又从第一张

Chrome自带恐龙小游戏的源码研究(六)

在上一篇<Chrome自带恐龙小游戏的源码研究(五)>中实现了眨眼睛的恐龙,这一篇主要研究恐龙的跳跃. 恐龙的跳跃 游戏通过敲击键盘的Spacebar或者Up来实现恐龙的跳跃.先用一张图来表示整个跳跃的过程: 首先规定向下为正方向,即重力加速度(g)为正,起跳的速度(v)为负,恐龙距离画布上方的距离为yPos: 每一帧动画中,速度都会与重力加速度相加得到新的速度,再用新的速度与yPos相加得到新的yPos,改变恐龙的位置为新的yPos,表现出来为yPos不断减小: 当恐龙升至最高点,此时速度为

OAuth2学习及DotNetOpenAuth部分源码研究

OAuth2学习及DotNetOpenAuth部分源码研究 在上篇文章中我研究了OpenId及DotNetOpenAuth的相关应用,这一篇继续研究OAuth2. 一.什么是OAuth2 OAuth是一种开放认证协议,允许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如照片,视频,联系人列表),而无需将用户名和密码提供给第三方应用.数字2表示现在使用第2代协议. 二.OAuth2中的角色 OAuth2有四种角色 resource owner资源所有者:比如twitter用户,他在twi

Chrome自带恐龙小游戏的源码研究(完)

在上一篇<Chrome自带恐龙小游戏的源码研究(七)>中研究了恐龙与障碍物的碰撞检测,这一篇主要研究组成游戏的其它要素. 游戏分数记录 如图所示,分数及最高分记录显示在游戏界面的右上角,每达到100分就会出现闪烁特效,游戏第一次gameover时显示历史最高分.分数记录器由DistanceMeter构造函数实现,以下是它的全部代码: 1 DistanceMeter.dimensions = { 2 WIDTH: 10, //每个字符的宽度 3 HEIGHT: 13, //每个字符的高 4 DE