MQTT 开源代理mosquitto的网络层封装相当sucks

最近学习MQTT协议,选择了当前比较流行的MQTT Broker “mosquitto”,但是在阅读代码过程中发现其网络底层库封装的相当差劲。

对于MQTT协议的变长头长度的读取上,基本上采取每次一个byte的方式进行读取判断,对于系统调用read的高代价来讲,真的是相当的浪费,也难怪其不能作为高并发的服务器进行处理。

当然mosquitto需要优化的地方还很多:

1. 使用poll而不是使用epoll (可能是处于跨平台考虑,如果linux下可以使用epoll替换),同时的就是刚才提到的 byte 读取网络数据

2. 订阅树的管理上,对于大量的请求断开或者重练效率比较低

3. 空闲空间管理机制优化和数据包发送方式的修改

4. 内存管理上malloc new 没有使用mem pool机制,在大并发情况下,内存管理容易出现问题

但是从另一个方面讲,mosquitto作为开源的实现,思路上还是比较清晰,为mqtt服务器开发提供了比较完备的参考,这也就是它的价值所在了。

#ifdef WITH_BROKER

int _mosquitto_packet_read(struct mosquitto_db *db, struct mosquitto *mosq)

#else

int _mosquitto_packet_read(struct mosquitto *mosq)

#endif

{

uint8_t byte;

ssize_t read_length;

int rc = 0;

if(!mosq) return MOSQ_ERR_INVAL;

if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;

if(mosq->state == mosq_cs_connect_pending){

return MOSQ_ERR_SUCCESS;

}

/* This gets called if pselect() indicates that there is network data

* available - ie. at least one byte.  What we do depends on what data we

* already have.

* If we‘ve not got a command, attempt to read one and save it. This should

* always work because it‘s only a single byte.

* Then try to read the remaining length. This may fail because it is may

* be more than one byte - will need to save data pending next read if it

* does fail.

* Then try to read the remaining payload, where ‘payload‘ here means the

* combined variable header and actual payload. This is the most likely to

* fail due to longer length, so save current data and current position.

* After all data is read, send to _mosquitto_handle_packet() to deal with.

* Finally, free the memory and reset everything to starting conditions.

*/

if(!mosq->in_packet.command){

read_length = _mosquitto_net_read(mosq, &byte, 1);

if(read_length == 1){

mosq->in_packet.command = byte;

#ifdef WITH_BROKER

#  ifdef WITH_SYS_TREE

g_bytes_received++;

#  endif

/* Clients must send CONNECT as their first command. */

if(!(mosq->bridge) && mosq->state == mosq_cs_new && (byte&0xF0) != CONNECT) return MOSQ_ERR_PROTOCOL;

#endif

}else{

if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */

#ifdef WIN32

errno = WSAGetLastError();

#endif

if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

return MOSQ_ERR_SUCCESS;

}else{

switch(errno){

case COMPAT_ECONNRESET:

return MOSQ_ERR_CONN_LOST;

default:

return MOSQ_ERR_ERRNO;

}

}

}

}

/* remaining_count is the number of bytes that the remaining_length

* parameter occupied in this incoming packet. We don‘t use it here as such

* (it is used when allocating an outgoing packet), but we must be able to

* determine whether all of the remaining_length parameter has been read.

* remaining_count has three states here:

*   0 means that we haven‘t read any remaining_length bytes

*   <0 means we have read some remaining_length bytes but haven‘t finished

*   >0 means we have finished reading the remaining_length bytes.

*/

if(mosq->in_packet.remaining_count <= 0){

do{

read_length = _mosquitto_net_read(mosq, &byte, 1);

if(read_length == 1){

mosq->in_packet.remaining_count--;

/* Max 4 bytes length for remaining length as defined by protocol.

* Anything more likely means a broken/malicious client.

*/

if(mosq->in_packet.remaining_count < -4) return MOSQ_ERR_PROTOCOL;

#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)

g_bytes_received++;

#endif

mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;

mosq->in_packet.remaining_mult *= 128;

}else{

if(read_length == 0) return MOSQ_ERR_CONN_LOST; /* EOF */

#ifdef WIN32

errno = WSAGetLastError();

#endif

if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

return MOSQ_ERR_SUCCESS;

}else{

switch(errno){

case COMPAT_ECONNRESET:

return MOSQ_ERR_CONN_LOST;

default:

return MOSQ_ERR_ERRNO;

}

}

}

}while((byte & 128) != 0);

