Linux:进程池实现

进程池在服务器应用中有很多很多=。=

下面是半同步半异步进程池的实现代码:

#ifndef _PROCESSPOOL_H
#define _PROCESSPOOL_H

#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno,h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<signal.h>
#include<sys/wait.h>
#include<sys/stat.h>

class process
{
	public:
		process():m_pid(-1){}
	public:
		pid_t m_pid;
		int m_pipefd[2];
};

template<typename T>
classe processpool
{
	private:
		//构造函数私有,类似于单例
		proceepool(int listenfd,int process_number = 8);
	public:
		static processpool<T>* create(int listenfd,int process_number = 8)
		{
			if(!m_instance)
			{
				m_instance = new processpool<T>(listenfd,process_number);
			}
			return m_instance;
		}
		~processpool()
		{
			delete [] m_sub_process;
		}
	private:
		void setup_sig_pipe();
		void run_parent();
		void run_child();

	private:
		//允许最大的子进村数量
		static const int MAX_PROCESS_NUMBER = 16;
		//子紧凑最多能处理的客户数量
		static const int USER_PER_PROCESS = 65536;
		//epoll最多处理的事件数
		static const int MAX_EVENT_NUMEBR = 1000;
		//进程池进程总数
		int m_process_number;
		//子进程在池中的序号,0开始
		int m_idx;
		//每个紧凑都有一个epool内核事件表,用m_epollfd标识
		int m_epollfd;
		//监听socket
		int m_listenfd;
		//子进程通过m_stop来决定是否停止运行
		int m_stop;
		//保存所有子进程的描述信息
		process* m_sub_process;
		//进程池实例
		static process<T>* m_instance;
};

template <class T>
processpool<T>* processpool<T>::m_instance = NULL;

static int sig_pipefd[2];

static int setnonblocking(int fd)
{
	int old_option = fcntl(fd,F_GETFL);
	int new_option = old_option | O_NONBLOCK;
	fcntl(fd,F_SETFL,new_option);
	return old_option;
}

statit void addfd(int epollfd,int fd)
{
	epoll_fd event;
	event.data.fd = fd;
	event.events = EPOLLIN|EPOLLET;
	epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
	setnonblocking(fd);
}

stacit void removefd(int epollfd,int fd)
{
	epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,0);
	close(fd);
}

stacit void sig_handler(int sig)
{
	int save_errno = errno;
	int msg = sig;
	send(sig_pipefd[1],(char *)&msg,1,0);
	errno = save_errno;
}

static void addsig(int sig,void(handler)(int),bool restart = true)
{
	struct sigaction sa;
	memset(&sa,‘\0‘,sizeof(sa));
	sa.sa_handler = handler;
	if(restart)
	{
		sa.sa_flags = SA_RESTAT;
	}
	sigfillset(&sa.sa_mask);
	assert(sigaction(sig,&sa,NULL)!= -1);
}

template<class T>
processpool<T>::processpool(int listenfd,int process_number)
	:m_listenfd(listenfd),m_process_numebr(process_number),m_ide(-1),m_stop(false)
{
	assert((process_number>0)&&(process_number<=MAX_PROCESS_NUMBER));
	m_sub_process = new process[process_number];
	assert(m_sub_process);
	for(int i =0;i<process_number;++i)
	{
		int ret = socketpair(PF_UINX,SOCK_STREAM,0,m_sub_process[i].m_pipefd);
		assert(ret == 0);
		m_sub_process[i].m_pid = fork();
		assert(m_sub_procee[i].m_pid >= 0);
		if(m_sub_procee[i].m_pid > 0)
		{
			close(m_sub_process[i].m_pipefd[1]);
			continue;
		}
		else
		{
			close(m_sub_process[i].m_pipefd[0]);
			m_idx = i;
			break;
		}
	}

}

template<class T>
void processpool<T>::setup_sig_pipe()
{
	m_epollfd = epoll_create(5);
	assert(m_epollfd != -1);

	int ret = socketpair(PF_UNIX,SOCK_STREAM,0,sig_pipefd);
	assert(ret!= -1);

	setnonblocking(sig_pipefd[1]);
	addfd(m_epollfd,sig_pipefd[0]);

	addsig(SIGCHLD,sig_handler);
	addsig(SIGTERM,sig_handler);
	addsig(SIGINT,sig_handler);
	addsig(SIGPIPE,SIG_IGN);
}

