应用层协议实现系列(一)——HTTPserver之仿nginx多进程和多路IO的实现

近期在尝试自己写一个Httpserver,在粗略研究了nginx的代码之后,决定仿照nginx中的部分设计自己实现一个高并发的HTTPserver,在这里分享给大家。

眼下使用的较多的Httpserver就是apache和nginx,apache的主要特点就是稳定,而nginx的主要特点是承载的并发量高。在这里从实现原理上做一个分析:

apache採用的是多进程server模型,即server每监听到一个连接时,会创建一个新的进程去处理连接,进程与进程之间是独立的,因此就算进程在处理连接的过程中崩溃了,也不会影响其它进程的执行。但因为server能创建的进程数目与内存有关,因此server的最大并发数会受到机器内存的影响,同一时候假设有人发起大量恶意长链接攻击,就会导致server超载。

nginx採用的是多路IO复用server模型,即server每监听到一个连接时,会将连接增加到连接数组中,使用epoll多路IO复用函数对每一个连接进行监听,当连接中有数据到来时,epoll会返回对应的连接,依此对各个连接进行处理就可以。epoll的最大连接数量尽管也会受到内存的影响,但因为每一个未激活的连接占用的内存非常小,所以相比于apache能够承载更大的并发。

但因为多路IO复用是在一个进程中进行的,所以假设在处理连接的过程中崩溃了,其它连接也会受到影响。为了解决问题,nginx中也引入了多进程,即nginxserver由一个master进程和多个worker进程组成。master进程作为父进程在启动的时候会创建socket套接字以及若干个worker进程,每一个worker进程会使用epoll对master创建的套接字进行监听。当有新连接到来时,若干个worker进程的epoll函数都会返回,但仅仅有一个worker进程能够accept成功。该进程accept成功之后将该连接增加到epoll的监听数组中,该连接之后的数据都将由该worker进程处理。假设当中一个worker进程在处理连接的过程中崩溃了,父进程会收到信号并重新启动该进程以保证server的稳定性。

另外,每次新连接到来都会唤醒若干个worker进程同一时候进行accept,但仅仅有一个worker能accept成功,为了避免这个问题,nginx引入了相互排斥信号量,即每一个worker进程在accept之前都须要先获取锁,假设获取不到则放弃accept。

在明白了上述原理之后,我们就能够仿照nginx实现一个httpserver了。首先是创建套接字的函数:

//创建socket
int startup(int port) {
    struct sockaddr_in servAddr;
    memset(&servAddr, 0, sizeof(servAddr));
    //协议域(ip地址和端口)
    servAddr.sin_family = AF_INET;
    //绑定默认网卡
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    //端口
    servAddr.sin_port = htons(port);
    int listenFd;
    //创建套接字
    if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }
    unsigned value = 1;
    setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
    //绑定套接字
    if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr))) {
        printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }
    //開始监听,设置最大连接请求
    if (listen(listenFd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }
    return listenFd;
}

该函数创建了一个套接字并将其绑定到了一个port上開始监听。因为我们接下来要创建若干个worker进程,能够通过fork函数实现:

//管理子进程的数组,数组多大就有几个子进程
static int processArr[PROCESS_NUM];
//创建若干个子进程,返回当前进程是否父进程
bool createSubProcess() {
    for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
        int pid = fork();
        //假设是子进程,返回0
        if (pid == 0) {
            return false;
        }
        //假设是父进程,继续fork
        else if (pid >0){
            processArr[i] = pid;
            continue;
        }
        //假设出错
        else {
            fprintf(stderr,"can‘t fork ,error %d\n",errno);
            return true;
        }
    }
    return true;
}

在以上代码中,创建的进程数目由数组大小决定,建议将该进程数目设置为CPU的核数,以充分利用多核CPU。为了避免在父进程退出后,子进程仍然存在产生僵尸进程,我们还须要实现一个信号处理函数:

//信号处理
void handleTerminate(int signal) {
    for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
        kill(processArr[i], SIGTERM);
    }
    exit(0);
}

该函数实现了当父进程收到退出信号时,向每一个子进程也发送退出信号。以下来看看main函数的实现,因为本人是在mac os下进行开发,mac下不支持epoll函数,于是改为类似的select函数:

int main(int argc, const char * argv[])
{
    int listenFd;

    initMutex();
    //设置port号
    listenFd = startup(8080);

    //创建若干个子进程
    bool isParent = createSubProcess();
    //假设是父进程
    if (isParent) {
        while (1) {
            //注冊信号处理
            signal(SIGTERM, handleTerminate);
            //挂起等待信号
            pause();
        }
    }
    //假设是子进程
    else {
        //套接字集合
        fd_set rset;
        //最大套接字
        int maxFd = listenFd;
        std::set<int> fdArray;
        //循环处理事件
        while (1) {
            FD_ZERO(&rset);
            FD_SET(listenFd, &rset);
            //又一次设置每一个须要监听的套接字
            for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();iterator++) {
                FD_SET(*iterator, &rset);
            }
            //開始监听
            if (select(maxFd+1, &rset, NULL, NULL, NULL)<0) {
                fprintf(stderr, "select error: %s(errno: %d)\n",strerror(errno),errno);
                continue;
            }

            //遍历每一个连接套接字
            for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();) {
                int currentFd = *iterator;
                if (FD_ISSET(currentFd, &rset)) {
                    if (!handleRequest(currentFd)) {
                        close(currentFd);
                        fdArray.erase(iterator++);
                        continue;
                    }
                }
                ++iterator;
            }
            //检查连接监听套接字
            if (FD_ISSET(listenFd, &rset)) {
                if (pthread_mutex_trylock(mutex)==0) {
                    int newFd = accept(listenFd, (struct sockaddr *)NULL, NULL);
                    if (newFd<=0) {
                        fprintf(stderr, "accept socket error: %s(errno: %d)\n",strerror(errno),errno);
                        continue;
                    }
                    //更新最大的套接字
                    if (newFd>maxFd) {
                        maxFd = newFd;
                    }
                    fdArray.insert(newFd);
                    pthread_mutex_unlock(mutex);
                }
            }
        }
    }

    close(listenFd);
    return 0;
}

在以上代码中,还涉及了进程间相互排斥信号量的定义,代码例如以下:

//相互排斥量
pthread_mutex_t *mutex;
//创建共享的mutex
void initMutex()
{
    //设置相互排斥量为进程间共享
    mutex=(pthread_mutex_t*)mmap(NULL, sizeof(pthread_mutex_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANON, -1, 0);
    if( MAP_FAILED==mutex) {
        perror("mutex mmap failed");
        exit(1);
    }
    //设置attr的属性
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    int ret = pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED);
    if(ret != 0) {
        fprintf(stderr, "mutex set shared failed");
        exit(1);
    }
    pthread_mutex_init(mutex, &attr);
}

对每一个连接的处理例如以下:

//处理http请求
bool handleRequest(int connFd) {
    if (connFd<=0) return false;
    //读取缓存
    char buff[4096];
    //读取http header
    int len = (int)recv(connFd, buff, sizeof(buff), 0);
    if (len<=0) {
        return false;
    }
    buff[len] = ‘\0‘;
    std::cout<<buff<<std::endl;

    return true;
}

这样就实现了一个仿nginx的高并发server。完整的代码例如以下:

#include <iostream>
#include <set>
#include <signal.h>
#include <sys/select.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>

#include <sys/mman.h>
#include <pthread.h>

#define GET_ARRAY_LEN(array) (sizeof(array) / sizeof(array[0]))
#define PROCESS_NUM 4

//创建socket
int startup(int port) {
    struct sockaddr_in servAddr;
    memset(&servAddr, 0, sizeof(servAddr));
    //协议域(ip地址和端口)
    servAddr.sin_family = AF_INET;
    //绑定默认网卡
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    //端口
    servAddr.sin_port = htons(port);
    int listenFd;
    //创建套接字
    if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }
    unsigned value = 1;
    setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
    //绑定套接字
    if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr))) {
        printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }
    //開始监听,设置最大连接请求
    if (listen(listenFd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);
        return 0;
    }
    return listenFd;
}

//管理子进程的数组,数组多大就有几个子进程
static int processArr[PROCESS_NUM];
//创建若干个子进程,返回当前进程是否父进程
bool createSubProcess() {
    for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
        int pid = fork();
        //假设是子进程,返回0
        if (pid == 0) {
            return false;
        }
        //假设是父进程,继续fork
        else if (pid >0){
            processArr[i] = pid;
            continue;
        }
        //假设出错
        else {
            fprintf(stderr,"can‘t fork ,error %d\n",errno);
            return true;
        }
    }
    return true;
}

