Mosquitto pub/sub服务实现代码浅析-主体框架

Mosquitto 是一个IBM 开源pub/sub订阅发布协议 MQTT 的一个单机版实现(目前也只有单机版),MQTT主打轻便,比较适用于移动设备等上面,花费流量少,解析代价低。相对于XMPP等来说,简单许多。

MQTT采用二进制协议,而不是XMPP的XML协议,所以一般消息甚至只需要花费2个字节的大小就可以交换信息了,对于移动开发比较有优势。

IBM虽然开源了其MQTT消息协议,但是却没有开源其RSMB服务端程序,不过还好目前有比较稳定的实现可用,本文的Mosquitto是其中比较活跃的实现之一,具体在 这里 有目前的实现列表可供选择。

趁着大脑还没有进入睡眠状态记录一下刚才看代码学到的东西。我下载的版本是1.2.2版,在 这里 可以找到 下载链接 。

零、介绍

关于 MQTT 3.1协议 本身比较简单,42页的PDF介绍完了,相比XMPP那长长的文档,谢天谢地了。由于刚看,所以很多细节都没有深入进去,这里只是记录个大概,后续有时间慢慢补好坑吧。

总体来说,mosquitto实现有如下几个特点:

  1. poll()异步模型,竟然不是epoll,也许这注定了其只能支持十几万连接同时在线的悲剧吧。
  2. 内存处理方面几乎没有任何优化,但简单可依赖;
  3. 多线程程序,许多地方都得加锁访问。但其实多线程的需求没那么强烈,可以考虑避免;

总之,是一个比较简单单可以适用于一般的服务中提供pub/sub功能支持,但如果放到大量并发的系统中,可以优化的地方还有很多。关于mosquitto的性能,暂时没有找到官方的评测,不过在 邮件组里面找到的一些讨论 似乎显示其性能上限为20W连接时在线的状态,当然具体取决于业务逻辑,交互是否很多等。不过这样的成绩还是不错的。一台机器可以起多个实例的嘛。


一、初始化

mosquitto.c文件main开头调用_mosquitto_net_init初始化SSL加密的库,然后调用mqtt3_config_init初始化配置的各个数据结构为默认值。配置文件的解析由mqtt3_config_parse_args牵头完成,具体配置文件解析就不多写了,fgets一行行的读取配置,然后设置到config全局变量中。其中包括对于监听地址等的读取。

然后保存pid进程号。mqtt3_db_open打开db文件

