Swoole源码学习记录(十一)——Worker,Connection

Swoole版本:1.7.5-stable

本章将分析Swoole中的三个比较重要的模块,Worker,ReactorProcess和Connection。其中Worker和ReactorProcess其实是对前面三章的一个补充,在前面的章节中为了分析结果的流畅性没有针对这些模块做特定分析,在此做出补充。

Worker模块

首先是Worker模块。Worker在Swoole中为核心工作进程的封装,包括用于处理核心逻辑的worker和用于处理任务的task_worker。在Swoole中使用了结构体swWorker来封装worker进程的相关属性,其声明在swoole.h文件中的727 – 787行,其声明如下:

struct _swWorker
{
         /**
          * worker process
          */
         pid_tpid;

         /**
          * worker thread
          */
         pthread_ttid;

         swProcessPool*pool;

         swMemoryPool*pool_output;

         swQueue*queue;

         /**
          * redirect stdout to pipe_master
          */
         uint8_tredirect_stdout;

         /**
          * worker status, IDLE or BUSY
          */
         uint8_tstatus;

         uint8_tipc_mode;

         /**
          * redirect stdin to pipe_worker
          */
         uint8_tredirect_stdin;

         /**
          * worker id
          */
         uint32_tid;

         /**
          * eventfd, process notify
          */
         swPipe*notify;

         /**
          * share memory store
          */
         struct
         {
                   uint8_tlock;
                   void*ptr;
         }store;

         intpipe_master;
         intpipe_worker;
         intpipe;
         intreactor_id;
         void*ptr;
         void*ptr2;
};

标有注释的变量就不说明了,大家一看就明白。剩下的几个,pool是个进程池,用于分配task_worker,pool_output用于存放task_worker执行结束后的结果,queue是消息队列,ipc_mode是进程间的通讯模式,pipe_master和pipe_worker是管道的fd,分别用于写消息到master进程和从master读取消息,pipe当然就是管道的id,reactor_id是该worker归属的reactor的标志。(ptr和ptr2待补充,实在没找到这俩变量在哪用的)

这里做一点补充,一个Worker中有两个管道,一个管道用于和master进程通信,一个管道用于和Reactor通信。而实际上如果指定了消息队列模式,则通信方式都是通过读写swQueue队列来实现的。这几点在之前的分析中已经有说明了,再此补充说明一下。

swWorker有四个操作函数,这些函数声明在Server.h文件中的545 – 548行,其声明如下:

int swWorker_create(swWorker *worker);
void swWorker_free(swWorker *worker);
void swWorker_signal_init(void);
void swWorker_signal_handler(int signo);

这四个函数声明在Worker.c中,其中swWorker_free只是释放了worker的store内存并且关闭了notify管道(不接收reactor消息),swWorker_signal_init指定了对应信号的回调函数,swWorker_signal_handler规定了对应信号的操作(基本——没内容,看一看就好……),这里只贴出swWorker_create的源码:

         void*store = sw_shm_malloc(SwooleG.serv->buffer_output_size);
         if(store == NULL)
         {
                   swWarn("mallocfor worker->store failed.");
                   returnSW_ERR;
         }

         swPipe*worker_notify = sw_malloc(sizeof(swPipe));
         if(worker_notify == NULL)
         {
                   swWarn("mallocfor worker->notify failed.");
                   sw_shm_free(store);
                   returnSW_ERR;
         }

         /**
          * Create notify pipe
          */
         if(swPipeNotify_auto(worker_notify, 1, 0))
         {
                   sw_shm_free(store);
                   sw_free(worker_notify);
                   returnSW_ERR;
         }

源码解释:创建共享内存store,分配通信管道notify的内存,并调用swPipeNotify_auto创建实际管道(根据内核版本决定使用eventfd管道还是Base管道)。

Connection模块

Connection模块应该是少有的有自己独立的头文件和C文件的模块了……

Connection用于存储一个实际的连接(C-to-S),用来存放一些相关的变量,比如连接时长、上一次响应时间之类的,当然另一个好处就在于可以给这个连接加一个SSL控制然后就能实现收发数据的安全加密解密了对不对(好像很厉害的样子)……