//信号处理
void handleTerminate(int signal) {
    for (int i=0; i<GET_ARRAY_LEN(processArr); i++) {
        kill(processArr[i], SIGTERM);
    }
    exit(0);
}

//处理http请求
bool handleRequest(int connFd) {
    if (connFd<=0) return false;
    //读取缓存
    char buff[4096];
    //读取http header
    int len = (int)recv(connFd, buff, sizeof(buff), 0);
    if (len<=0) {
        return false;
    }
    buff[len] = ‘\0‘;
    std::cout<<buff<<std::endl;

    return true;
}

//相互排斥量
pthread_mutex_t *mutex;
//创建共享的mutex
void initMutex()
{
    //设置相互排斥量为进程间共享
    mutex=(pthread_mutex_t*)mmap(NULL, sizeof(pthread_mutex_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANON, -1, 0);
    if( MAP_FAILED==mutex) {
        perror("mutex mmap failed");
        exit(1);
    }
    //设置attr的属性
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    int ret = pthread_mutexattr_setpshared(&attr,PTHREAD_PROCESS_SHARED);
    if(ret != 0) {
        fprintf(stderr, "mutex set shared failed");
        exit(1);
    }
    pthread_mutex_init(mutex, &attr);
}

int main(int argc, const char * argv[])
{
    int listenFd;

    initMutex();
    //设置端口号
    listenFd = startup(8080);

    //创建若干个子进程
    bool isParent = createSubProcess();
    //假设是父进程
    if (isParent) {
        while (1) {
            //注冊信号处理
            signal(SIGTERM, handleTerminate);
            //挂起等待信号
            pause();
        }
    }
    //假设是子进程
    else {
        //套接字集合
        fd_set rset;
        //最大套接字
        int maxFd = listenFd;
        std::set<int> fdArray;
        //循环处理事件
        while (1) {
            FD_ZERO(&rset);
            FD_SET(listenFd, &rset);
            //又一次设置每一个须要监听的套接字
            for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();iterator++) {
                FD_SET(*iterator, &rset);
            }
            //開始监听
            if (select(maxFd+1, &rset, NULL, NULL, NULL)<0) {
                fprintf(stderr, "select error: %s(errno: %d)\n",strerror(errno),errno);
                continue;
            }

            //遍历每一个连接套接字
            for (std::set<int>::iterator iterator=fdArray.begin();iterator!=fdArray.end();) {
                int currentFd = *iterator;
                if (FD_ISSET(currentFd, &rset)) {
                    if (!handleRequest(currentFd)) {
                        close(currentFd);
                        fdArray.erase(iterator++);
                        continue;
                    }
                }
                ++iterator;
            }
            //检查连接监听套接字
            if (FD_ISSET(listenFd, &rset)) {
                if (pthread_mutex_trylock(mutex)==0) {
                    int newFd = accept(listenFd, (struct sockaddr *)NULL, NULL);
                    if (newFd<=0) {
                        fprintf(stderr, "accept socket error: %s(errno: %d)\n",strerror(errno),errno);
                        continue;
                    }
                    //更新最大的套接字
                    if (newFd>maxFd) {
                        maxFd = newFd;
                    }
                    fdArray.insert(newFd);
                    pthread_mutex_unlock(mutex);
                }
            }
        }
    }

    close(listenFd);
    return 0;
}

下一篇文章《仿nginx Httpserver的设计与实现(二)——http协议解析》中,将向大家说明怎样对http协议进行解析。

假设大家认为对自己有帮助的话,还希望能帮顶一下,谢谢:)

个人博客:http://blog.csdn.net/zhaoxy2850

本文地址:http://blog.csdn.net/zhaoxy_thu/article/details/24624729

转载请注明出处,谢谢!

应用层协议实现系列(一)——HTTPserver之仿nginx多进程和多路IO的实现,布布扣,bubuko.com

时间: 2025-01-01 22:53:58

应用层协议实现系列(一)——HTTPserver之仿nginx多进程和多路IO的实现的相关文章

仿nginx Http服务器的设计与实现(二)——http协议解析

