mqtt协议-broker之moqutte源码研究二之Connect报文处理

先上一个图,大概说明一下moquette 的类之间的关系

一.ProtocolProcessor类
该类是moquette里面的最终要的类,负责所有报文的处理,持有所有各模块功能的实现对象的引用, 下面详细介绍

    protected ConnectionDescriptorStore connectionDescriptors;//所有的连接描述符文存储,即clientId与通道之间的映射集合
protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;//所有当前正在处理的
    订阅关系的处理
private SubscriptionsDirectory subscriptions;//订阅目录,本质上是topic树
private ISubscriptionsStore subscriptionStore;//所有的订阅的集合
private boolean allowAnonymous;//是否允许匿名连接
private boolean allowZeroByteClientId;//是否允许clientId为空
private IAuthorizator m_authorizator; //对topic的读写权限认证

private IMessagesStore m_messagesStore;//retainMessage的存储

private ISessionsStore m_sessionsStore;//session 存储

private IAuthenticator m_authenticator;//连接时候的鉴权认证
private BrokerInterceptor m_interceptor;//各个层面的拦截器

private Qos0PublishHandler qos0PublishHandler;//qos0拦截器
private Qos1PublishHandler qos1PublishHandler;//qos1拦截器
private Qos2PublishHandler qos2PublishHandler;/qos2拦截器
private MessagesPublisher messagesPublisher;//消息发布器,当client向某个topic发布完消息后,
private InternalRepublisher internalRepublisher;//保留消息重发器
    ConcurrentMap<String, WillMessage> m_willStore//遗愿消息存储

    几乎所有的功能的源头都在这个类里面

二.对14种报文的处理,都在ProtocolProcessor类,后面会分篇挨个讲解moquette对这14个报文的处理
具体哪14中文报文如下

名字                   值           报文流动方向                      描述

Reserved 0 禁止 保留
CONNECT 1 客户端到服务端 客户端请求连接服务端
CONNACK 2 服务端到客户端 连接报文确认
PUBLISH 3 两个方向都允许 发布消息
PUBACK 4 两个方向都允许 QoS 1消息发布收到确认
PUBREC 5 两个方向都允许 发布收到(保证交付第一步)
PUBREL 6 两个方向都允许 发布释放(保证交付第二步)
PUBCOMP 7 两个方向都允许 QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE 8 客户端到服务端 客户端订阅请求
SUBACK 9 服务端到客户端 订阅请求报文确认
UNSUBSCRIBE 10 客户端到服务端 客户端取消订阅请求
UNSUBACK 11 服务端到客户端 取消订阅报文确认
PINGREQ 12 客户端到服务端 心跳请求
PINGRESP 13 服务端到客户端 心跳响应
DISCONNECT 14 客户端到服务端 客户端断开连接
Reserved 15 禁止 保留

或者到这里看更详细的mqtt中文翻译
https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
非常感谢作者的辛劳工作和无私分享

三.debug跟踪moquette 对CONNECT报文的处理
大概分为以下几步
1.验证协议版本,如果不是mqtt-3.1或者mqtt-3.1.1则拒绝连接
2.验证clientId是否为空,如果为空,但是配置的时候(在上篇介绍的moquette.cof里面配置)要求不允许唯恐,即上面的allowZeroByteClientId或者cleanSession为false即要求保存会话,则视为不合法,拒绝连接,否则由moquette生成clientId
3.验证是否有登录的权限
这里面贴上源码讲解一下
private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!this.allowAnonymous) {
LOG.error("Client didn‘t supply any password and MQTT anonymous mode is disabled CId={}", clientId);
failedCredentials(channel);
return false;
}
if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",
clientId, msg.payload().userName(), pwd);
failedCredentials(channel);
return false;
}
NettyUtils.userName(channel, msg.payload().userName());
} else if (!this.allowAnonymous) {
LOG.error("Client didn‘t supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
failedCredentials(channel);
return false;
}
return true;
}

3.1.如果CONNETCT报文里面的可变头里面没有用户名,直接返回true
3.2.如果有用户名,同时有密码,从可变头取出密码,调用m_authenticator进行验证
3.3 如果有用户名,没有密码,认证失败,拒绝连接
3.4 如果没有用户名,同时配置为不允许匿名,则认证失败

