Swoole源码学习记录(九)——Factory模块(上)

Swoole版本:1.7.5-stable

Factory这个命名让我一度认为这是一个工厂模型……这个工厂实际上并不负责生产实例,而是根据类型的不同执行两项任务:Factory实现的功能是一个任务中心,一个task请求进入Factory,会进过dispatch分配、onTask处理、onFinish交付结果一系列流程;FactoryProcess用于管理manager和worker进程,也有对单独的writer线程的管理。

(PS:Swoole源码中有FactoryThread模块,该模块是一个多线程模型,根据开发者Rango韩少的解释,因为PHP不支持多线程,所以无法使用这个模块,因此该模块被废弃了。而实际上,FactoryThread比FactoryProcess要更简洁……)

Factory模块

Factory模块的相关声明在Server.h头文件中。首先是一个在Factory模块中被用到的结构体swDispatchData,该结构体声明在Server.h的145 – 149 行,声明如下:

typedef struct
{
   long target_worker_id;
   swEventData data;
} swDispatchData;

swDispatchData存放了一个目标worker进程的id和一条数据,该结构体用于传递数据给task进程进行处理。

Swoole中用swFactory结构体封装了Factory模块的相关操作,其声明在Server.h文件中的151 – 168行,其声明如下:

struct _swFactory
{
   void *object;
   void *ptr; //server object
   int last_from_id;

   swReactor *reactor; //reserve for reactor

   int (*start)(struct _swFactory *);
   int (*shutdown)(struct _swFactory *);
   int (*dispatch)(struct _swFactory *, swDispatchData *);
   int (*finish)(struct _swFactory *, swSendData *);
   int (*notify)(struct _swFactory *, swDataHead *);    //send a event notify
   int (*end)(struct _swFactory *, swDataHead *);

   int (*onTask)(struct _swFactory *, swEventData *task); //workerfunction.get a task,goto to work
   int (*onFinish)(struct _swFactory *, swSendData *result); //factoryworker finish.callback
};

其中object用于存放一个具体的Factory类型(FactoryProcess or FactoryThread),last_from_id存放了最近一个通过该Factory发送消息的reactor的id。

swFactory的全部操作函数都声明在Server.h的176 – 183行,声明如下:

int swFactory_create(swFactory *factory);
int swFactory_start(swFactory *factory);
int swFactory_shutdown(swFactory *factory);
int swFactory_dispatch(swFactory *factory,swDispatchData *req);
int swFactory_finish(swFactory *factory,swSendData *_send);
int swFactory_notify(swFactory *factory,swDataHead *event);
int swFactory_end(swFactory *factory,swDataHead *cev);
int swFactory_check_callback(swFactory*factory);

这些函数在Factory.c中被定义。其中,start和shutdown函数简单返回SW_OK,create函数仅将传入的factory中的各个函数赋值(其实onTask和onFinish还是外部赋值……),callback函数仅仅检查onTask和onFinish两个函数指针是否为空,剩下的函数都是根据传入的参数调用对应的PHP回调函数(dispatch调用onTask,notify调用onClose和onConnect,end调用onClose并调用swServer_connection_close函数(详细分析见附录)关闭对应的connect连接)

这里需要分析一下swFactory_finish函数。swFactory_finish是一个通道,它的作用是将task运行结束后的数据发送给对应的Reactor。其核心源码如下:

   //unix dgram
   if (resp->info.type == SW_EVENT_UNIX_DGRAM)
    {
       socklen_t len;
       struct sockaddr_un addr_un;
       int from_sock = resp->info.from_fd;

       addr_un.sun_family = AF_UNIX;
       memcpy(addr_un.sun_path, resp->sun_path, resp->sun_path_len);
       len = sizeof(addr_un);
       ret = swSendto(from_sock, resp->data, resp->info.len, 0, (structsockaddr *) &addr_un, len);
       goto finish;
    }
   //UDP pacakge
   else if (resp->info.type == SW_EVENT_UDP || resp->info.type ==SW_EVENT_UDP6)
    {
       ret = swServer_udp_send(serv, resp);
       goto finish;
    }
   else
    {
       resp->length = resp->info.len;
       ret = swReactorThread_send(resp);
    }