上一篇文章<仿nginx Http服务器的设计与实现(一)--多进程和多路IO的实现>中实现了一个仿照nginx的支持高并发的服务器,但只是实现了端口监听和数据接收,并没有实现对http协议的解析,下面就对如何解析http协议进行说明. 我们可以通过浏览器访问之前所搭建的http服务器,可以看到终端输出如下: GET / HTTP/1.1 Host: 127.0.0.1:8080 Connection: keep-alive Cache-Control: max-age=0 Accept: t

http协议学习系列

深入理解HTTP协议(转)  http://www.blogjava.net/zjusuyong/articles/304788.html http协议学习系列   1. 基础概念篇 1.1 介绍 HTTP是Hyper Text Transfer Protocol(超文本传输协议)的缩写.它的发展是万维网协会(World Wide Web Consortium)和Internet工作小组IETF(Internet Engineering Task Force)合作的结果,(他们)最终发布了一系列

涨知识-VI 基于TCP/UDP的应用层协议

基于TCP/UDP的应用层协议: 基于TCP: Telnet(Teletype over the Network, 网络电传),通过一个终端(terminal)登陆到网络 FTP(File Transfer Protocol 文件传输协议) SMTP(Simple Mail Transfer Protocol 简单邮件传输协议),用来发送电子邮件 POP3(Post Office Protocol 3)邮件读取协议,协议通常被用来接受电子邮件 HTTP HTTPS 基于UDP: NFS(net

应用层协议及ip地址划分

1.应用层协议 2.ip地址 3.子网划分及超网合并

编译内核让netfilter支持过滤layer7应用层协议

netfilter/layer7 默认情况下,netfilter只能过滤二.三.四层的数据,但是对于应用层的数据(比如qq,迅雷视频等等)是无法过滤掉的.我们又知道netfilter是工作在内核当中的,因此要让netfilter支持过滤layer7应用层协议的数据必须要重新编译内核.由于iptables是规则编辑工具,因此也要重新编译iptables让其能够支持对layer7应用层协议的编写. 整体步骤如下: 一.需要使用的软件 内核源码:linux-2.6.28.10.tar.gz iptab

TCP-IP之应用层协议

应用层协议是多种多样的,比如 DNS.FTP.Telnet.SMTP.HTTP.RIP.NFS 一.DNS DNS (Domain Name Service 域名服务) 协议基于 UDP,使用端口号 53. 由数字组成的 IP 地址很难记忆,所以我们上网使用网站 IP 地址的别名--域名.实际使用中,域名与 IP 地址是对应的,这种对应关系保存在DNS 服务器之中. 在浏览器中输入一个域名后,会有 DNS 服务器将域名解析为对应的 IP 地址.注意这和网络层的 ARP 协议的不同之处:DNS 提

UDS(ISO14229-2006) 汉译(No.7 应用层协议)【未完,待续】

7.1定义 应用层协议通常作为确认消息的传输,意味着从客户端发送的每一个请求都将有由服务器端产生的与之相对的响应. 唯一的例外在于:例如使用了功能寻址方式,或者该请求/指示没有指定生成响应/确定的少数情况下.为了减轻不必要的消息对系统造成的压力,在个别场合即使服务器处理请求诊断服务失败了也不发送否定响应. 应用层协议是与会话层协议并行执行,这样,即使客户端等待上一个请求的响应时,也将会保持正确的会话层校时功能(例如:发送一个TesterPresent以使其他服务器的诊断会话持续下去:具体实施细则

常用应用层协议HTTP、RTSP、RTMP比较

HTTP(超文本传输协议).RTSP(Real Time Streaming Protocol实时流传输协议).RTMP(Routing Table Maintenance Protocol路由选择表维护协议)是应用层协议,理论上都可以做直播.点播,实际上直播多采用RTMP和RTSP.点播则多用RTSP和HTTP. 一.常用领域: HTTP(HTTPS)所有数据都作为文本处理,广泛应用于网络访问,是公有协议,有专门机构维护. RTSP流媒体协议,多用在监控领域视频直播点播:是公有协议,有专门机构

服务端协议测试系列教程

测试技术分享之服务端协议测试系列教程 童鞋看完后有啥想法,可以发给我改进 在线播放地址:http://www.iqiyi.com/u/2013029540/a 下载地址:链接: http://pan.baidu.com/s/1boDHpbp 密码: p76e