首先是swConnection结构体,该结构体封装了所有Connection的属性变量,其声明在Connection.c文件中,声明如下:

typedef struct _swConnection
{
         /**
          * is active
          * system fd must be 0. en: timerfd, signalfd,listen socket
          */
         uint8_tactive;

         /**
          * file descript
          */
         intfd;

         /**
          * ReactorThread id
          */
         uint16_tfrom_id;

         /**
          * from which socket fd
          */
         uint16_tfrom_fd;

         /**
          * socket address
          */
         structsockaddr_in addr;

         /**
          * link any thing
          */
         void*object;

         /**
          * input buffer
          */
         swBuffer*in_buffer;

         /**
          * output buffer
          */
         swBuffer*out_buffer;

         /**
          * connect time(seconds)
          */
         time_tconnect_time;

         /**
          * received time with last data
          */
         time_tlast_time;

#ifdef SW_USE_OPENSSL
         SSL*ssl;
         uint32_tssl_state;
#endif

} swConnection;

基本上每一个变量都有注释我就不多废话了……不过这里说明一下,swConnection中的fd变量存放的是该连接所对应的描述符,而那个from_fd吧……经过一系列的排查我最终发现……from_fd是Server创建的监听fd。

(大概说一下排查过程,首先找到创建Connection的函数,因为知道Connection大概会在Server处创建,所以在Server.h中找到swServer_connection_new,然后看每个变量的具体赋值,然后发现from_fd是参数中的swDataHead赋予的,于是回追到调用swServer_connection_new的函数swServer_master_onAccept,然后就发现fd来自于accept的返回值,是一个新创建的socket连接,而from_fd是server创建的用于监听的socketfd)

Connection的操作函数分布在两个地方,new和close函数声明在Server.h的476 – 477 行,而其他操作函数都在Connection.h文件中声明(收发数据、操作缓存)。

首先是在Server.h中用于新建和关闭连接的函数,其声明如下:

swConnection* swServer_connection_new(swServer *serv, swDataHead *ev);
int swServer_connection_close(swServer*serv, int fd, int notify);

这两个函数的具体定义在Server.c文件中,swServer_connection_new函数定义在文件最末尾,其核心代码如下:

   int conn_fd = ev->fd;
   swConnection* connection = NULL;

   SwooleStats->accept_count++;
   sw_atomic_fetch_add(&SwooleStats->connection_num, 1);

   if (conn_fd > swServer_get_maxfd(serv))
    {
       swServer_set_maxfd(serv, conn_fd);
#ifdef SW_CONNECTION_LIST_EXPAND
       //新的fd超过了最大fd
        //需要扩容
       if (conn_fd == serv->connection_list_capacity - 1)
       {
           void *new_ptr = sw_shm_realloc(serv->connection_list,sizeof(swConnection)*(serv->connection_list_capacity +SW_CONNECTION_LIST_EXPAND));
           if (new_ptr == NULL)
           {
                swWarn("connection_listrealloc fail");
                return SW_ERR;
           }
           else
           {
               serv->connection_list_capacity += SW_CONNECTION_LIST_EXPAND;
                serv->connection_list =(swConnection *)new_ptr;
           }
       }
#endif
    }

   connection = &(serv->connection_list[conn_fd]);

   connection->fd = conn_fd;
   connection->from_id = ev->from_id;
   connection->from_fd = ev->from_fd;
   connection->connect_time = SwooleGS->now;
   connection->last_time = SwooleGS->now;
   connection->active= 1; //使此连接激活,必须在最后,保证线程安全

源码解释:首先将accept_count和connection_num两个计数器加1。如果新的conn_fd超过了serv中连接列表的最大容量,就将连接列表扩容(注意到这里调用的是sw_shm_realloc,也就是connection_list的内存是可共享的)。随后将新的conn_fd加入连接列表中,然后给connection的属性赋值。主要的属性有:连接描述符fd,来自于哪个reactor、来自于哪个监听fd、连接的时间和上一次响应时间、连接的active状态。