源码解释:这里首先判定swDataHead* resp应答中的type类型,如果type类型是Unix sock的数据包类型,则调用swSendto函数(详细分析见附录)发送数据到指定的Reactor;如果type类型是UDP,则调用swServer_udp_send(详细分析见附录)发送数据;否则,调用swReactorThread_send函数发送数据。

FactoryProcess模块

FactoryProcess模块是Swoole的进程管理模块,是Swoole另一个核心组件。通过该模块,Swoole能有效的调度和管理Master进程和多个Worker进程。

FactoryProcess的结构体swFactoryProcess声明在Server.h文件的170 – 174行,声明如下:

typedef struct _swFactoryProcess
{
   swPipe *pipes;
   int writer_pti; //current writer id
} swFactoryProcess;

第一个变量pipes存放用于进程间通信的管道,从后续实现分析这个Pipe仅用于接收其他进程发来的消息;第二个参数定义了当前writer线程的id,这个writer线程用于进程发送数据。

swFactoryProcess的四个公共操作函数声明在Server.h文件中的185 – 188行,其声明如下:

int swFactoryProcess_create(swFactory*factory, int writer_num, int worker_num);
int swFactoryProcess_start(swFactory*factory);
int swFactoryProcess_shutdown(swFactory*factory);
int swFactoryProcess_end(swFactory*factory, swDataHead *event);

这四个函数都在FactoryProcess.c文件中具体定义。其中,swFactoryProcess_create函数用于创建一个swFactoryProcess实例,其核心代码如下:

   swFactoryProcess*object;
   swServer *serv = SwooleG.serv;
object =SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swFactoryProcess));
   if (object == NULL)
    {
       swWarn("[Master] malloc[object] failed");
       return SW_ERR;
    }
   serv->writer_threads = SwooleG.memory_pool->alloc(SwooleG.memory_pool,serv->reactor_num * sizeof(swWorkerThread));
   if (serv->writer_threads == NULL)
    {
       swWarn("[Master] malloc[object->writers] failed");
       return SW_ERR;
    }
    object->writer_pti= 0;

源码解释:从全局变量SwooleG的内存池中申请一个swFactoryProcess的内存,并同样从这个内存池中申请和reactor_num等量的writer线程,并初始化swFactoryProcess的writer_pti为0.

swFactoryProcess_shutdown函数不是关闭一个swFactoryProcess,而是关闭SwooleG全局变量中的swServer实例。其核心源码如下:

   swServer *serv = SwooleG.serv;
   int i;
   //kill manager process
   kill(SwooleGS->manager_pid, SIGTERM);
   //kill all child process
   for (i = 0; i < serv->worker_num; i++)
    {
       swTrace("[Main]kill worker processor");
       kill(serv->workers[i].pid, SIGTERM);
    }
   if (serv->ipc_mode == SW_IPC_MSGQUEUE)
    {
       serv->read_queue.free(&serv->read_queue);
       serv->write_queue.free(&serv->write_queue);
    }
    //close pipes
    return SW_OK;

源码解释:杀死manager进程,遍历swServer的worker列表并分别杀死每一个worker进程。如果使用了消息队列模式,则还需将read队列和write队列释放。

swFactoryProcess_start函数用于启动一个swFactoryProcess实例。这里需要注意,在该函数中涉及到一些没有分析过的函数和结构体,这些对象的实际分析将于后面的章节进行,在此我只简单说明这些函数的作用。swFactoryProcess_start函数核心源码如下:

    if (swFactory_check_callback(factory) <0)
    {
       swWarn("swFactory_check_callback failed");
       return SW_ERR;
    }

    int i;
   swServer *serv = factory->ptr;
   swWorker *worker;

   for (i = 0; i < serv->worker_num; i++)
    {
       worker = swServer_get_worker(serv, i);
       if (swWorker_create(worker) < 0)
       {
           return SW_ERR;
       }
    }

   //必须先启动manager进程组,否则会带线程fork
   if (swFactoryProcess_manager_start(factory) < 0)
    {
       swWarn("swFactoryProcess_manager_start fail");
       return SW_ERR;
    }

   if (serv->ipc_mode == SW_IPC_MSGQUEUE)
    {
       swQueueMsg_set_blocking(&serv->read_queue, 0);
       //tcp & message queue require writer pthread
       if (serv->have_tcp_sock == 1)
       {
           int ret = swFactoryProcess_writer_start(factory);
           if (ret < 0)
           {
                return SW_ERR;
           }
       }
    }
   //主进程需要设置为直写模式
    factory->finish= swFactory_finish;