int main(int argc, char *argv[])
{

    memset(&int_db, 0, sizeof(struct mosquitto_db));

    _mosquitto_net_init();

    mqtt3_config_init(&config);
    rc = mqtt3_config_parse_args(&config, argc, argv);//k: init && load config file, set struct members

  配置读取完后,就可以打开监听端口了,使用mqtt3_socket_listen打开监听端口,并将SOCK套接字放在局部变量listensock里面,以便后面统一使用。

listener_max = -1;
    listensock_index = 0;
    for(i=0; i<config.listener_count; i++){
        if(mqtt3_socket_listen(&config.listeners[i])){
            _mosquitto_free(int_db.contexts);
            mqtt3_db_close(&int_db);
            if(config.pid_file){
                remove(config.pid_file);
            }
            return 1;
        }
        listensock_count += config.listeners[i].sock_count;
        listensock = _mosquitto_realloc(listensock, sizeof(int)*listensock_count);
        if(!listensock){
            _mosquitto_free(int_db.contexts);
            mqtt3_db_close(&int_db);
            if(config.pid_file){
                remove(config.pid_file);
            }
            return 1;
        }
        for(j=0; j<config.listeners[i].sock_count; j++){
            if(config.listeners[i].socks[j] == INVALID_SOCKET){
                _mosquitto_free(int_db.contexts);
                mqtt3_db_close(&int_db);
                if(config.pid_file){
                    remove(config.pid_file);
                }
                return 1;
            }
            listensock[listensock_index] = config.listeners[i].socks[j];
            if(listensock[listensock_index] > listener_max){
                listener_max = listensock[listensock_index];
            }
            listensock_index++;
        }
    }

  关于mqtt3_socket_listen函数也比较经典,socket(),bind(), listen()的流程,不同的是使用了新版的套接字信息获取函数getaddrinfo,该函数支持IPV4和IPV6,对应用层透明,不需要处理这些信息。

  

 mqtt3_socket_listen(struct _mqtt3_listener *listener)
{
    snprintf(service, 10, "%d", listener->port);
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = PF_UNSPEC;
    hints.ai_flags = AI_PASSIVE;
    hints.ai_socktype = SOCK_STREAM;

    //导致下面返回多个链表节的的因素可能有:
    //hostname参数关联的地址有多个,那么每个返回一个节点;比如host为域名的时候,nslookup返回几个ip就有几个
    //service参数指定的服务会吃多个套接字接口类型,那么也返回多个
    if(getaddrinfo(listener->host, service, &hints, &ainfo)) return INVALID_SOCKET;

    listener->sock_count = 0;
    listener->socks = NULL;

    for(rp = ainfo; rp; rp = rp->ai_next){
        //····
        sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if(sock == -1){
            strerror_r(errno, err, 256);
            _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: %s", err);
            continue;
        }
        listener->sock_count++;
        listener->socks = _mosquitto_realloc(listener->socks, sizeof(int)*listener->sock_count);
        if(!listener->socks){
            _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
            return MOSQ_ERR_NOMEM;
        }
        listener->socks[listener->sock_count-1] = sock;
        /* Set non-blocking */
        opt = fcntl(sock, F_GETFL, 0);

        if(bind(sock, rp->ai_addr, rp->ai_addrlen) == -1){
            strerror_r(errno, err, 256);
            _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s", err);
            COMPAT_CLOSE(sock);
            return 1;
        }

        if(listen(sock, 100) == -1){
            strerror_r(errno, err, 256);
            _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: %s", err);
            COMPAT_CLOSE(sock);
            return 1;
        }
    }
    freeaddrinfo(ainfo);
}

二、消息事件循环

打开监听套接字后,就可以进入消息事件循环,标准网络服务程序的必须过程。这个由main函数调用mosquitto_main_loop启动。mosquitto_main_loop函数主体也是一个大循环,在循环里面进行超时检测,事件处理,网络读写等等。由于使用poll模型,所以就需要在进行poll()等待之前准备需要监听的套接字数组列表pollfds,效率低的地方就在这里。

对于监听套接字,简单将其加入pollfds里面,注册POLLIN可读事件即可。如果对于其他跟客户端等的连接,就需要多做一步操作了。如果是桥接模式,进行相应的处理,这里暂时不介绍桥接模式,桥接模式是为了分布式部署加入的非标准协议,目前只有IBM rsmb和mosquitto实现了。

对于跟客户端的连接,mosquitto会在poll等待之前调用mqtt3_db_message_write尝试发送一次未发送的数据给对方,避免不必要的等待可能。

int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock_count, int listener_max)
{
        memset(pollfds, -1, sizeof(struct pollfd)*pollfd_count);

        pollfd_index = 0;
        for(i=0; i<listensock_count; i++){//注册监听sock的pollfd可读事件。也就是新连接事件
            pollfds[pollfd_index].fd = listensock[i];
            pollfds[pollfd_index].events = POLLIN;
            pollfds[pollfd_index].revents = 0;
            pollfd_index++;
        }

        time_count = 0;
        for(i=0; i<db->context_count; i++){//遍历每一个客户端连接,尝试将其加入poll数组中
            if(db->contexts[i]){
//····

                    /* Local bridges never time out in this fashion. */
                    if(!(db->contexts[i]->keepalive)
                            || db->contexts[i]->bridge
                            || now - db->contexts[i]->last_msg_in < (time_t)(db->contexts[i]->keepalive)*3/2){

                        //在进入poll等待之前,先尝试将未发送的数据发送出去
                        if(mqtt3_db_message_write(db->contexts[i]) == MOSQ_ERR_SUCCESS){
                            pollfds[pollfd_index].fd = db->contexts[i]->sock;
                            pollfds[pollfd_index].events = POLLIN | POLLRDHUP;
                            pollfds[pollfd_index].revents = 0;
                            if(db->contexts[i]->current_out_packet){
                                pollfds[pollfd_index].events |= POLLOUT;
                            }
                            db->contexts[i]->pollfd_index = pollfd_index;
                            pollfd_index++;
                        }else{//尝试发送失败,连接出问题了
                            mqtt3_context_disconnect(db, db->contexts[i]);
                        }
                    }else{//超过1.5倍的时间,超时关闭连接
                        if(db->config->connection_messages == true){
                            _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", db->co
ntexts[i]->id);
                        }
                        /* Client has exceeded keepalive*1.5 */
                        mqtt3_context_disconnect(db, db->contexts[i]);//关闭连接,清空数据,后续还可以用.sock=INVALID_SOCKET
                    }
                    }else{
#endif
                        if(db->contexts[i]->clean_session == true){
                            //这个连接上次由于什么原因,挂了,设置了clean session,所以这里直接彻底清空其结构
                            mqtt3_context_cleanup(db, db->contexts[i], true);
                            db->contexts[i] = NULL;
                        }else if(db->config->persistent_client_expiration > 0){
                            //协议规定persistent_client的状态必须永久保存,这里避免连接永远无法删除,增加这个超时选项。
                            //也就是如果一个客户端断开连接一段时间了,那么我们会主动干掉他
                            /* This is a persistent client, check to see if the
                             * last time it connected was longer than
                             * persistent_client_expiration seconds ago. If so,
                             * expire it and clean up.
                             */
                            if(now > db->contexts[i]->disconnect_t+db->config->persistent_client_expiration){
                                _mosquitto_log_printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", db-
>contexts[i]->id);
#ifdef WITH_SYS_TREE
                                g_clients_expired++;
#endif
                                db->contexts[i]->clean_session = true;
                                mqtt3_context_cleanup(db, db->contexts[i], true);
                                db->contexts[i] = NULL;
                            }
                        }
#ifdef WITH_BRIDGE
                    }

  然后先使用mqtt3_db_message_timeout_check检测一下超时没有收到客户端回包确认的消息,mosquitto对于超时的消息处理,是会进行重发的。不过理论上,TCP是不需要重发的,具体见这里: MQTT消息推送协议应用数据包超时是否需要重发?  不过,由于mosquitto对于客户端断开连接的处理比较弱,连接重新建立后,使用的相关数据结构还是相同的,因此重发其实也可以,只是这个时候的重发,实际上是在一个连接上没有收到ACK回包,然后后续建立的新连接上进行重传。不是在一个连接上重传。但是这样其实也有很多弊端,比如客户端必须支持消息的持久化记录,否则容易双方对不上话的情况。

