Linux高性能服务器编程——多进程编程

多进程编程

多进程编程包括如下内容:

  1. 复制进程影映像的fork系统调用和替换进程映像的exec系列系统调用。
  2. 僵尸进程以及如何避免僵尸进程
  3. 进程间通信(Inter-Process Communication,IPC)最简单的方式:管道
  4. 3种进程间通信方式:信号量,消息队列和共享内存

fork系统调用

#include<unistd.h>

pid_tfork(void);

该函数的每次都用都返回两次,在父进程中返回的是子进程的PID,在子进程中返回的是0.该返回值是后续代码判断当前进程是父进程还是子进程的依据。fork调用失败是返回-1,并设置errno。

fork函数复制当前进程,在内核进程表中创建一个新的进程表项。新的进程表项有很多属性和原进程相同,比如堆指针、栈指针和标志寄存器的值。但也有许多属性被赋予新的值,比如该进程的PPID被设置成原进程的PID,信号位图被清除(原进程设置的信号处理函数不在对新进程起作用)。

此外,创建子进程后,父进程中打开的文件描述符默认在子进程也是打开的,且文件描述符的引用计数加1。不仅如此,父进程的用户根目录、当前工作目录等变量的引用计数均会加1。

exec系统系统调用

有时候我们需要在子进程中执行其他程序,即替换当前进程映像,这就需要使用如下exec系列函数之一:

#include <unistd.h>

extern char **environ;

int execl(const char *path, const char*arg, ...);

int execlp(const char *file, const char*arg, ...);

int execle(const char *path, const char*arg,..., char * const envp[]);

int execv(const char *path, char *constargv[]);

int execvp(const char *file, char *constargv[]);

int execvpe(const char *file, char *constargv[], char *const envp[]);

path参数制定可以执行文件的完整路径,file参数可以接受文件名,该文件的具体位置则在环境变量PATH中搜寻。arg接受可变参数,argv则接受参数数组,他们都会被传递给新程序的main函数。envp参数用于设置新程序的环境变量。如果未设置,则新程序将使用由全局变量environ指定的环境变量。

一般情况下,exec函数时不返回的,除非出错。它出错时返回-1,并设置errno。如果没出错,则远程中exec调用之后的代码都不会执行,因为此时原程序已经被exec指定的程序完全替换(包括代码和数据)。

exec函数不会关闭原程序打开的文件描述符,除非该文件描述符被设置了类似SOCK_CLOEXEC的属性。

处理僵尸进程

对于多进程程序而言,父进程一般需要跟踪子进程的退出状态。因此,当子进程结束运行时,内核不会立即释放该进程的进程表表项,以满足父进程后续对该子进程退出信息的查询。在子进程结束运行之后,父进程读取器退出状态之前,我们称该子进程处于僵尸态。若父进程结束或者异常终止,而子进程继续运行,此时子进程的PPID将被系统设置为1,即init进程。init进程接管了该子进程,并等待它结束。

子进程停留在僵尸态,占据内核资源,这是绝对不允许的,毕竟内核资源有限。下面这对函数在父进程中调用,以等待子进程结束,并获取子进程的返回信息,从而避免了僵尸进程的产生,或者使子进程的僵尸态立即结束:

#include<sys/types.h>

#include<sys/wait.h>

pid_t wait(int*status);

pid_twaitpid(pid_t pid, int *status, int options);

wait函数将阻塞进程,直到该进程的某个子进程结束运行为止。它返回结束运行的子进程的PID,并将子进程的退出状态信息(比如WIFEXITED
       表示如果子进程正常结束,它就返回一个非0值)存储于status参数指向的内存中。

wait函数的组设特性显然不是服务器程序期望的,而waitpid函数解决了这个问题。waitpid只等待由pid参数指定的子进程,如果pid取值为-1,则和wait函数相同,即等待任意一个子进程结束。options的参数可以控制waitpid的行为,当该参数取值为WNOHANG时,waitpid将是非阻塞的:如果pid指定的目标子进程还没有结束或意外终止,则waitpid立即返回0;如果目标子进程确实正常退出了,则waitpid返回该子进程的PID。失败返回-1,并设置errno。

要在事件已经发生的情况下执行非阻塞调用才能提高程序的效率。对waitpid函数而言,我们最好在某个子进程退出之后再调用它。那么父进程从何得知某个子进程已经退出了?这正是SIGCHLD信号的用途。当一个进程结束时,它将给其父进程发送一个SIGCHLD信号。因此,我们可以在父进程中捕获SIGCHLD信号,并在信号处理函数中调用waitpid函数以彻底结束一个子进程。如下所示:

static voidhandle-child( int sig)

{

pid_t pid;

int stat;

while((pid = waitpid(-1, &stat,WNOHANG)) > 0)

{

//对子进程进行善后处理

}

}

管道

pipe用于创建管道。管道是父进程和子进程间的常用通信手段。管道能在父、子进程之间传递数据,利用的是fork调用之后两个管道文件描述符(fd[0]和fd[1])都保持打开。一堆这样的文件描述符只能保证父、子进程间一个方向的数据传输,父进程和子进程必须有一个关闭fd[0[,另一个关闭fd[1]。

要实现父、子进程之间的双向数据传输,就必须使用两个管道。socket编程接口提供了一个创建全双工管道的系统调用:socketpair。

信号量

当多个进程表同时访问系统上的某个资源的时候,比如同时写一个数据库的某条记录,或者同时修改某个文件,就需要考虑进城的同步问题,以确保任一时刻只有一个进程可以拥有对资源的独占式访问。通常,程序对共享资源的访问的代码只是很短的一段,你就是这一段代码引发了进程之间的竞态条件。我们称这段代码为关键代码段,或者临界区。

信号量是一种特殊的变量,它只能取自然数并只支持两种操作:等待(wait)和信号(signal),这两种操作更常见的称呼是P、V操作。假设有信号量SV,则对它的P、V操作含义如下:

P(SV),如果SV的值大于0,就将它减1:;如果SV的值为0,则挂起进程的执行。

V(SV),如果有其他进程因为等待SV二挂起,,则换星之;如果没有,则将SV加1。

信号口粮的却只可以是任何自然是,但最常用的、最简单的信号量是二进制信号量,它只能取0和1两个值。

信号量API主要包含3个系统调用:semget、semop和semctl。它们都被设计为操作一组信号量,即信号量集,而不是单个信号量。

semget系统调用

semget系统调用创建一个新的信号量集,或者获取一个已经存在的信号量集。

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/sem.h>

int semget(key_t key, int nsems, intsemflg);

key参数是一个键值,用来标识一个全局唯一的信号量集,就像文件名全局唯一地标识一个文件一样。要通过信号量通信的进程需要使用相同的键值来创建/获取该信号量。

nsems参数指定要创建/获取的信号量集中信号量的数目。如果是创建信号量、则该值必须被指定;如果是获取已经存在的信号量,则可以把它设置为0。

semflg参数指定一组标志。它低端的9个比特是该信号量的权限,其格式和含义都与系统调用open的mode参数相同。此外,它还可以和IPC_CREATE标志做按位或运算创建新的信号量集。此时即使信号量已经存在,semget也不会产生错误。还可以联合使用IPC_CREATE和IPC_EXCL标志来确保创建一组新的、唯一的信号量集。在这种情况下,如果信号量集已经存在,则semget返回错误并设置errno为EEXIT。

semget成功时返回一个正整数值,它是信号量集的标示符;semget失败时返回-1,并设置errno。

semop系统调用

semop系统调用改变信号量的值,即执行P、V操作。在讨论semop之前,我们需要先介绍与每个信号量关联的一些和总要的内核变量:

unsigned shortsemval;                                       
//信号量的值

unsigned shortsemzcnt;                                              
//等待信号量值变为0的进程数量

unsigned shortsemncnt;                                    
//等待信号量值增加的进程数量

pid_t sempid;                                                         
//最后一次执行semop操作的进程ID

semop对信号量的操作实际上就是对这些内核变量的操作。semop的定义如下:

#include<sys/types.h>

#include<sys/ipc.h>

#include<sys/sem.h>

int semop(intsemid, struct sembuf *sops, unsigned nsops);

其中,semid参数是由semget调用返回的信号量集标示符,用于指定被操作的目标信号量集。sops参数指向一个sembuf结果类型的数组,sembuf结果体的定义如下:

struct sembuf{

unsigned short int sem_num;

short int sem_op;

short int sem_flag;

};

其中,sem_num成员是信号量集中信号的编号,0表示信号量集中的第一个信号量。sem_op成员指定操作类型,其可选值为正整数,0和负整数。每种类型的操作的行为又受到sem_flag成员的影响。sem_flag的可选值是IPC_NOWAIT和SEM_UNDO。IPC_NOWAIT含义是,无论信号量操作是否成功,semop调用都将立即返回,这类似非阻塞I/O操作。SEM_UNDO的含义是,当进程时取消正在进行的semop操作。具体来说,sem_op和sem_flag将按照如下方式影响semop的行为:

如果sem_op大于0,则semop将被操作的信号量的值semval增加sem_op。该操作要求调用进程对被操作信号量集拥有写权限,此时若设置了SEM_UNDO标志,则系统将更新进程的semadj变量(用于跟踪对信号量的修改情况)。

如果sem_op等于0,则表示这是一个等待0操作。该操作要求进程对被操作的信号量集拥有读权限。如果此时信号量的值是0,则调用立即返回。如果信号量的值不是0,则semop根据sem_flag的设置情况执行失败返回或者阻塞以等待信号量变为0。

如果sem_op小于0,则表示对信号量值进行减操作,即期望获得信号量。该操作要求进程对操作信号量集拥有写权限。如果信号量的值semval大于或等于sem_op的绝对值,则semop操作成功,调用进程立即获得信号量,并且系统将该信号量的semval值减去sem_op的绝对值。此时如果设置了SEM_UNDO标志,则系统将更新进程的semadj变量。如果信号量的值小于sem_op的绝对值值,则semop根据sem_flag的设置情况执行失败返回或者阻塞以等待信号量变可用。

semop系统调用的第3个参数num_sem_ops指定要执行的操作个数,即sem_ops数组中元素的个数。semop对数组sops中的每个成员按照数组顺序依次执行操作,并且该过程是院子操作,以避免别的进程在同一时刻按照不同的顺序对该信号集中的信号量执行semop操作导致的竞态条件。

semop成功时返回0,失败则返回-1并设置errno。失败的时候,sops中数组指定所有操作都不被执行。

semctl系统调用

semctl系统调用允许调用者对信号量进行直接控制。

#include<sys/types.h>

#include<sys/ipc.h>

#include<sys/sem.h>

int semctl(intsemid, int semnum, int cmd, ...);

semid参数是由semget调用返回的信号量标识符,用以指定被操作的信号量集。semnum参数指定被操作的信号量在信号量集中的编号。cmd参数指定要执行的命令。有的命令需要调用者传递第四个参数(例如cmd为IPC_RMID时,表示立即一处信号量集,唤醒所有等待该信号量集的进程,需要第四个参数)。

一般第四个参数的推荐格式如下:

union semun {

int             
val;   
/* Value for SETVAL */

struct semid_ds *buf;   
/* Buffer for IPC_STAT, IPC_SET */

unsigned short 
*array; /* Array for GETALL, SETALL */

struct seminfo 
*__buf; /* Buffer for IPC_INFO

(Linux-specific) */

}

semtl成功时返回值取决于cmd参数,失败是返回-1并设置errno。

特殊键值IPC_PRIVATE

semget调用者可以给其Key参数传递一个特殊的键值IPC_PRICATE(其值为0),这样无论该信号量是否已经存在,semget都将创建一个新的信号量。使用该键值创建的信号量并非像其他的名字声称的那样是进程私有的。其他进程,尤其是子进程,也有方法来访问这个信号量。如下展示了使用IPC_PRIVATE信号量的示例:

#include <sys/sem.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

union semun
{
     int val;
     struct semid_ds* buf;
     unsigned short int* array;
     struct seminfo* __buf;
};

void pv( int sem_id, int op )
{
    struct sembuf sem_b;
    sem_b.sem_num = 0;
    sem_b.sem_op = op;
    sem_b.sem_flg = SEM_UNDO;
    semop( sem_id, &sem_b, 1 );
}

int main( int argc, char* argv[] )
{
    int sem_id = semget( IPC_PRIVATE, 1, 0666 );

    union semun sem_un;
    sem_un.val = 1;
    semctl( sem_id, 0, SETVAL, sem_un );

    pid_t id = fork();
    if( id < 0 )
    {
        return 1;
    }
    else if( id == 0 )
    {
        printf( "child try to get binary sem\n" );
        pv( sem_id, -1 );
        printf( "child get the sem and would release it after 5 seconds\n" );
        sleep( 5 );
        pv( sem_id, 1 );
        exit( 0 );
    }
    else
    {
        printf( "parent try to get binary sem\n" );
        pv( sem_id, -1 );
        printf( "parent get the sem and would release it after 5 seconds\n" );
        sleep( 5 );
        pv( sem_id, 1 );
    }

    waitpid( id, NULL, 0 );
    semctl( sem_id, 0, IPC_RMID, sem_un );
    return 0;
}

执行结果如下:

parenttry to get binary sem

parentget the sem and would release it after 5 seconds

childtry to get binary sem

childget the sem and would release it after 5 seconds

共享内存

共享内存是最高效的IPC机制,因为它不涉及进程之间的任何数据传输。这种高效带来的问题是,我们必须使用其他辅助手段来同步进程对内存的访问,否则会产生竞态条件。因此,共享内存通常和其他进程间通信方式一起使用。

Linux共享内存的API都定义在sys/shm.h头文件中,包括4个系统调用:shmget、shmat、shmdt和shmctl。

shmget系统调用

shmget系统调用创建一段新的共享内存,或者获取一段已经存在的共享内存。其定义如下:

#include <sys/ipc.h>

#include <sys/shm.h>

int shmget(key_t key, size_t size, intshmflg);

和shmget系统调用一样,key参数是一个键值,用来标识一段全局唯一的共享内存。size参数指定共享的大小,单位是字节。如果是创建新的共享内存,则size值必须被指定。如果是获取已经存在的共享内存,则可以把size设置为0。

shmflg参数的使用和含义与semget系统调用的semflag参数相同,不过shmget支持两个额外的标志——SHM_HUGETLB和SHM_NORESERVE。他们的含义如下:

SHM_HUGETLB,类似于mmap的MAP_HUGETLB标志,系统将使用“大页面“来为共享分配空间。

SHM_NORESERVE,类似于mmap的MAP_NORESERVE标志,不为共享内存保留交换分区。这样,当物理内存不足时,对共享内存执行写操作将触发SIGSEGV信号。

shmget成功时返回一个正整数,它是共享内存的标识符。shmget失败时返回-1,并设置errno。

如果shmget用于创建共享内存,则这段共享内存的所有字节都被初始化为0,与之关联的内核数据结构shmid_ds将被创建并初始化。

shmat和shmdt系统调用

共享内存被创建/获取之后,我们不能立即访问它,而是需要先将它关联到进程的地址空间中。使用完共享内存之后,我们也需要将它从地址空间中分离。这两项任务分别由如下两个系统调用实现:

#include <sys/types.h>

#include <sys/shm.h>

void *shmat(int shmid, const void *shmaddr,int shmflg);

int shmdt(const void *shmaddr);

其中shmid参数是由shmget调用返回的共享内存标识符。shmaddr参数指定共享内存关联到进程的哪块地址空间,最终的效果还是受到shmflg参数的可选标志SHM_RND的影响:

  1. 如果shmaddr为NULL,则被关联的地址由操作系统选择。这是推荐的做法,以确保可移植性。
  2. 如果shmadd非空,并且SHM_RND标志未被设置,则共享内存关联到addr指定的地质处。
  3. 如果shm_addr非空,并且设置了SHM_RND标志,则被关联的地址是[ shmaddr - (shmadrr % SHMLBA)]。SHMLBA的含义是“段低端边界地址倍数“,它必须是内存页面大小的整数倍。现在在Linux中,等于一个内存页大小,SHM_RND的含义是将共享内存被关联的地址向下圆整到离shm_addr最近的SHMLBA的证书倍地质处。

    除了SHM_RND标志外,shmflg参数还支持如下标志:

    SHM_RDONLY:进程仅能读取共享内存中的内容。如果没有该标志,则进程可以同时对共享内存读写。

    SHM_REMAP:如果shmaddr已经被关联到一段共享内存上,则冲向关联。

    SHM_EXEC:它指定对共享内存段的执行权限。

    shmat成功时返回共享内存被关联到的地址,失败是则返回(void*)-1并设置errno。

    shmdt函数将关联到shm_addr处的共享内存分离。成功时返回0,失败则返回-1并设置errno。

shmctl系统调用

shmctl系统调用控制共享内存的某些属性。#include<sys/ipc.h>

#include <sys/shm.h>

int shmctl(int shmid, int cmd, structshmid_ds *buf);

shmid参数是由shmget调用返回的共享内存标识符。cmd参数指定要执行的命令。shmctl成功时的返回值取决于cmd参数,失败时返回-1,并设置errno。

共享内存的POSIX方法

我们介绍过mmap函数。利用Map_ANONYMOUS标志我们可以实现父、子进程之间的匿名内存共享。通过打开同一个文件,mmap也可以实现无关进程之间的内存共享。Linux提供了另外一种利用mmap在无关进程之间共享内存的方式。这种方式无须任何文件的支持,但它需要先使用如下函数来创建或打开一个POSIX共享内存对象:

#include<sys/mman.h>

#include <sys/stat.h>       
/* For mode constants */

#include<fcntl.h>          
/* For O_*constants */

intshm_open(const char *name, int oflag, mode_t mode);

shm_open的使用方法与open系统调用完全相同。name参数指定要创建/打开的共享内存对象。从可移植的角度考虑,该参数应该使用”/somename”的格式:以”/”开始,后接多个字符,且这些字符都不是”/”;以”\0”结尾,长度不超过NAME_MAX。

oflag指定创建方式。它可以是下列标志中的一个或者多个的按位或:

O_RDONLY。以制度方式打开共享内存对象。

O_RDWR。以可读、可写方式打开共享内存对象。

O_CREAT。如果共享内存对象不存在,则创建之。此时mode参数的最低9位将制定该共享内存对象的访问权限。共享内存对象被创建的时候,其初试长度为0。

O_EXCL。和O_CREAT一起使用,如果由name指定的共享内存对象已经存在,则shm_open调用返回错误,否则就创建一个新的共享内存对象。

O_TRUNC。共享内存对象已经存在,则把它截断,使其长度为0。

shm_open调用成功时返回一个文件描述符。该文件描述符可用于后续的mmap调用,从而将共享内存关联到调用进程。sh_open失败时返回-1,并设置errno。

和打开的文件最后需要关闭一样,由shm_open创建的共享内存对象使用完之后也需要被删除。

#include<sys/mman.h>

#include <sys/stat.h>       
/* For mode constants */

#include<fcntl.h>          
/* For O_*constants */

intshm_unlink(const char *name);

该函数将name参数指定的共享内存对象标记为等待删除。当所有使用该共享内存对象的进程都是用ummap将它从进程中分离之后,系统将销毁这个共享内存对象所占据的资源。

共享内存实例

聊天室服务器程序:一个多进程服务器,一个子进程处理一个客户连接。同时,我们将所有客户socket连接的读缓冲设计为一块内存共享,服务器程序如程序清单1所示:

消息队列

消息队列是在两个进程之间传递二进制块数据的一种鸡蛋有效的方式。每个数据块都有一个特定的类型,接收方可以根据类型来有选择的接收数据,而不一定像管道和匿名管道那样必须以先进先出的方式接收数据。

Linux消息队列的4个API包括四个系统调用:msgget、msgsnd、msgcrv和msgctl。

msgget系统调用

msgget系统调用创建一个消息队列,或者获取一个已有的消息队列。

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

int msgget(key_t key, int msgflg);

和msgget系统调用一样,key参数是一个键值,用来标识一个全局唯一的消息队列。

msgflg参数的使用和含义与semget系统调用的sem_flags参数相同。

msgget成功时返回一个正整数值,它是消息队列的标识符。msgget失败时返回-1,并设置errno。

如果msgget用于创建消息队列,则与之关联的内核数据结构msqid_ds将被创建并初始化。msqid_ds结构体的定义如下:

struct msqid_ds{

structipc_perm msg_perm;            
//消息队列的操作权限

time_tmsg_stime;                            
//最后一次调用msgsnd的时间

time_tmsg_rtime;                             
//最后一次调用msgrcv的时间

time_t msg_ctime;                            
//最后一次被修改的时间

unsignedlong __msg_cbytes;                  
`//消息队列中已有的字节数

msgqnum_tmsg_q
num;                 
//消息队列中已有的消息树=数

msglen_t
msg_qbytes;                   
//消息队列允许的最大字节数

pid_tmsg_lspid;                                 
//最后执行msgsnd的进程的PID

pid_tmsg_lrpid;                                  
//最后执行msgrcv的进程的PID

}

msgsnd系统调用

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp,size_t msgsz, int msgflg);

msqid是由msgget调用返回的消息队列标识符。

msg_ptr参数指向一个准备发送的消息,消息必须被定义如下类型:

struct msgbuf {

long mtype;      
/* message type, must be > 0 */

char mtext[1];   
/* message data */

};

其中,mtype何曾元指定消息的类型,它必须是一个正整数。mtext是消息数据。msg_sz参数是消息的数据部分(mtext)的长度。这个长度可以为0,表示没有消息数据。

msgflg参数控制msgsnd的行为。它通常仅支持IPC_NOWAIT标志,即以非阻塞的方式发送消息。默认情况下,发送消息如果消息队列满了,则msgsnd将阻塞。若IPC_NOWAIT标志被指定,则msgsnd调用可能立即返回并设置errno为EAGAIN。

msgrcv系统调用

msgrcv系统调用从消息队列中获取消息:

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

ssize_t msgrcv(int msqid, void *msgp,size_t msgsz, long msgtyp, int msgflg);

msqid是消息队列标识符,msgp用于存储接收的消息,msgsz指的是消息数据部分的长度。msgtype参数指定接收何种类型的消息,我们可以使用如下几种方式指定消息类型:

msgtype等于0:读取消息队列中的第一个消息。

msgtype大于0:读取消息队列中的第一个类型为msgtype的消息。

msggtype小于0:读取消息队列中第一个类型值比msgtype绝对值小的消息。

msgflag控制msgrcv函数的行为。它可以是如下标志的按位或:

IPC_NOWAIT:如果消息队列中没有消息,则msgrcv调用立即返回并设置errno为ENOMSG。

MSG_CEXEPT:如果msgtype大于0,则接受消息队列中第一个非msgtype类型的消息。

MSG_NOERROR:如果消息数据部分的长度超过了msgsz就将它截断。,

msgctl系统调用

msgctl系统调用控制消息队列的某些属性,定义如下:

#include <sys/types.h>

#include <sys/ipc.h>

#include <sys/msg.h>

int msgctl(int msqid, int cmd, structmsqid_ds *buf);

msgqid是消息队列的标识符。cmd参数执行要执行的命令(例如IPC_STAT指将消息队列关联的内核数据结构复制到buf中)

程序清单1
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>

#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536

struct client_data
{
    sockaddr_in address;
    int connfd;
    pid_t pid;
    int pipefd[2];
};

static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_mem = 0;
client_data* users = 0;
int* sub_process = 0;
int user_count = 0;
bool stop_child = false;

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void sig_handler( int sig )
{
    int save_errno = errno;
    int msg = sig;
    send( sig_pipefd[1], ( char* )&msg, 1, 0 );
    errno = save_errno;
}

void addsig( int sig, void(*handler)(int), bool restart = true )
{
    struct sigaction sa;
    memset( &sa, '\0', sizeof( sa ) );
    sa.sa_handler = handler;
    if( restart )
    {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset( &sa.sa_mask );
    assert( sigaction( sig, &sa, NULL ) != -1 );
}

void del_resource()
{
    close( sig_pipefd[0] );
    close( sig_pipefd[1] );
    close( listenfd );
    close( epollfd );
    shm_unlink( shm_name );
    delete [] users;
    delete [] sub_process;
}

void child_term_handler( int sig )
{
    stop_child = true;
}

int run_child( int idx, client_data* users, char* share_mem )
{
    epoll_event events[ MAX_EVENT_NUMBER ];
    int child_epollfd = epoll_create( 5 );
    assert( child_epollfd != -1 );
    int connfd = users[idx].connfd;
    addfd( child_epollfd, connfd );
    int pipefd = users[idx].pipefd[1];
    addfd( child_epollfd, pipefd );
    int ret;
    addsig( SIGTERM, child_term_handler, false );

    while( !stop_child )
    {
        int number = epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ( number < 0 ) && ( errno != EINTR ) )
        {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if( ( sockfd == connfd ) && ( events[i].events & EPOLLIN ) )
            {
                memset( share_mem + idx*BUFFER_SIZE, '\0', BUFFER_SIZE );
                ret = recv( connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        stop_child = true;
                    }
                }
                else if( ret == 0 )
                {
                    stop_child = true;
                }
                else
                {
                    send( pipefd, ( char* )&idx, sizeof( idx ), 0 );
                }
            }
            else if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) )
            {
                int client = 0;
                ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );
                if( ret < 0 )
                {
                    if( errno != EAGAIN )
                    {
                        stop_child = true;
                    }
                }
                else if( ret == 0 )
                {
                    stop_child = true;
                }
                else
                {
                    send( connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0 );
                }
            }
            else
            {
                continue;
            }
        }
    }

    close( connfd );
    close( pipefd );
    close( child_epollfd );
    return 0;
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    user_count = 0;
    users = new client_data [ USER_LIMIT+1 ];
    sub_process = new int [ PROCESS_LIMIT ];
    for( int i = 0; i < PROCESS_LIMIT; ++i )
    {
        sub_process[i] = -1;
    }

    epoll_event events[ MAX_EVENT_NUMBER ];
    epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd );

    ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );
    assert( ret != -1 );
    setnonblocking( sig_pipefd[1] );
    addfd( epollfd, sig_pipefd[0] );

    addsig( SIGCHLD, sig_handler );
    addsig( SIGTERM, sig_handler );
    addsig( SIGINT, sig_handler );
    addsig( SIGPIPE, SIG_IGN );
    bool stop_server = false;
    bool terminate = false;

    shmfd = shm_open( shm_name, O_CREAT | O_RDWR, 0666 );
    assert( shmfd != -1 );
    ret = ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE );
    assert( ret != -1 );

    share_mem = (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );
    assert( share_mem != MAP_FAILED );
    close( shmfd );

    while( !stop_server )
    {
        int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ( number < 0 ) && ( errno != EINTR ) )
        {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < number; i++ )
        {
            int sockfd = events[i].data.fd;
            if( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
                if ( connfd < 0 )
                {
                    printf( "errno is: %d\n", errno );
                    continue;
                }
                if( user_count >= USER_LIMIT )
                {
                    const char* info = "too many users\n";
                    printf( "%s", info );
                    send( connfd, info, strlen( info ), 0 );
                    close( connfd );
                    continue;
                }
                users[user_count].address = client_address;
                users[user_count].connfd = connfd;
                ret = socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );
                assert( ret != -1 );
                pid_t pid = fork();
                if( pid < 0 )
                {
                    close( connfd );
                    continue;
                }
                else if( pid == 0 )
                {
                    close( epollfd );
                    close( listenfd );
                    close( users[user_count].pipefd[0] );
                    close( sig_pipefd[0] );
                    close( sig_pipefd[1] );
                    run_child( user_count, users, share_mem );
                    munmap( (void*)share_mem,  USER_LIMIT * BUFFER_SIZE );
                    exit( 0 );
                }
                else
                {
                    close( connfd );
                    close( users[user_count].pipefd[1] );
                    addfd( epollfd, users[user_count].pipefd[0] );
                    users[user_count].pid = pid;
                    sub_process[pid] = user_count;
                    user_count++;
                }
            }
            else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )
            {
                int sig;
                char signals[1024];
                ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );
                if( ret == -1 )
                {
                    continue;
                }
                else if( ret == 0 )
                {
                    continue;
                }
                else
                {
                    for( int i = 0; i < ret; ++i )
                    {
                        switch( signals[i] )
                        {
                            case SIGCHLD:
                            {
	                        pid_t pid;
	                        int stat;
	                        while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 )
                                {
                                    int del_user = sub_process[pid];
                                    sub_process[pid] = -1;
                                    if( ( del_user < 0 ) || ( del_user > USER_LIMIT ) )
                                    {
                                        printf( "the deleted user was not change\n" );
                                        continue;
                                    }
                                    epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );
                                    close( users[del_user].pipefd[0] );
                                    users[del_user] = users[--user_count];
                                    sub_process[users[del_user].pid] = del_user;
                                    printf( "child %d exit, now we have %d users\n", del_user, user_count );
                                }
                                if( terminate && user_count == 0 )
                                {
                                    stop_server = true;
                                }
                                break;
                            }
                            case SIGTERM:
                            case SIGINT:
                            {
                                printf( "kill all the clild now\n" );
                                //addsig( SIGTERM, SIG_IGN );
                                //addsig( SIGINT, SIG_IGN );
                                if( user_count == 0 )
                                {
                                    stop_server = true;
                                    break;
                                }
                                for( int i = 0; i < user_count; ++i )
                                {
                                    int pid = users[i].pid;
                                    kill( pid, SIGTERM );
                                }
                                terminate = true;
                                break;
                            }
                            default:
                            {
                                break;
                            }
                        }
                    }
                }
            }
            else if( events[i].events & EPOLLIN )
            {
                int child = 0;
                ret = recv( sockfd, ( char* )&child, sizeof( child ), 0 );
                printf( "read data from child accross pipe\n" );
                if( ret == -1 )
                {
                    continue;
                }
                else if( ret == 0 )
                {
                    continue;
                }
                else
                {
                    for( int j = 0; j < user_count; ++j )
                    {
                        if( users[j].pipefd[0] != sockfd )
                        {
                            printf( "send data to child accross pipe\n" );
                            send( users[j].pipefd[0], ( char* )&child, sizeof( child ), 0 );
                        }
                    }
                }
            }
        }
    }

    del_resource();
    return 0;
}

Linux高性能服务器编程——多进程编程,布布扣,bubuko.com

时间: 2024-10-24 22:53:17

Linux高性能服务器编程——多进程编程的相关文章

Linux高性能服务器编程——I/O复用

 IO复用 I/O复用使得程序能同时监听多个文件描述符,通常网络程序在下列情况下需要使用I/O复用技术: 客户端程序要同时处理多个socket 客户端程序要同时处理用户输入和网络连接 TCP服务器要同时处理监听socket和连接socket,这是I/O复用使用最多的场合 服务器要同时处理TCP请求和UDP请求.比如本章将要讨论的会社服务器 服务器要同时监听多个端口,或者处理多种服务. I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的.并且当多个文件描述符同时就绪时,如果不采用额外措施

linux高性能服务器编程

<Linux高性能服务器编程>:当当网.亚马逊 目录: 第一章:tcp/ip协议族 第二章:ip协议族 第三章:tcp协议详解 第四章:tcp/ip通信案例:访问Internet 第五章:linux网络编程基础API 第六章:高级IO函数 第七章:linux服务器程序规范 第八章:高性能服务器框架 第九章:IO复用 第十章:信号 第十一章:定时器 第十二章:高性能IO框架库libevent 第十三章:多进程编程 第十四章:多线程编程 第十五章:进程池和线程池 第十六章:服务器调制.调试和测试

Linux高性能服务器编程——定时器

 定时器 服务器程序通常管理着众多定时事件,因此有效组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响.位置我们要将每个定时事件封装成定时器,并使用某种容器类型的数据结构,比如链表.排序链表和时间轮将所有定时器串联起来,以实现对定时事件的统一管理. Linux提供三种定时方法: 1.socket选项SO_RECVTIMEO和SO_SNDTIMEO. 2.SIGALRM信号 3.I/O复用系统调用的超时参数 socket选项SO_RCVTI

Linux 高性能服务器编程——高级I/O函数

重定向dup和dup2函数 [cpp] view plaincopyprint? #include <unistd.h> int dup(int file_descriptor); int dup2(int file_descriptor_one, int file_descriptor_two); dup创建一个新的文件描述符, 此描述符和原有的file_descriptor指向相同的文件.管道或者网络连接. dup返回的文件描述符总是取系统当前可用的最小整数值. dup2函数通过使用参数f

Linux高性能服务器编程——信号及应用

 信号 信号是由用户.系统或者进程发送给目标进程的信息,以通知目标进程某个状态的改变或系统异常.Linux信号可由如下条件产生: 对于前台进程,用户可以通过输入特殊的终端字符来给它发送信号.比如输入Ctrl+C通常会给进程发送一个终端信号. 2.系统异常 系统状态变化 运行kill命令或调用kill函数 Linux信号概述 发送信号 Linux下,一个进程给其他进程发送信号的API是kill函数.其定义如下: #include <sys/types.h> #include <sign

Linux高性能服务器编程——系统检测工具

系统检测工具 tcpdump tcpdump是一款经典的转包工具,tcpdump给使用者提供了大量的选项,泳衣过滤数据报或者定制输出格式. lsof lsof是一个列出当前系统打开的文件描述符的工具.通过它我们可以了解感兴趣的进程打开了哪些文件描述符,或者我们感兴趣的文件描述符被哪些进程打卡了. nc nc命令主要被用来快速构建网络连接.我们可以让它以服务器方式运行,监听某个端口并接收客户连接,因此它可用来调试客户端程序.我们也可以使之以客户端方式运行,向服务器发起连接并收发数据,因此它可以用来

Linux高性能服务器编程——进程池和线程池

进程池和线程池 池的概念 由于服务器的硬件资源"充裕",那么提高服务器性能的一个很直接的方法就是以空间换时间,即"浪费"服务器的硬件资源,以换取其运行效率.这就是池的概念.池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配.当服务器进入正是运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配.很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的.当

Linux高性能服务器编程——多线程编程(上)

多线程编程 Linux线程概述 线程模型 线程是程序中完成一个独立任务的完整执行序列,即一个可调度的实体.根据运行环境和调度者的身份,线程可分为内核线程和用户线程.内核线程,在有的系统上也称为LWP(Light Weigth Process,轻量级进程),运行在内核空间,由内核来调度:用户线程运行在用户空间,由线程库来调度.当进程的一个内核线程获得CPU的使用权时,它就加载并运行一个用户线程.可见,内核线程相当于用于线程运行的容器.一个进程可以拥有M个内核线程和N个用户线程,其中M≤N.并且在一

Linux高性能服务器编程——多线程编程(下)

多线程编程 条件变量 如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于线程之间同步共享数据的值.条件变量提供了一种线程间的通信机制:当某个共享数据达到某个值得时候,唤醒等待这个共享数据的线程. 条件本身是由互斥量保护的.线程在改变条件状态前必须首先锁住互斥量,其他现成在获得互斥量之前不会察觉到这种变化,因为必须锁住互斥量以后才能计算条件. 条件变量的相关函数主要有如下5个: #include <pthread.h> int pthread_cond_destroy(pthr