源码解释:首先判断swFactory是否设置了相应的PHP回调函数(onTask,onFinish)。然后创建worker_num个worker对象,随后调用swFactoryProcess_manager_start函数启动manager进程和对应的worker子进程。如果swServer设置了消息队列属性,则设置swServer的read队列为非阻塞,并启动writer线程。最后设置manager主进程的finish函数。

swFactoryProcess_end函数用于关闭一个swFactoryProcess实例。其核心代码入下:

         swServer *serv =factory->ptr;
         swSendData_send;

         bzero(&_send,sizeof(_send));

         _send.info.fd= event->fd;
         /**
          * length == 0, close the connection
          */
         _send.info.len= 0;

         /**
          * passive or initiative
          */
         _send.info.type= event->type;

         swConnection*conn = swServer_connection_get(serv, _send.info.fd);
         if(conn == NULL || conn->active == 0)
         {
                   swWarn("cannot close. Connection[%d] not found.", _send.info.fd);
                   returnSW_ERR;
         }
         if (serv->onClose != NULL)
         {
                   serv->onClose(serv, event->fd, conn->from_id);
         }
         returnswFactoryProcess_finish(factory, &_send);

源码解释:获取swServer对象,创建需要发送的swSendData并设置fd、长度(长度为0代表关闭连接)和类型。并获取swServer中fd对应的connection连接。随后调用onClose回调函数(如果有),并调用swFactoryProcess_finish函数发送数据。

除了以上四个共有的操作函数外,swFactoryProcess还拥有三个不同的操作函数,声明如下:

static int swFactoryProcess_notify(swFactory *factory, swDataHead *event);
static int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *buf);
static int swFactoryProcess_finish(swFactory *factory, swSendData *data);

这三个函数是核心的进程通信函数,notify函数用于主进程通知worker进程,dispatch用于reactor向worker进程投递请求,finish用于worker将执行后的数据发送给client。

一个个来看,首先是swFactoryProcess_notify函数,其核心代码如下:

int swFactoryProcess_notify(swFactory*factory, swDataHead *ev)
{
         memcpy(&sw_notify_data._send,ev, sizeof(swDataHead));
         sw_notify_data._send.len= 0;
         returnfactory->dispatch(factory, (swDispatchData *) &sw_notify_data);
}

源码解释:通过factory的dispatch函数将notify_data发送给worker进程。

其中sw_notify_data的声明如下:

static __thread struct
{
         longtarget_worker_id;
         swDataHead_send;
} sw_notify_data;

这个结构体是用来匹配swDispatchData结构体的,并没有特殊的意义,__thread关键字代表该结构体是每个线程独有一份,线程之间互不影响。

swFactoryDespatch函数的调用来自于ReactorThread或者ReactorProcess(根据Rango的说法,所有线程相关的模块都已经停用了……),其核心源码如下:

   uint32_tschedule_key;
   uint32_t send_len = sizeof(task->data.info) + task->data.info.len;
   uint16_t target_worker_id;
   swServer *serv = SwooleG.serv;

   if (task->target_worker_id < 0)
    {
       //udp use remote port
       if (task->data.info.type == SW_EVENT_UDP || task->data.info.type== SW_EVENT_UDP6
                || task->data.info.type ==SW_EVENT_UNIX_DGRAM)
       {
           schedule_key = task->data.info.from_id;
       }
       else
       {
           schedule_key = task->data.info.fd;
       }

#ifndef SW_USE_RINGBUFFER
       if (SwooleTG.factory_lock_target)
       {
           if (SwooleTG.factory_target_worker < 0)
           {
                target_worker_id =swServer_worker_schedule(serv, schedule_key);
                SwooleTG.factory_target_worker= target_worker_id;
           }
           else
           {
                target_worker_id =SwooleTG.factory_target_worker;
           }
       }
       else
#endif
       {
           target_worker_id = swServer_worker_schedule(serv, schedule_key);
       }
    }
   else
    {
       target_worker_id = task->target_worker_id;
    }

   if (SwooleTG.type == SW_THREAD_REACTOR)
    {
       return swReactorThread_send2worker((void *) &(task->data),send_len, target_worker_id);
    }
   else
    {
       swTrace("dispatch to worker#%d", target_worker_id);
       return swServer_send2worker_blocking(serv, (void *)&(task->data), send_len, target_worker_id);
    }