/* We have finished reading remaining_length, so make remaining_count

* positive. */

mosq->in_packet.remaining_count *= -1;

if(mosq->in_packet.remaining_length > 0){

mosq->in_packet.payload = _mosquitto_malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));

if(!mosq->in_packet.payload) return MOSQ_ERR_NOMEM;

mosq->in_packet.to_process = mosq->in_packet.remaining_length;

}

}

while(mosq->in_packet.to_process>0){

read_length = _mosquitto_net_read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);

if(read_length > 0){

#if defined(WITH_BROKER) && defined(WITH_SYS_TREE)

g_bytes_received += read_length;

#endif

mosq->in_packet.to_process -= read_length;

mosq->in_packet.pos += read_length;

}else{

#ifdef WIN32

errno = WSAGetLastError();

#endif

if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){

if(mosq->in_packet.to_process > 1000){

/* Update last_msg_in time if more than 1000 bytes left to

* receive. Helps when receiving large messages.

* This is an arbitrary limit, but with some consideration.

* If a client can‘t send 1000 bytes in a second it

* probably shouldn‘t be using a 1 second keep alive. */

pthread_mutex_lock(&mosq->msgtime_mutex);

mosq->last_msg_in = mosquitto_time();

pthread_mutex_unlock(&mosq->msgtime_mutex);

}

return MOSQ_ERR_SUCCESS;

}else{

switch(errno){

case COMPAT_ECONNRESET:

return MOSQ_ERR_CONN_LOST;

default:

return MOSQ_ERR_ERRNO;

}

}

}

}

/* All data for this packet is read. */

mosq->in_packet.pos = 0;

#ifdef WITH_BROKER

#  ifdef WITH_SYS_TREE

g_msgs_received++;

if(((mosq->in_packet.command)&0xF5) == PUBLISH){

g_pub_msgs_received++;

}

#  endif

rc = mqtt3_packet_handle(db, mosq);

#else

rc = _mosquitto_packet_handle(mosq);

#endif

/* Free data and reset values */

_mosquitto_packet_cleanup(&mosq->in_packet);

pthread_mutex_lock(&mosq->msgtime_mutex);

mosq->last_msg_in = mosquitto_time();

pthread_mutex_unlock(&mosq->msgtime_mutex);

return rc;

}

时间: 2024-10-16 19:07:04

MQTT 开源代理mosquitto的网络层封装相当sucks的相关文章

MQTT学习笔记——树莓派MQTT客户端 使用Mosquitto和paho-python

0 前言 本文说明如何在树莓派上安装Mosquitto.本文通过两个简单的例子说明树莓派中如何使用MQTT协议实现消息订阅,这些例子包括Mosquitto_sub指令实现消息订阅和paho-python扩展库实现GPIO端口的远程控制.本文中使用了两个工具--Mosquitto paho-python,其中Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的消息通信简单易用:另外,paho-python是一个

MQTT学习笔记——MQTT协议体验 Mosquitto安装和使用

0 前言 MQTT是IBM开发的一个即时通讯协议.MQTT是面向M2M和物联网的连接协议,采用轻量级发布和订阅消息传输机制.Mosquitto是一款实现了 MQTT v3.1 协议的开源消息代理软件,提供轻量级的,支持发布/订阅的的消息推送模式,使设备对设备之间的短消息通信简单易用. 若初次接触MQTT协议,可先理解以下概念: [MQTT协议特点]--相比于RESTful架构的物联网系统,MQTT协议借助消息推送功能,可以更好地实现远程控制. [MQTT协议角色]--在RESTful架构的物联网