template<class T>
void processpool<T>::run_child()
{
	setup_sig_pipe();

	int pipefd = m_sub_process[m_idx].pipefd[1];

	addfd(m_epollfd,pipefd);

	epoll_event events[MAX_EVENT_NUMBER];
	T* users = new T[USER_PER_PROCESS];
	assert(users);
	int number = 0;
	int ret = -1;
	while(! m_stop)
	{
		number = epoll_wait(m_epollfd,events,MAX_EVENT_NUMBER,-1);
		if((number < 0) && (errno !=EINTR))
		{
			printf("epoll failure\n");
			break;
		}

		for(int i = 0;i<number;++i)
		{
			int sockfd = events[i].data.fd;
			if((sockfd == pipefd)&&(events[i].events& EPOLLIN))
			{
				int client = 0;
				ret = recv(sockfd,(char*)&clinet,sizeof(client),0);
				if(((ret <0)&&(errno!=EAGAIN))||ret == 0)
				{
					continue;
				}
				else
				{
					struct sockaddr_in client_address;
					socklen_t client_addrlength = sizeof(client_addrss);
					int connfd = accept(m_listenfd,(struct sockaddr*)&client_address,&client_addrlength);
					if(connfd < 0)
					{
						printf("errno is:%d\b",errno);
						continue;
					}
					addfd(m_epollfd,connfd);
					users[connfd].init(m_epollfd,connfd,client_address);
				}
			}
			else if((sockfd == sig_pipefd[0])&&(events[i].events&EPOLLIN))
			{
				int sig;
				char signals[1024];
				ret = recv(sig_pipefd[0],signals,sizeof(signals),0);
				if(ret<= 0)
				{
					continue;
				}
				else
				{
					for(int i = 0;i< ret;++i)
					{
						switch(signals[i])
						{
							case SIGCHLD:
								{
									pid_t pid;
									int stat;
									while(pid = waitpid(-1,&stat,WNOHANG)>0)
									{
										continue;
									}
									break;
								}
							case SIGTERM:
							case SIGINT:
								{
									m_stop = true;
									break;
								}
							default:
								{
									break;
								}
						}
					}
				}
			}
			else if(events[i].events&EPOLLIN)
			{
				users[sockfd].process();
			}
			else
			{
				continue;
			}
		}
	}
	delete [] users;
	users = NULL;
	close(pipefd);
	close(m_epollfd);
}

template<class T>
void processpool<T>::run_parent()
{
	setup_sig_pipe();

	addfd(m_epollfd,m_listenfd);

	epoll_event events[MAX_EVENT_NUMBER];

	int sub_process_counter = 0;
	int new_conn = 1;
	int number = 0;
	int ret = -1;

	while(!m_stop)
	{
		number = epoll_wait(m_epollfd,events,MAX_EVENT_NUMBER,-1);
		if((number< 0) &&(errno!=EINTER))
		{
			printf("epoll failure\n");
			break;
		}
		for(int i = 0;i<number;++i)
		{
			int sockfd = events[i].data.fd;
			if(sockfd == m_listenfd)
			{
				int i = sub_process_counter;
				do
				{
					if(m_sub_process[i].m_pid != -1)
					{
						break;
					}
					i = (i+1)%m_process_number;
				}
				while(i!= sub_process_counter);

				if(m_sub_number[i].m_pid == -1)
				{
					m_stop = true;
					break;
				}
				sub_process_counter = (i+1)%m_process_number;
				send(m_sub_process[i].m_pipefd[0],(char*)&new_conn,sizeof(new_conn),0);
				printf("send request to child %d\n",i);
			}
			else if((sockfd == sig_pipefd[0]) && (events[i].events &EPOLLIN))
			{
				int sig;
				char signals[1024];
				ret = recv(sig_pipefd[0],signals,sizeof(signals),0);
				if(ret<= 0)
				{
					continue;
				}
				else
				{
					for(int i = 0;i<ret;++i)
					{
						switch(signal[i])
						{
							case SIGCHLD:
								{
									pid_t pid;
									int stat;
									while((pid = waitpid(-1,&stat,WNOHANG))>0)
									{
										for(int i = 0;i<m_process_number;++i)
										{
											if(m_sub_process[i].m_pid== pid)
											{
												printf("child %d join\n",i);
												close(m_sub_process[i].m_pipfd[0]);
												m_sub_process[i].m_pid = -1;
											}
										}
									}
									m_stop = true;
									for(int i = 0;i<m_process_number;++i)
									{
										if(m_sub_process[i].m_pid!= -1)
										{
											m_stop = false;
										}
									}
									break;
								}
							case SIGTERM:
							case SIGINT:
								{
									printf("kill all the child now\n");
									for(int i = 0;i<m_process_number;++i)
									{
										int pid = m_sub_process[i].m_pid;
										if(pid != -1)
										{
											kill(pid,SIGTERM);
										}
									}
									break;
								}
							default:
								{
									break;
								}
						}
					}
				}
			}
			else
			{
				continue;
			}
		}
	}
	close(m_epollfd);
} 