然后是swServer_connection_close函数,该函数也定义在Server.c的末尾,让我没想到的是一个close函数也有这么长,以至于我不得不分段进行分析……

	conn->active = 0;
         /**
          * Close count
          */
         sw_atomic_fetch_add(&SwooleStats->close_count,1);
         sw_atomic_fetch_sub(&SwooleStats->connection_num,1);

         intreactor_id = conn->from_id;

         reactor= &(serv->reactor_threads[reactor_id].reactor);

源码解释:首先将conn设置为关闭状态,然后修改两个计数器close_count和connection_num.随后根据conn中的reactor_id从serv的reactor列表中获取对应的reactor。

   //释放缓存区占用的内存
   if (serv->open_eof_check)
    {
       if (conn->in_buffer)
       {
           swBuffer_free(conn->in_buffer);
           conn->in_buffer = NULL;
       }
    }
   else if (serv->open_length_check)
    {
       if (conn->object)
       {
           swString_free(conn->object);
       }
    }
   else if (serv->open_http_protocol)
    {
       if (conn->object)
       {
           swHttpRequest *request = (swHttpRequest *) conn->object;
           if (request->state > 0 && request->buffer)
           {
                swTrace("ConnectionClose.free buffer=%p, request=%p\n", request->buffer, request);
                swString_free(request->buffer);
                bzero(request,sizeof(swHttpRequest));
           }
       }
    }
    if(conn->out_buffer != NULL)
    {
       swBuffer_free(conn->out_buffer);
       conn->out_buffer = NULL;
    }

   if (conn->in_buffer != NULL)
    {
       swBuffer_free(conn->in_buffer);
       conn->in_buffer = NULL;
    }

源码解释:这里用于释放conn开启的缓存区。如果开启了eof_check,则需要将conn的输入缓存释放(这里放着没有检测到eof的数据);如果打开了length_check,则将object指向的对象释放;如果使用了http协议,则释放object指向的swHttpRequest对象。最后,释放out_buffer和in_buffer。

   //通知到worker进程
   if (serv->onClose != NULL && notify == 1)
    {
       //通知worker进程
       notify_ev.from_id = reactor_id;
       notify_ev.fd = fd;
       notify_ev.type = SW_EVENT_CLOSE;
       SwooleG.factory->notify(SwooleG.factory, ¬ify_ev);
    }

源码解释:如果设置了onClose回调并且指定需要通知,则调用factory的notify方法发送通知到worker进程。(所以发现onClose回调异常的童鞋可以通过这里查找bug)

#ifdef SW_USE_OPENSSL
         if(conn->ssl)
         {
                   swSSL_close(conn);
         }
#endif

   /**
    * reset maxfd, for connection_list
    */
   if (fd == swServer_get_maxfd(serv))
    {
       SwooleG.lock.lock(&SwooleG.lock);
       int find_max_fd = fd - 1;
       swTrace("set_maxfd=%d|close_fd=%d\n", find_max_fd, fd);
       /**
        * Find the new max_fd
        */
       for (; serv->connection_list[find_max_fd].active == 0 &&find_max_fd > swServer_get_minfd(serv); find_max_fd--);
       swServer_set_maxfd(serv, find_max_fd);
       SwooleG.lock.unlock(&SwooleG.lock);
    }

     //关闭此连接,必须放在最前面,以保证线程安全
     returnreactor->del(reactor, fd);

源码解释:如果使用了SSL功能,则需要从SSL中移除该SSL。如果关闭的fd是maxfd,则需要重新设置serv中的连接列表,操作步骤为先锁住全局变量SwooleG,然后从末尾循环便利connection_list直到找到当前最大的fd,最后解锁。最后一步,从reactor中移除监听的fd(谁能给我解释一下这句注释啥意思,很急,在线等)

在Connection.h中一共声明了10个操作函数,其中三个内联函数。据我目测,实际上原来应该没有swConnection_recv和swConnection_send这两个函数,应该是有了SSL特性后专门添加上的,因为这俩函数就只是判断了一下有没有开启SSL特性,如果开启了并且conn设置了ssl,则调用ssl的安全读写方法。swConnection_error函数也不具体分析了,只是根据errcode的值返回对应的SW_*。