源码解释:首先判定task是否已经指定了worker,如果task的target_worker_id小于0(代表没有指定),则需要为其分配worker进程。首先根据task的data域中info的type字段判断task是属于哪个fd,这个fd用于在随后的调用中判断如何分配worker进程。之后,判断是否指定使用RingBuffer,判断SwooleTG(Swoole Thread Global,线程相关全局变量,以__thread关键字修饰,每个线程拥有自己独立的一份,线程之间互不影响)是否指定了当前线程所有的worker_id,如果没有指定,调用swServer_worker_schedule函数分配worker;否则直接使用指定的worker。如果以上条件都未能满足,则直接使用swServer_worker_schedule函数分配worker。如果task已经指定了worker,则默认使用该worker。随后判断SwooleTG的type字段,如果为SW_THREAD_REACTOR(线程Reacotr模式),则调用swReactorThread_send2worker函数(非阻塞)发送task请求;否则(type为SW_PROCESS_REACTOR模式),调用swServer_send2worker_blocking函数(阻塞)发送请求。

这里解释一下这两个函数的阻塞和非阻塞的原因。swReactorThread_send2worker调用write函数如果失败,会将数据存放进pipe的缓存中,等待下一次发送;而swServer_send2worker_blocking调用write失败后,会重新发送或调用swSocket_wait等待一段时间后再次发送。

接下来是swFactoryProcess_finish函数,该函数用于将一个resp应答发送给task的发起者。由于该函数较长,在此分段进行分析:其源码如下:

         //unix dgram
         if(resp->info.type == SW_EVENT_UNIX_DGRAM)
         {
                   socklen_tlen;
                   structsockaddr_un addr_un;
                   intfrom_sock = resp->info.from_fd;

                   addr_un.sun_family= AF_UNIX;
                  memcpy(addr_un.sun_path,resp->sun_path, resp->sun_path_len);
                   len= sizeof(addr_un);
                   ret= swSendto(from_sock, resp->data, resp->info.len, 0, (struct sockaddr *)&addr_un, len);
                   gotofinish;
         }

源码解释:如果判定resp的类型为unixsock的报文模式,则构建相应sockaddr_un结构体并调用swSendto函数发送resp(swSendto函数分析见附录)。发送完成后,goto到finish标签继续运行。

         //UDP pacakge
         elseif (resp->info.type == SW_EVENT_UDP || resp->info.type == SW_EVENT_UDP6)
         {
                   ret= swServer_udp_send(serv, resp);
                   gotofinish;
         }

源码解释:如果是UDP模式,调用swServer_udp_send函数发送数据并goto到finish标签。

         struct
         {
                   longpti;
                   swEventData_send;
         }sdata;

         //formessage queue
         sdata.pti= (SwooleWG.id % serv->writer_num) + 1;

         swConnection*conn = swServer_connection_get(serv, fd);
         if(conn == NULL || conn->active == 0)
         {
                   swWarn("connection[%d]not found.", fd);
                   returnSW_ERR;
         }

         sdata._send.info.fd= fd;
         sdata._send.info.type= resp->info.type;
         swWorker*worker = swServer_get_worker(serv, SwooleWG.id);