int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout)
{//循环遍历每一个连接的每个消息msg,看起是否超时,如果超时,将消息状态改为上一个状态,从而后续触发重发
    int i;
    time_t threshold;
    enum mosquitto_msg_state new_state = mosq_ms_invalid;
    struct mosquitto *context;
    struct mosquitto_client_msg *msg;

    threshold = mosquitto_time() - timeout;

    for(i=0; i<db->context_count; i++){//遍历每一个连接,
        context = db->contexts[i];
        if(!context) continue;

        msg = context->msgs;
        while(msg){//遍历每个msg消息,看看其状态,如果超时了,那么从上一个消息开始重发.其实不需要重发http://chenzhenianqing.cn/ar
ticles/977.html
            //当然如果这个是复用了之前断开过的连接,那就需要重发。但是,这个时候其实可以重发整个消息的。不然容易出问题,客户端难>
度大
            if(msg->timestamp < threshold && msg->state != mosq_ms_queued){
                switch(msg->state){
                    case mosq_ms_wait_for_puback:
                        new_state = mosq_ms_publish_qos1;
                        break;
                    case mosq_ms_wait_for_pubrec:
                        new_state = mosq_ms_publish_qos2;
                        break;
                    case mosq_ms_wait_for_pubrel:
                        new_state = mosq_ms_send_pubrec;
                        break;
                    case mosq_ms_wait_for_pubcomp:
                        new_state = mosq_ms_resend_pubrel;
                        break;
                    default:
                        break;
                }
                if(new_state != mosq_ms_invalid){
                    msg->timestamp = mosquitto_time();//设置当前时间,下次依据来判断超时

  超时提前检测完成后就可以进入poll等待了。等待完成后,对于有可读事件的连接,调用loop_handle_reads_writes进行事件读写处理,对于监听端口的事件,使用mqtt3_socket_accept去接受新连接。

  loop_handle_reads_writes新事件处理函数比较简单,主体还是循环判断可读可写事件,进行相应的处理。具体不多介绍了,需要关注的是由于是异步读写,所以需要记录上次读写状态,以便下次进入上下午继续读取数据。可写事件由_mosquitto_packet_write完成,可读事件由_mosquitto_packet_read完成。

  新客户端连接的事件则由qtt3_socket_accept完成,其会将新连接放在db->contexts[i]数组的某个空位置,每次都会遍历寻找一个空的槽位放新连接。这里有个小优化其实就是用hints的机制,记录上次的查找位置,避免多次重复的从前面找到后面。

  连接读写事件处理完成后,mosquitto会检测是否需要重新reload部分配置文件。这个由SIGHUP的信号触发。

  限于篇幅,具体的逻辑请求处理下次再介绍了。

三、总结

mosquitto是一个简单可依赖的开源MQTT实现,能支持10W左右的同时在线(未亲测),单机版本,但通过bridge桥接模式支持部分分布式,但有限;协议本身非常适合在移动设备上使用,耗电少,处理快,属于header上带有消息体长度的协议,这个在异步网络事件代码编写时是码农最爱的,哈哈。

对于后续的提高优化的地方,简单记录几点:

  1. 发送数据用writev
  2. poll -> epoll ,用以支持更高的冰法;
  3. 改为单线程版本,降低锁开销,目前锁开销还是非常大的。目测可以改为单进程版本,类似redis,精心维护的话应该能达到不错的效果;
  4. 网络数据读写使用一次尽量多读的方式,避免多次进入系统调用;
  5. 内存操作优化。不free,留着下次用;
  6. 考虑使用spwan-fcgi的形式或者内置一次启动多个实例监听同一个端口。这样能更好的发挥机器性能,达到更高的性能;

参考:

  Mosquitto pub/sub服务实现代码浅析-主体框架

时间: 2024-10-16 02:48:48

Mosquitto pub/sub服务实现代码浅析-主体框架的相关文章

Camera服务之--架构浅析

Camera服务之--架构浅析 分类: Camera 分析2011-12-22 11:17 7685人阅读 评论(3) 收藏 举报 android硬件驱动框架jnilinux内核平台 一.应用层 Camera 的应用层在Android 上表现为直接调用SDK API 开发的一个Camera 应用APK 包.代码在/android/packages/apps/Camera 下.主要对 android.hardware.Camera(在Framework中) 类的调用,并且实现Camera 应用的业

阅读高手编写的类似QQ聊天的服务端代码业务层设计总结

业务层的代码也应该是面向接口编程,先抽象一个接口或是抽象类,规范一些算法或者功能框架,再在其子类或是实现类中完成具体的方法,易于后期代码的维护. 1.业务层缓存技术 如果数据对实时性要求不高,可以把数据缓存在内存中,提高效率.一般都是利用集合来缓存数据.如下代码: /** * 存放写线程的缓存器 * * @author way */ public class OutputThreadMap { private HashMap<Integer, OutputThread> map; <sp

Web服务之Nginx浅析

一.Nginx 简介: nginx [engine x]是Igor Sysoev编写的一个高性能的HTTP和反向代理服务器,另外它也可以作为邮件代理服务器. 在大多数情况下都是用来做静态web服务器和反向代理服务器,在作为反向代理服务器的时候,Nginx可以对后端的real server做负载均衡,基于应用层的负载均衡,但是他仅支持一些常见的协议,如:http.mysql.ftp.smtp. 特性: Nginx是一款面向性能设计的HTTP服务器,相较于Apache.lighttpd具有占有内存少

Socket通信客户端和服务端代码

这两天研究了下Socket通信,简单实现的客户端和服务端代码 先上winfrom图片,客户端和服务端一样 服务端代码: using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; usin

七牛云存储android客户端及java服务端代码编写

前一篇博客提到让我很伤心的c应用,由于是一款供用户上传图片的应用,因此必须解决图片存储问题,如果直接将图片存储至服务器,当用户上传图片较多,服务器空间将很快吃紧,同时也没有那么大的带宽,现实中我买的阿里云服务器是最低配置,数据盘才20G,带宽才1M,如果用这样配置的服务器做图片存储,那实在太扯了.于是很自然的想到用图片云存储服务器,通过不断查找资料,最后将目标定位在七牛云和又拍云.在做选择时,主要对比了两者之间的价格及技术优势,也看了很多相关话题讨论,个人认为这两者无论从技术方案还是产品价格,都

问题:Custom tool error: Failed to generate code for the service reference &#39;AppVot;结果:添加Service Reference, 无法为服务生成代码错误的解决办法

添加Service Reference, 无法为服务生成代码错误的解决办法 我的解决方案是Silverlight+WCF的应用,Done Cretiria定义了需要在做完Service端的代码后首先运行事先定义好的Unit Test,确保在客户端使用Service之前Service是可以正确的运行的.在我创建Unit Test之前,需要在测试项目中添加对WCF Service的引用,而这时却出现了错误. Custom tool error: Failed to generate code for

android简单登录注册服务端代码实现

长时间不写,知识都淡忘了!现在实现简单登录注册功能,供以后参考!!!! 项目下载地址: https://github.com/majunm/TestServiceDemo.git 服务器: Tomcat/7.0.40  数据库: mysql5.0 数据库创建: mysql 正确安装 配置完path 后 如下界面: mysql -u root -p    // 连接mysql数据库 create database july;//创建数据库 数据库名 july show databases; //

C#根据WSDL文件生成WebService服务端代码

转自:http://www.cnblogs.com/liyi93/archive/2012/01/30/2332320.html 虽然现在已经进入了.NET FrameWork 4.0的时代,WebService也已经逐渐被淘汰,取而代之的是WCF. 但在工作中难免遇到需要兼容旧版本程序和按照以前的文档进行开发. 一般一个已经实现功能的WebService会发布自己的WSDL文件,供客户端调用生成代理类. 但有时是先有server与client交互的接口定义(WSDL)文件,然后由server和

H5+上传注意要点及服务端代码

// 上传文件 function upload(num) { console.log("num:" + num); console.log("headImg.src.:" + headImg.src); if(num == 0 && headImg.src.indexOf('img/header.png') > -1) { plus.nativeUI.alert("请添加头像图片文件!"); return; } consol