进程池模型

进程池模型需要通过system V IPC机制或管道、信号、文件锁等进行同步。以下是进程池的一般模型。

Linux惊群现象:

惊群:惊群是指多个进程/线程在等待同一资源时,每当资源可用,所有的进程/线程都来竞争资源的现象。

accept、select、epoll实现进程池模型时的惊群现象:

1).Linux多进程accept系统调用的惊群问题(注意,这里没有使用select、epoll等事件机制),在linux 2.6版本之前的版本存在,在之后的版本中解决掉了。

2).使用select epoll等事件机制,在linux早期的版本中,惊群问题依然存在(epoll_create在fork之前)。 原因与之前单纯使用accept导致惊群,原因类似。Epoll的惊群问题,同样在之后的某个版本部分解决了。

3).Epoll_create在fork之后调用,不能避免惊群问题,Nginx使用互斥锁,解决epoll惊群问题。

进程池中的进程调用accept如果阻塞在同一个listen队列中,有可能产生惊群现象(取决于Linux版本):当一connect到达时,所有accept都会唤醒,但只有一个accept会返回正确结果,其他的都会返回错误码。

accept多进程实现:让一个进程bind一个网络地址(可能是AF_INET,AF_UNIX或者其他任何你想要的),然后fork这个进程自己:

int s = socket(...)

bind(s, ...)

listen(s, ...)

fork()

Fork自己几次之后,每个进程阻塞在accept()函数这里

for(;;) {

int client = accept(...);  //子进程阻塞在这了

if (client < 0) continue;

...

}

在较老的unix系统中,当有连接到来时,accept()在每个阻塞在这的进程里被唤醒。但是,只有这些进程中的一个能够真正的accept这个连接,其他的进程accept将返回EAGAIN惊群造成结果是系统对用户进程/线程频繁的做无效的调度、上下文切换,系统系能大打折扣。

实现代码:

服务器端代码:

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <errno.h>

#include <sys/socket.h>

#include <arpa/inet.h>

#include <netinet/in.h>

#include <sys/types.h>

#include <unistd.h>

#include <time.h>

#define BUFLEN 1024

#define PIDNUM 3

/*******************并发服务器模型之一:预先分配好了进程的个数**********************/

static void handle_fork(int sockfd){

int newfd;

struct sockaddr_in c_addr;

char buf[BUFLEN];

socklen_t len;

time_t now;

while(1){

len = sizeof(struct sockaddr);

if((newfd = accept(sockfd,(struct sockaddr*) &c_addr, &len)) == -1){

perror("accept");

exit(errno);

}else

printf("\n*****************通信开始***************\n");

printf("正在与您通信的客户端是:%s: %d\n",inet_ntoa(c_addr.sin_addr),ntohs(c_addr.sin_port));

/******处理客户端请求*******/

bzero(buf,BUFLEN);

len = recv(newfd,buf,BUFLEN,0);

if(len >0 && !strncmp(buf,"TIME",4)){

bzero(buf,BUFLEN);

/*获取系统当前时间*/

now = time(NULL);

/*ctime将系统时间转换为字符串,sprintf使转化后的字符串保存在buf*/

sprintf(buf,"%24s\r\n",ctime(&now));

//******发送系统时间*******/

send(newfd,buf,strlen(buf),0);

}

/*关闭通讯的套接字*/

close(newfd);

}

}

int main(int argc, char **argv)

{

int sockfd;

struct sockaddr_in s_addr;

unsigned int port, listnum;

pid_t pid[PIDNUM];

/*建立socket*/

if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){

perror("socket");

exit(errno);

}else

printf("socket create success!\n");

/*设置服务器端口*/

if(argv[2])

port = atoi(argv[2]);

else

port = 4567;

/*设置侦听队列长度*/

if(argv[3])

listnum = atoi(argv[3]);

else

listnum = 3;

/*设置服务器ip*/

bzero(&s_addr, sizeof(s_addr));

s_addr.sin_family = AF_INET;

s_addr.sin_port = htons(port);

if(argv[1])

s_addr.sin_addr.s_addr = inet_addr(argv[1]);

else

s_addr.sin_addr.s_addr = INADDR_ANY;

/*把地址和端口帮定到套接字上*/

if((bind(sockfd, (struct sockaddr*) &s_addr,sizeof(struct sockaddr))) == -1){

perror("bind");

exit(errno);

}else

printf("bind success!\n");

/*侦听本地端口*/

if(listen(sockfd,listnum) == -1){

perror("listen");

exit(errno);

}else

printf("the server is listening!\n");

/*处理客户端的连接*/

int i = 0;

for(i = 0; i < PIDNUM; i++){

pid[i] = fork();

if(pid[i] == 0)

handle_fork(sockfd);

}

/*关闭服务器的套接字*/

close(sockfd);

return 0;

}

客户端代码:

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <errno.h>

#include <sys/socket.h>

#include <arpa/inet.h>

#include <netinet/in.h>

#include <sys/types.h>

#include <unistd.h>

#include <time.h>

#define BUFLEN 1024

int main(int argc, char **argv)

{

int sockfd;

struct sockaddr_in s_addr;

socklen_t len;

unsigned int port;

char buf[BUFLEN];

/*建立socket*/

if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1){

perror("socket");

exit(errno);

}else

printf("socket create success!\n");

/*设置服务器端口*/

if(argv[2])

port = atoi(argv[2]);

else

port = 4567;

/*设置服务器ip*/

bzero(&s_addr, sizeof(s_addr));

s_addr.sin_family = AF_INET;

s_addr.sin_port = htons(port);

if (inet_aton(argv[1], (struct in_addr *)&s_addr.sin_addr.s_addr) == 0) {

perror(argv[1]);

exit(errno);

}