4.创建连接描述符,连接描述符包括clientId,channel,isCleanSession,ConnectState,同时判断连接描述符集合里面是否包括该连接描述符,如果包含,代表该连接以及建立,断开连接
5.根据CONNECT报文里面的Keep Alive time 来设置tcp参数
6.根据CONNECT报文遗愿消息标志位,觉得是否存储遗愿消息
7.返回CONNACK报文,这里面把返回CONNACK报文单独讲解一下

        private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) {
    LOG.info("Sending connect ACK. CId={}", clientId);
    final boolean success = descriptor.assignState(DISCONNECTED, SENDACK);
    if (!success) {
        return false;
    }

    MqttConnAckMessage okResp;
    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
    boolean isSessionAlreadyStored = clientSession != null;
    if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) {
        okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED);
    } else {
        okResp = connAck(CONNECTION_ACCEPTED);
    }

    if (isSessionAlreadyStored) {
        LOG.info("Cleaning session. CId={}", clientId);
        clientSession.cleanSession(msg.variableHeader().isCleanSession());
    }
    descriptor.writeAndFlush(okResp);
    LOG.info("The connect ACK has been sent. CId={}", clientId);
    return true;
}

        7.1 判断当前连接的状态,怎么判断的呢?这里面用了AtomicReference<ConnectionState>通过调用原子引用类  compareAndSet(DISCONNECTED, SENDACK)来解决并发修改连接状态的问题。
        7.2如果状态是disConnect,将状态修改为sendAck
        7.3 如果CONNETCT报文里面的CleanSession标识设置为0同时broker已经有了client的会话,将CONNACK报文里面的连接确认标志设为1,告诉客户端,broker已经有了响应的会话信息。否则将连接确认标志设为0
        7.4 如果已经存在相应的client的会话,则根据新的连接,更新clientSession里面的是否清理session属性

8.唤醒拦截器记录连接事件
9.创建或者从新加载clientSession,这里面单独讲解一下

            private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg,
        String clientId) {
    final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED);
    if (!success) {
        return null;
    }

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
    boolean isSessionAlreadyStored = clientSession != null;
    if (!isSessionAlreadyStored) {
        clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession());
    }
    if (msg.variableHeader().isCleanSession()) {
        LOG.info("Cleaning session. CId={}", clientId);
        clientSession.cleanSession();
    }
    return clientSession;
}

     9.1 AtomicReference<ConnectionState>通过调用原子引用类  compareAndSet(SENDACK, SESSION_CREATED)将连接状态从sendAck修改为session_create
     9.2 session存储结合里面,是否已经存在会话信息,如果不存在,创建一个新的clientsession
     9.3 如果存在,根据CONNETCT报文里面的cleansession自动决定是否清理调旧的会话信息。

10.如果CONNETCT报文要求不清理会话信息(cleansession标志位为0),则重发QoS1 and QoS2 messages,同时将连接状态从session_create修改成message_republish
11.将连接状态从session_create修改成established

到此,broker和client直接的mqtt连接正式建立,后面client可以开始发送SUBSCRIBE或者PUBLISH报文了。
在这里再补充一点,对于broker来说,建立连接的过程中,连接状态会从disConnect->sendAck->session_create->message_republish->established,之所以要设置这些状态,是因为,每一步后面的操作都要基于前面的状态来决定是否需要真正执行,这里面用到了原子引用类来保证,状态的修改这个操作的原子行,确保了在并发的情况下,每一步操作都是条件满足的。

下面一篇将会讲解SUBSCRIBE报文的处理

原文地址:http://blog.51cto.com/13579730/2073630

时间: 2024-10-06 09:52:52

mqtt协议-broker之moqutte源码研究二之Connect报文处理的相关文章

mqtt协议-broker之moqutte源码研究二之SUBSCRIBE报文处理