#endif
时间: 2024-12-25 05:47:39

Linux:进程池实现的相关文章

linux进程池模型

static int nchildren;static pid_t* pids;int main(int argc,char**argv){ int listenfd,i; socklen_t addrlen; void sig_int(int); pid_t child_make(int,int,int); if(argc==3) listenfd=Tcp_listen(NULL,argv[1]),argv[2],&addrlen); else err_quit("usage:serv

Linux高性能服务器编程——进程池和线程池

进程池和线程池 池的概念 由于服务器的硬件资源"充裕",那么提高服务器性能的一个很直接的方法就是以空间换时间,即"浪费"服务器的硬件资源,以换取其运行效率.这就是池的概念.池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配.当服务器进入正是运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配.很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的.当

Linux客户/服务器程序设计范式2&mdash;&mdash;并发服务器(进程池)

引言 让服务器在启动阶段调用fork创建一个子进程池,通过子进程来处理客户端请求.子进程与父进程之间使用socketpair进行通信(为了方便使用sendmsg与recvmsg,如果使用匿名管道,则无法使用以上两个函数).以下针对TCP进行分析. server端使用select轮询用于监听客户端请求的被动套接字fd_listen以及用于父子之间通信的socketpair.每当客户端有请求时,server端会将由accept返回的用于与客户端通信的socket描述符通过socketpair发送给一

Linux网络编程——进程池实现过程详解(1)

目录 进程池 父进程的实现流程 子进程的实现流程 进程池 父进程的实现流程 1.定义数据结构pChild,申请子进程数目的结构体空间 2.通过循环,socketpair创建全双工管道,创建子进程,将子进程pid,管道对端,是否忙碌等信息存储 3.socket,bind,listen,对应的端口处于监听状态 netstat 4.epoll_create创建epfd,监控socketFd和所有子进程的管道对端 5.while(1)循环 epoll_wait等待客户端的请求及子进程是否有通知 如果so

Linux客户/服务器程序设计范式2——并发服务器(进程池)

引言 让服务器在启动阶段调用fork创建一个子进程池,通过子进程来处理客户端请求.子进程与父进程之间使用socketpair进行通信(为了方便使用sendmsg与recvmsg,如果使用匿名管道,则无法使用以上两个函数).以下针对TCP进行分析. server端使用select轮询用于监听客户端请求的被动套接字fd_listen以及用于父子之间通信的socketpair.每当客户端有请求时,server端会将由accept返回的用于与客户端通信的socket描述符通过socketpair发送给一

Python进程锁和进程池

进程锁 进程与进程之间是独立的,为何需要锁? 对于进程,屏幕的输出只有一个,此时就涉及到资源的竞争.在Linux的Python2.x中可能出现问题. 这仅仅是一种情况,多个进程之间虽然是独立的,但仅限于内存和运算,如果涉及到其它一些资源, 就可能存在竞争问题,在实际使用过程中要注意思考和防范错误. from multiprocessing import Process, Lock def func(lock, i):     lock.acquire()     print("hello,&qu

Python开发基础--- 进程间通信、进程池、协程

进程间通信 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 进程队列queue 不同于线程queue,进程queue的生成是用multiprocessing模块生成的. 在生成子进程的时候,会将代码拷贝到子进程中执行一遍,及子进程拥有和主进程内容一样的不同的名称空间. 示例1: 1 import multiprocessing 2 def foo(): 3 q.put([11,'hello',True]

Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程

队列(queue) 队列只在多线程里有意义,是一种线程安全的数据结构. get与put方法 ''' 创建一个"队列"对象 import queue q = queue.Queue(maxsize = 10) queue.Queue类即是一个队列的同步实现.队列长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.如果maxsize小于1就表示队列长度无限. 将一个值放入队列中: q.put() 调用队列对象的put()方法在队尾插入一个项目.put()

开启线程池和进程池

线程与进程的应用场合很多,主要处理并发与多任务.然而,当开启的线程与进程过多时,系统的开销过多会造成性能低下甚至崩溃.这时,希望出现一种方法能规定只能执行指定数量线程与进程的策略.特别是针对不知道要开启多少线程或进程,而有可能出现线程或进程过多的情况.于是,线程池与进程池出现了.python3以后增加了concurrent.futures模块,为异步执行提供了高级的接口. 线程池 concurrent.futures.ThreadPoolExecutor(max_workers=None, th