源码解释:构建了一个临时结构体用于匹配swQueue_data结构体,并指定对应的writer线程id。随后获取fd对应的connection连接和SwooleWG(Worker进程的全局变量)中存放的当前进程对应的worker_id。

         if(resp->length > 0)
         {
                   int64_twait_reactor;

                   /**
                    * Storage is in use right now, wait notify.
                    */
                   if(worker->store.lock == 1)
                   {
                            worker->notify->read(worker->notify,&wait_reactor, sizeof(wait_reactor));
                   }
                   swPackage_responseresponse;

                   response.length= resp->length;
                   response.worker_id= SwooleWG.id;

                   //swWarn("BigPackage,length=%d|worker_id=%d", response.length, response.worker_id);

                   sdata._send.info.from_fd= SW_RESPONSE_BIG;
                   sdata._send.info.len= sizeof(response);

                   memcpy(sdata._send.data,&response, sizeof(response));

                   /**
                    * Lock the worker storage
                    */
                   worker->store.lock= 1;
                   memcpy(worker->store.ptr,resp->data, resp->length);
         }

源码解释:如果resp的长度大于0,表示这是个较大的应答包(完全不理解为什么大于0就代表big response了……),需要使用共享内存。首先判定worker的store是否已经被占用,如果被占用,则通过notify管道等待唤醒。当store可用后,用swPackage_response结构体封装数据信息并代替原有的data域,设置from_id为SW_RESPONSE_BIG,并将data复制到worker的store共享内存中并上锁。

#if SW_REACTOR_SCHEDULE == 2
         sdata._send.info.from_id= fd % serv->reactor_num;
#else
         sdata._send.info.from_id= conn->from_id;
#endif

源码解释:根据Reactor的Schedule模型确定from_id, 2代表的是取模。

    for (count = 0;count < SW_WORKER_SENDTO_COUNT; count++)
    {
       if (serv->ipc_mode == SW_IPC_MSGQUEUE)
       {
           ret = serv->write_queue.in(&serv->write_queue, (swQueue_data*) &sdata, sendn);
       }
       else
       {
           int master_pipe = swWorker_get_write_pipe(serv, fd);
           //swWarn("send to reactor.fd=%d|pipe=%d|reactor_id=%d|reactor_pipe_num=%d", fd, master_pipe,conn->from_id, serv->reactor_pipe_num);
           ret = write(master_pipe, &sdata._send, sendn);

#ifdef SW_WORKER_WAIT_PIPE
           if (ret < 0 && errno == EAGAIN)
           {
                /**
                 * Wait pipe can be written.
                 */
                if (swSocket_wait(master_pipe,SW_WORKER_WAIT_TIMEOUT, SW_EVENT_WRITE) == SW_OK)
                {
                    continue;
                }
                else
                {
                    goto finish;
                }
           }
#endif
       }
                   //swTraceLog("wt_queue->in:fd=%d|from_id=%d|data=%s|ret=%d|errno=%d", sdata._send.info.fd,sdata._send.info.from_id, sdata._send.data, ret, errno);
        if(ret >= 0)
        {
             break;
        }
        elseif (errno == EINTR)
        {
             continue;
        }
        elseif (errno == EAGAIN)
        {
             swYield();
        }
        else
        {
             break;
        }
    }

源码解释:循环写出数据直到写出成功,最大重写次数为SW_WORKER_SENDTO_COUNT(32)次。每次写出时,先判断是否为消息队列模式,如果是,将数据写入消息队列;如果不是,获取worker的write_pipe,并通过该管道将数据发送给master进程。

这里可以看到,FactoryProcess实质上在做一个类似消息中心的功能,所有的任务发送、通知、回传都是通过FactoryProcess来处理,而进程间也基本通过共享内存、管道、专有的writer线程等方式实现进程间or进程内的通讯,因此整个FactoryProcess模块还是非常复杂的。

具体FactoryProcess模块的使用会在Server模块中体现出来,届时将进一步解析FactoryProcess模块的设计理念以及运用场景。

对于FactoryProcess的基本分析到这里就结束了,下一章将分析FactoryProcess.c文件中剩下的操作函数:Worker,Manager和Writer。

时间: 2024-08-01 00:10:17

Swoole源码学习记录(九)——Factory模块(上)的相关文章

Swoole源码学习记录(十)——Factory模块(下)