另外7个操作函数声明如下:

int swConnection_send_blocking(int fd, void*data, int length, int timeout);
int swConnection_buffer_send(swConnection*conn);

swString *swConnection_get_string_buffer(swConnection *conn);
void swConnection_clear_string_buffer(swConnection *conn);
volatile swBuffer_trunk*swConnection_get_out_buffer(swConnection *conn, uint32_t type);
volatile swBuffer_trunk*swConnection_get_in_buffer(swConnection *conn);
int swConnection_sendfile(swConnection*conn, char *filename);

可以看到基本上都是用于发送数据、文件以及操作缓存区的函数,下面一个个上分析。

首先是swConnection_send_blocking函数,该函数是一个阻塞式的发送函数,其源码如下:

    while (writen > 0)
    {
       if (swSocket_wait(fd, timeout, SW_EVENT_WRITE) < 0)
       {
           return SW_ERR;
       }
       else
       {
           n = send(fd, data, writen, MSG_NOSIGNAL | MSG_DONTWAIT);
           if (n < 0)
           {
                swWarn("send() failed.Error: %s[%d]", strerror(errno), errno);
                return SW_ERR;
           }
           else
           {
                writen -= n;
                continue;
           }
       }
    }

源码解释:首先调用swSocket_wait函数(调用poll函数监听socket直到socket满足监听条件)监听到fd可写,随后调用send函数发送数据;循环写入直到所有数据都写入fd。

这里我补充说明一下swoole的Buffer.c,不作详细分析,只说明其功能。Buffer是个链表式结构,每个链表节点是一个trunk,trunk用于存放具体的数据。

下面分析swConnection_buffer_send函数,这个函数是将conn中的输出缓存区中的数据发出,核心源码如下:

   swBuffer *buffer= conn->out_buffer;
   swBuffer_trunk *trunk = swBuffer_get_trunk(buffer);
   sendn = trunk->length - trunk->offset;

   if (sendn == 0)
    {
       swBuffer_pop_trunk(buffer, trunk);
       return SW_CONTINUE;
    }
   ret = swConnection_send(conn, trunk->store.ptr + trunk->offset,sendn, 0);
   //printf("BufferOut:reactor=%d|sendn=%d|ret=%d|trunk->offset=%d|trunk_len=%d\n",reactor->id, sendn, ret, trunk->offset, trunk->length);
   if (ret < 0)
    {
       switch (swConnection_error(errno))
       {
       case SW_ERROR:
           swWarn("send to fd[%d] failed. Error: %s[%d]", conn->fd,strerror(errno), errno);
           return SW_OK;
       case SW_CLOSE:
           return SW_CLOSE;
       case SW_WAIT:
           return SW_WAIT;
       default:
           return SW_CONTINUE;
       }
    }
   //trunk full send
   else if (ret == sendn || sendn == 0)
    {
       swBuffer_pop_trunk(buffer, trunk);
    }
   else
    {
       trunk->offset += ret;
    }

源码解释:首先从conn中的输出缓存中拿到首部的trunk,并计算该trunk还有多少数据需要发送。如果已经发送完了,弹出该trunk,继续发送下一个trunk。如果没有发送完,调用swConnection_send方法发送剩余数据。随后从trunk中移除已经发送的数据,并再次判定是否发送结束。

swConnection中有个void* object变量,这个指针可以用来指向一些奇奇怪怪的东西,比如一个swString对象,于是就有了两个操作函数swConnection_get_string_buffer和swConnection_clear_string_buffer,这俩函数不贴代码了,一个new一个free而已。