这一篇开始讲解moqutte对SUBSCRIBE报文的处理 代码不复杂public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {String clientID = NettyUtils.clientID(channel);//从channel里面获取clientId,具体原理看下文int messageID = messageId(msg);LOG.info("Processing SUBSCRIBE mes

mqtt协议-broker之moqutte源码研究三之PUBLISH报文处理

先简单说明一下,对于mqtt是个双向通信的过程,也就是说,他既允许client向broker发布消息,同时也允许broker向client发布消息 public void processPublish(Channel channel, MqttPublishMessage msg) { final MqttQoS qos = msg.fixedHeader().qosLevel(); final String clientId = NettyUtils.clientID(channel); LO

mqtt协议-broker之moqutte源码研究一

mqtt协议的broker有很多,但是java的支持集群的并不多,之前调研过一番,发现moqutte基本满足需求,就想着基于这个在自己做二次开发.后面会逐渐把自己对moqutte的研究发布出来,希望能给有相同需求的同学一定的参考意义.github地址:https://github.com/andsel/moquette一.将代码倒入idea找到启动类启动报错,是因为找不到moquette的配置文件跟踪源码moquette的配置文件地址是config/moquette.conf因为咱们是直接启动的

mqtt协议-broker之moqutte源码研究五之UNSUBSCRIBE与DISCONN报文处理

本文讲解moquette对UNSUBSCRIBE和DISCONNECT的处理 先说UNSUBSCRIBE,代码比较简单 public void processUnsubscribe(Channel channel, MqttUnsubscribeMessage msg) { List<String> topics = msg.payload().topics(); String clientID = NettyUtils.clientID(channel); LOG.info("Pr

mqtt协议-broker之moqutte源码研究六之集群

moquette的集群功能是通过Hazelcast来实现的,对Hazelcast不了解的同学可以自行Google以下.在讲解moquette的集群功能之前需要讲解一下moquette的拦截器,因为moquette对Hazelcast的集成本身就是通过拦截器来实现的. 一.拦截器io.moquette.spi.impl.ProtocolProcessor类里面有一个BrokerInterceptor类,这个类就是broker拦截器,这个对象,在processConnect,processPubAc

Chrome自带恐龙小游戏的源码研究(二)

在上一篇<Chrome自带恐龙小游戏的源码研究(一)>中实现了地面的绘制和运动,这一篇主要研究云朵的绘制. 云朵的绘制通过Cloud构造函数完成.Cloud实现代码如下: 1 Cloud.config = { 2 HEIGHT:14, //云朵sprite的高度 3 MAX_CLOUD_GAP:400, //两朵云之间的最大间隙 4 MAX_SKY_LEVEL:30, //云朵的最大高度 5 MIN_CLOUD_GAP:100, //两朵云之间的最小间隙 6 MIN_SKY_LEVEL:71,

Android Launcher源码研究(二) 加载app流程1

今天主要分析Android Launcher源码中的一些重要类之间的关系,基本的加载流程.先来看一个类图 Launcher.java 是主Activity 在onCreate方法里面初始化了LauncherMode实例. LauncherApplication app = ((LauncherApplication)getApplication()); mModel = app.setLauncher(this); 直接进入LauncherApplication.java的方法 Launcher

Nginx源码研究二:NGINX的网络IO

NGINX作为服务端的应用程序,在客户端发出数据后,服务端在做着这样一些处理,数据先会经过网卡,网卡会和操作系统做交互,经过操作系统的协议栈处理,再和不同的应用程序交互. 在这里面涉及两个概念,一个是用户态,一个是内核态.应用程序通过系统调用函数进入内核空间,内核运行进行数据准备和数据拷贝等工作.对于NGINX来说,他是作为应用程序和操作系统交互,即是用户态和内核态的之间的交互,NGINX和内核交互方式有很多,例如open(),read() 等都是在和内核交互,而对于网络IO来说,我们知道lin

Nginx源码研究二:NGINX的内存管理

关于nginx的内存使用,我们先看代码,下面是nginx_cycle.c中对全局数据结构cycle的初始化过程 pool = ngx_create_pool(NGX_CYCLE_POOL_SIZE, log); //申请16K的内存池 if (pool == NULL) { return NULL; } pool->log = log; cycle = ngx_pcalloc(pool, sizeof(ngx_cycle_t)); if (cycle == NULL) { ngx_destroy