动态代理实现横切——封装事务

上节课中,通过现象2可知道:如果对各个实现相同的控制,则需要重复写大量的代码.比如说,写日志,事务的开启,关闭,回滚等一系列操作. 但是在开发的过程中,如果经常注意以上的开发,那开发效率将很低的.而且还容易出错. 面对上面的问题,如果只是面向对象的编程.那开发的时候,程序员不仅要专注于业务逻辑的Coding,而且还要在后面写上日志的处理办法,事务的开启关闭等一系列与业务逻辑无关的代码.log.write(class,operate)-- 效率上肯定是要打折扣的.而且,把这种重复性的工作交给人力.

采用动态代理对事务进行封装

在上篇博客中介绍了使用ThreadLocal维护Connection的方法,这样做的最大的好处就是不用来回的传递Connection了,但是我们有会发现在我们使用事务的时候不可避免的会写许多重复的代码,这些都是与业务逻辑无关的: Connection conn = ConnectionManage.GetConnection(); ConnectionManage.beginTransaction(conn); ConnectionManage.commitTransaction(conn);/

MQTT服务器(Broker) - mosquitto配置文件详解

常规配置 #使用每个侦听器的安全设置. # #建议先设置此选项. # #如果此选项设置为true,则所有身份验证和访问控制 #选项是根据每个侦听器控制的.以下选项是 #受影响的有: # # password_file acl_file psk_file auth_plugin auth_opt_* allow_anonymous # auto_id_prefix allow_zero_length_clientid # # 请注意,如果设置为true,则断开连接的持久客户端(即,干净会话设置为f

使用Swift的代理,闭包来封装一个公用协议减少垃圾代码

iOS开发中,如果不进行适当的封装,使用协议或者继承类来进行开发,你就会遇到传说中的ViewController(以后简称VC) Hell的问题…… 比如说,我们先声网App中为了调用接口,做简单的判断,会有如下的垃圾代码(前辈遗留下来的): override func viewDidLoad() { super.viewDidLoad() var color = UIColor(red: 153/255, green: 204/255, blue: 204/255, alpha: 1) sel

TTP代理原装TTP233D-RB6 DFN6封装薄,体积小 TTP原厂工程服务技术支持

产品型号:TTP233D- RB6封装形式:DFN6L产品年份:新年份单按键触摸检测 IC 概 述 ●TTP233D-RB6 TonTouchTM 是单按键触摸检测芯片, 此触摸检测芯片内建稳压电路, 提供 稳定的电压给触摸感应电路使用, 稳定的触摸检测效果可以广泛的满足不同应用的需求, 此触摸检测芯片是专为取代传统按键而设计, 触摸检测 PAD 的大小可依不同的灵敏度设 计在合理的范围内, 低功耗与宽工作电压, 是此触摸芯片在 DC 或 AC 应用上的特性. 特 点 ●工作电压 2.4V ~

iOS数据库离线缓存思路和网络层封装

一直想总结一下关于iOS的离线数据缓存的方面的问题,然后最近也简单的对AFN进行了再次封装,所有想把这两个结合起来写一下.数据展示型的页面做离线缓存可以有更好的用户体验,用户在离线环境下仍然可以获取一些数据,这里的数据缓存首选肯定是SQLite,轻量级,对数据的存储读取相对于其他几种方式有优势,这里对AFN的封装没有涉及太多业务逻辑层面的需求,主要还是对一些方法再次封装方便使用,解除项目对第三方的耦合性,能够简单的快速的更换底层使用的网络请求代码.这篇主要写离线缓存思路,对AFN的封装只做简单的

mqtt开源服务器 EMQX 使用指南

官网: https://docs.emqx.io/broker/v3/cn/getstarted.html#mqtt-clients 一.安装启动 # 各平台下载https://www.emqx.io/downloads/broker?osType=Linux # 解压(例子linux) , window RARunzip emqx-macosx-v3.2.0.zip && cd emqx # 启动emqx ./bin/emqx start # 检查运行状态 ./bin/emqx_ctl