接着分析两个操作buffer的函数。首先是swConnection_get_in_buffer,该函数的作用是创建一个新的trunk加入到in_buffer中,并返回这个trunk的指针,其核心源码如下:

    if (conn->in_buffer == NULL)
    {
       buffer = swBuffer_new(SW_BUFFER_SIZE);
       //buffer create failed
       if (buffer == NULL)
       {
           return NULL;
       }
       //new trunk
       trunk = swBuffer_new_trunk(buffer, SW_TRUNK_DATA,buffer->trunk_size);
       if (trunk == NULL)
       {
           sw_free(buffer);
           return NULL;
       }
       conn->in_buffer = buffer;
    }
   else
    {
       buffer = conn->in_buffer;
       trunk = buffer->tail;
       if (trunk == NULL || trunk->length == buffer->trunk_size)
       {
           trunk = swBuffer_new_trunk(buffer, SW_TRUNK_DATA,buffer->trunk_size);
       }
    }

源码解释:如果in_buffe为空,则创建一个swBuffer,并在这个buffer中创建一个新的trunk;否则,直接在in_buffer中创建trunk并添加到buffer末尾。

swConnection_get_out_buffer用于从out_buffer中拿到一个trunk。核心源码如下:

    if (conn->out_buffer == NULL)
    {
       conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
       if (conn->out_buffer == NULL)
       {
           return NULL;
       }
    }
   if (type == SW_TRUNK_SENDFILE)
    {
       trunk = swBuffer_new_trunk(conn->out_buffer, SW_TRUNK_SENDFILE, 0);
    }
    else
    {
       trunk = swBuffer_get_trunk(conn->out_buffer);
       if (trunk == NULL)
       {
           trunk = swBuffer_new_trunk(conn->out_buffer, SW_TRUNK_DATA,conn->out_buffer->trunk_size);
       }
    }

源码解释:如果out_buffer为空,尝试创建,如果创建失败,返回NULL。然后判定type类型,如果为SENDFILEl类型,则直接创建type类型的trunk;如果不是,则尝试从buffer中获取首部的trunk,如果该trunk为空,则创建新的trunk。

这里重点分析一下swConnection_sendfile函数,该函数用于在out_buffer中添加一个用于sendfile的trunk。其核心源码如下:

    if(conn->out_buffer == NULL)
    {
       conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
       if (conn->out_buffer == NULL)
       {
           return SW_ERR;
       }
    }

   swBuffer_trunk *trunk = swBuffer_new_trunk(conn->out_buffer,SW_TRUNK_SENDFILE, 0);
   if (trunk == NULL)
    {
       swWarn("get out_buffer trunk failed.");
       return SW_ERR;
    }
   swTask_sendfile *task = sw_malloc(sizeof(swTask_sendfile));
   if (task == NULL)
    {
       swWarn("malloc for swTask_sendfile failed.");
       //TODO: 回收这里的内存
       return SW_ERR;
    }
   bzero(task, sizeof(swTask_sendfile));

   task->filename = strdup(filename);
   int file_fd = open(filename, O_RDONLY);
   if (file_fd < 0)
    {
       swWarn("open file[%s] failed. Error: %s[%d]",task->filename, strerror(errno), errno);
       return SW_ERR;
    }
   struct stat file_stat;
   if (fstat(file_fd, &file_stat) < 0)
    {
       swWarn("swoole_async_readfile: fstat failed. Error: %s[%d]",strerror(errno), errno);
       return SW_ERR;
    }

   task->filesize = file_stat.st_size;
   task->fd = file_fd;
   trunk->store.ptr= (void *)task;

源码解释:先判定out_buffer是否存在,如果不存在则创建,随后在out_buffer中创建一个类型为sendfile的trunk。创建一个sendfile的task(swTask_sendfile),指定文件名以及文件的描述符fd,通过fstat函数获取文件的大小。随后将task添加到trunk的存储空间中。

以上就是Connection.c模块的相关分析。这章文字不多,代码不少,各位读者见谅……

(原计划分析ReactorProcess模块,但是发现这里需要和ProcessPool一起分析,因此单独开一章。)

时间: 2024-10-06 22:27:29

Swoole源码学习记录(十一)——Worker,Connection的相关文章

Swoole源码学习记录(九)——Factory模块(上)