/*开始连接服务器*/

if(connect(sockfd,(struct sockaddr*)&s_addr,sizeof(struct sockaddr)) == -1){

perror("connect");

exit(errno);

}else

printf("conncet success!\n");

/******缓冲区清零*******/

bzero(buf,BUFLEN);

strcpy(buf,"TIME");

/******发送消息*******/

send(sockfd,buf,strlen(buf),0);

/******缓冲区清零*******/

bzero(buf,BUFLEN);

/******接收消息*******/

len = recv(sockfd,buf,BUFLEN,0);

if(len > 0)

printf("服务器的系统时间是:%s\n",buf);

close(sockfd); /*关闭连接*/

return 0;

}

时间: 2024-10-02 03:31:42

进程池模型的相关文章

linux进程池模型

static int nchildren;static pid_t* pids;int main(int argc,char**argv){ int listenfd,i; socklen_t addrlen; void sig_int(int); pid_t child_make(int,int,int); if(argc==3) listenfd=Tcp_listen(NULL,argv[1]),argv[2],&addrlen); else err_quit("usage:serv

joinablequeue模块 生产者消费者模型 Manager模块 进程池 管道

一.生产者消费者 主要是为解耦(借助队列来实现生产者消费者模型) import queue  # 不能进行多进程之间的数据传输 (1)from multiprocessing import Queue    借助Queue解决生产者消费者模型,队列是安全的. q = Queue(num) num :为队列的最大长度 q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待 q.get_now

生产者消费者模型及队列,进程池

生产者消费者模型 生产者消费者模型 主要是为了解耦 可以借助队列来实现生产者消费者模型 栈 : 先进后出(First In Last Out 简称 FILO) 队列 : 先进先出(First In First Out 简称 FIFO) import queue #不能进行多进程之间的数据传输(1) from multiprocessing import Queue #借助Queue解决生产者消费者模型,队列是安全的q=Queue(num)num : 队列的最大长度q.get() #阻塞等待获取数

进程池 协程 与I/O模型

一.进程池与线程池 进程池:限制进程创建的数量,使用时直接从进程池,获取空闲进程去执行任务,减少创建和销毁进程带来的时间消耗:如果进程的数量没有达到最大数量,且没有空闲进程去执行  任务,就会再创建一个新的进程,放入进程池去执行任务:如果进程池中的进程数量达到最大数量,没有空闲进程去,这是任务需要等待某个进程成为空闲进程之后, 再去执行任务. from concurrent.futures import ProcessPoolExecutor import time import os # 池子

5,线程池,进程池,协程,IO模型

今日内容: 1,线程池 2,进程池 3,协程 4,IO 模型 服务端要满足这三个条件: 1,24小时不间断的提供服务 2,能够支持高并发 3,要有固定的IP地址和端口在服务端这个地方会出现阻塞态情况: 阻塞IO 操作有: 1,链接循环 2,通信循环单线程实现高并发思路: 为了更好的提高程序的运行效率,即实现高并发,让服务端同时能够接受多个客户端的消息 所以一般在服务端会把,连接循环和通信循环封装为两个不同的函数方法, 这样当一个客户端与服务端进行通信时,服务端的连接循环可以和其他客户端进行连接,

进程池与线程池、协程、协程实现TCP服务端并发、IO模型

进程池与线程池.协程.协程实现TCP服务端并发.IO模型 一.进程池与线程池 1.线程池 ''' 开进程开线程都需要消耗资源,只不过两者比较的情况下线程消耗的资源比较少 在计算机能够承受范围内最大限度的利用计算机 什么是池? 在保证计算机硬件安全的情况下最大限度的利用计算机 池其实是降低了程序的运行效率,但是保证了计算机硬件的安全 (硬件的发展跟不上软件的速度) ''' from concurrent.futures import ThreadPoolExecutor import time p

生产者消费者模型,管道,进程之间共享内存,进程池

课程回顾: 并行:在同一时间点上多个任务同时执行 并发:在同一时间段上多个任务同时执行 进程的三大基本状态: 就绪状态:所有进程需要的资源都获取到了,除了CPU 执行状态:获取到了所有资源包括CPU,进程处于运行状态 阻塞状态:程序停滞不在运行,放弃CPU,进程此时处于内存里 什么叫进程? 正在运行的程序 有代码段,数据段,PCB(进程控制块) 进程是资源分配的基本单位. 进程之间能不能直接通信? 正常情况下,多进程之间是无法进行通信的.因为每个进程都有自己独立的空间 锁: 为了多进程通信时,保

python基础-UDP、进程、进程池、paramike模块

1 基于UDP套接字1.1 介绍 udp是无连接的,是数据报协议,先启动哪段都不会报错 udp服务端 import socket sk = socket() #创建一个服务器的套接字 sk.bind() #绑定服务器套接字 while True: #服务器无限循环 cs = sk.recvfrom()/sk.sendto() # 对话(接收与发送) sk.close() # 关闭服务器套接字 udp客户端 import socket client = socket() # 创建客户套接字 whi

Python--线程队列(queue)、multiprocessing模块(进程对列Queue、管道(pipe)、进程池)、协程

队列(queue) 队列只在多线程里有意义,是一种线程安全的数据结构. get与put方法 ''' 创建一个"队列"对象 import queue q = queue.Queue(maxsize = 10) queue.Queue类即是一个队列的同步实现.队列长度可为无限或者有限.可通过Queue的构造函数的可选参数maxsize来设定队列长度.如果maxsize小于1就表示队列长度无限. 将一个值放入队列中: q.put() 调用队列对象的put()方法在队尾插入一个项目.put()