Swoole版本:1.7.5-stable 本章将分析FactoryProcess.c中剩下的函数,这些函数用于操作worker.manager以及writer.这些函数提供了最核心的进程创建.管理等功能,是Swoole的master-worker结构的基石. 先从worker相关的函数开始(manager相关函数基本都涉及操作worker进程).在FactoryProcess.c中一共声明了4个操作函数,分别是: static int swFactoryProcess_worker_loop(

Swoole源码学习记录(十三)——Server模块详解(上)

终于可以正式进入Server.c模块了-- 在之前的分析中,可以看到很多相关模块的声明都已经写在了Server.h中,就是因为这些模块构成了Server的核心部分.而Server本身,则是一个最上层的对象,它包括了核心的Reactor和Factory模块,存放了消息队列的key值,控制着全部的Connection,所有PHP层面的回调函数也在这里指定:同时,Server存放了大量的属性值,这些值决定了整个Swoole的详细特征. 直接上源码.首先是swServer结构体,这个结构体定义了一个sw

Swoole源码学习记录(十二)——ReactorThread模块

ReactorThread 这一章将分析Swoole的ReactorThread模块.虽然叫Thread,但是实际上使用的是swFactoryProcess也就是多进程模式.但是,在ReactorThread中,所有的事件监听是在线程中运行的(Rango只是简单提到了PHP不支持多线程安全,具体原因还有待请教--),比如在UDP模式下,是针对每一个监听的host开辟一个线程运行reactor,在TCP模式下,则是开启指定的reactor_num个线程用于运行reactor. 那么OK,先上swR

Swoole源码学习记录(十四)——Server模块详解(下)

swoole版本:1.7.6-stable 上一章已经分析了如何启动swServer的相关函数.本章将继续分析swServer的相关函数, 1.swServer函数分析 swServer_addListener 该函数用于在swServer中添加一个需要监听的host及port.函数原型如下: // Server.h 438h int swServer_addListener(swServer *serv, int type, char *host,int port); 参数 说明 swServ

Swoole源码学习记录(八)——Reactor模块-epoll

Swoole版本:1.7.5-beta Reactor模块可以说是Swoole中最核心的模块之一,正是这些reactor模型为swoole提供了异步操作的基础.Swoole中根据不同的内核函数,提供了四种Reactor封装,ReactorEpoll,ReactorKqueue,ReactorPoll和ReactorSelect.同时,Swoole通过结构体swReactor封装了对于reactor的操作函数和基本属性.本章,我将分析swReactor以及四种Reactor模型中的ReactorE

Swoole源码学习记录(十五)——Timer模块分析

swoole版本:1.7.7-stable Github地址:点此查看 1.Timer 1.1.swTimer_interval_node 声明: // swoole.h 1045-1050h typedef struct _swTimer_interval_node { struct _swTimerList_node *next, *prev; struct timeval lasttime; uint32_t interval; } swTimer_interval_node; 成员 说明

Swoole源码学习记录(十一)——Worker,Connection

Swoole版本:1.7.5-stable 本章将分析Swoole中的三个比较重要的模块,Worker,ReactorProcess和Connection.其中Worker和ReactorProcess其实是对前面三章的一个补充,在前面的章节中为了分析结果的流畅性没有针对这些模块做特定分析,在此做出补充. Worker模块 首先是Worker模块.Worker在Swoole中为核心工作进程的封装,包括用于处理核心逻辑的worker和用于处理任务的task_worker.在Swoole中使用了结构

Mybatis源码学习记录

一.对源码先上一个结构图: 源代码主要在org.apache.ibatis目录下,18个包,其中在应用中主要的包有:builder.session.cache.type.transaction.datasource.jdbc.mapping,提供支撑服务的包有annotation.binding.io.logging.plugin.reflection.scripting.exception.executor.parsing 二.从使用入手 MyBatis使用的三板斧是SqlSessionFac

select 源码学习记录

先记录一下学习的成果,慢慢完善 四个相关函数 fd_set的结构在上一篇中有讲,同时解释了为什么最大1024. -fd_set为1024/32的long型数组结构体.也就是结构体里面保存了long型数组 int FD_ZERO(int fd, fd_set *fdset); int FD_CLR(int fd, fd_set *fdset); int FD_SET(int fd, fd_set *fd_set); int FD_ISSET(int fd, fd_set *fdset); FD_Z