Swoole版本:1.7.5-stable Factory这个命名让我一度认为这是一个工厂模型--这个工厂实际上并不负责生产实例,而是根据类型的不同执行两项任务:Factory实现的功能是一个任务中心,一个task请求进入Factory,会进过dispatch分配.onTask处理.onFinish交付结果一系列流程:FactoryProcess用于管理manager和worker进程,也有对单独的writer线程的管理. (PS:Swoole源码中有FactoryThread模块,该模块是一个

Swoole源码学习记录(十四)——Server模块详解(下)

swoole版本:1.7.6-stable 上一章已经分析了如何启动swServer的相关函数.本章将继续分析swServer的相关函数, 1.swServer函数分析 swServer_addListener 该函数用于在swServer中添加一个需要监听的host及port.函数原型如下: // Server.h 438h int swServer_addListener(swServer *serv, int type, char *host,int port); 参数 说明 swServ

Swoole源码学习记录(十)——Factory模块(下)

Swoole版本:1.7.5-stable 本章将分析FactoryProcess.c中剩下的函数,这些函数用于操作worker.manager以及writer.这些函数提供了最核心的进程创建.管理等功能,是Swoole的master-worker结构的基石. 先从worker相关的函数开始(manager相关函数基本都涉及操作worker进程).在FactoryProcess.c中一共声明了4个操作函数,分别是: static int swFactoryProcess_worker_loop(

Swoole源码学习记录(十三)——Server模块详解(上)

终于可以正式进入Server.c模块了-- 在之前的分析中,可以看到很多相关模块的声明都已经写在了Server.h中,就是因为这些模块构成了Server的核心部分.而Server本身,则是一个最上层的对象,它包括了核心的Reactor和Factory模块,存放了消息队列的key值,控制着全部的Connection,所有PHP层面的回调函数也在这里指定:同时,Server存放了大量的属性值,这些值决定了整个Swoole的详细特征. 直接上源码.首先是swServer结构体,这个结构体定义了一个sw

Swoole源码学习记录(十二)——ReactorThread模块

ReactorThread 这一章将分析Swoole的ReactorThread模块.虽然叫Thread,但是实际上使用的是swFactoryProcess也就是多进程模式.但是,在ReactorThread中,所有的事件监听是在线程中运行的(Rango只是简单提到了PHP不支持多线程安全,具体原因还有待请教--),比如在UDP模式下,是针对每一个监听的host开辟一个线程运行reactor,在TCP模式下,则是开启指定的reactor_num个线程用于运行reactor. 那么OK,先上swR

Swoole源码学习记录(八)——Reactor模块-epoll

Swoole版本:1.7.5-beta Reactor模块可以说是Swoole中最核心的模块之一,正是这些reactor模型为swoole提供了异步操作的基础.Swoole中根据不同的内核函数,提供了四种Reactor封装,ReactorEpoll,ReactorKqueue,ReactorPoll和ReactorSelect.同时,Swoole通过结构体swReactor封装了对于reactor的操作函数和基本属性.本章,我将分析swReactor以及四种Reactor模型中的ReactorE

Swoole源码学习记录(十五)——Timer模块分析

swoole版本:1.7.7-stable Github地址:点此查看 1.Timer 1.1.swTimer_interval_node 声明: // swoole.h 1045-1050h typedef struct _swTimer_interval_node { struct _swTimerList_node *next, *prev; struct timeval lasttime; uint32_t interval; } swTimer_interval_node; 成员 说明

Mybatis源码学习记录

一.对源码先上一个结构图: 源代码主要在org.apache.ibatis目录下,18个包,其中在应用中主要的包有:builder.session.cache.type.transaction.datasource.jdbc.mapping,提供支撑服务的包有annotation.binding.io.logging.plugin.reflection.scripting.exception.executor.parsing 二.从使用入手 MyBatis使用的三板斧是SqlSessionFac

select 源码学习记录

先记录一下学习的成果,慢慢完善 四个相关函数 fd_set的结构在上一篇中有讲,同时解释了为什么最大1024. -fd_set为1024/32的long型数组结构体.也就是结构体里面保存了long型数组 int FD_ZERO(int fd, fd_set *fdset); int FD_CLR(int fd, fd_set *fdset); int FD_SET(int fd, fd_set *fd_set); int FD_ISSET(int fd, fd_set *fdset); FD_Z