第15章 高并发服务器编程(2)_I/O多路复用

3. I/O多路复用:select函数

3.1 I/O多路复用简介

(1)通信领域的时分多路复用

(2)I/O多路复用(I/O multiplexing)

  ①同一线程,通过“拨开关”方式,来同时处理多个I/O流,哪个IO准备就绪就把开关拨向它。(I/O多路复用类似于通信领域中的“时分复用”

  ②通过select/poll函数可以实现IO多路复用,他们采用轮询的方式来监视I/O。而epoll是对select/poll的加强,它是基于事件驱动,epoll_ctl注册事件并注册callback回调函数,epoll_wait只返回发生的事件避免了像select/poll对事件的整个轮询操作。

  ③I/O多路复用避免阻塞在I/O上,使原本为多进程或多线程来接收多个连接的消息变为单进程或单线程保存多个socket的状态后轮询处理。

(3)I/O复用模型

  ①进程阻塞于select调用,等待数据报socket可读。当select返回socket可读条件时,可以再调用recvfrom将数据报拷贝到应用缓冲区中。

  ②使用了系统调用select,要求两次系统调用(另一次为recvfrom),好象使得比单纯使用recvfrom效率更得更差。但实际上使用select给我们带来的好处却是让我们可以同时等待多个socket准备好。

3.2 select函数

(1)select函数


头文件


#include <sys/types.h>

#include <sys/time.h>

#include <unistd.h>


函数


int select(int maxfdp1, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);


参数


(1)maxfdp1:最大fd加1(max fd plus 1),在三个描述符集中找出最高描述符编号值,然后加1,这就是第一个参数值。

(2)readfds、writefds和exceptfds:是指向描述符集的指针。这三个描述符集说明了我们关心的可读、可写或处理异常条件的各个描述符。每个描述符集存放在一个fd_set数据类型中。

(3)timeval结构体:指定愿意等待的时间。NULL:表示永远等待,直到列表中的某个套接字就绪才返回。如果timeout中的时间设置为0,表示不等待,测试所有指定的描述符并立即返回其他具体值表示等待的时间

struct timeval{
    long tv_set;   //秒
    long tv_usec;  //微秒
};


返回值


准备就绪的描述符数,若超时则为0,若出错则为-1


功能


确定一个或多个套接口的状态,如果有必要,则会等待。

(2)select的作用


备注


传给select的参数告诉内核


①我们所关心的socket

②对于每个socket,我们所关心的条件(是否可读一个socket,是否可写一个socket,是否关心一个socket的异常

③希望等待多长时间(可以永远等待或等待一个固定量时间,或完全不等待)


从select返时时内核告诉我们


①己准好的socket的数量

哪一个socket己准备好读、写或异常条件

③使用这种返回值,就可调用相应的I/O函数(一般是read/write),并且确知该函数不会阻塞因为socket返回就说明要等待的条件己经满足,就可以直接处理而不必阻塞等待了。

(3)处理文件描述符集(socket集)的四个宏



作用


FD_ZERO(fd_set* set)


清除一个文件描述符集


FD_SET(int fd, fd_set* set)


将一个文件描述符加入到fd_set中


FD_CLR(int fd, fd_set* set)


将一个fd从fd_set中清除


FD_ISSET(int fd, fd_set* set)


测试fd_set中的一个给定fd是否有变化


备注:

(1)在使用select函数之前,首先使用FD_ZERO和FD_SET来初始化fd_set,并在使用select函数时,可循环使用FD_ISSET测试fd_set。

【编程实验】echo服务器(利用I/O多路复用方式实现)

//vector_fd.h(与上一例相同)

#ifndef __VECTOR_H__
#define __VECTOR_H__

#include <pthread.h>

//用于存放sock的动态数组(线程安全!)
typedef struct{
    int     *fd;
    int     counter;    //元素个数
    int     max_counter;//最多存数个数,会动态增长
    pthread_mutex_t mutex;
}VectorFD, *PVectorFD;

//动态数组相关的操作函数
extern  VectorFD*  create_vector_fd(void);
extern  void       destroy_vector_fd(VectorFD* vfd);
extern  int        get_fd(VectorFD* vfd, int index);
extern  void       remove_fd(VectorFD* vfd, int fd);
extern  void       add_fd(VectorFD* vfd, int fd);

#endif

//vector_fd.c  //动态数组操作函数(与上一例相同)

#include "vector_fd.h"
#include <memory.h>
#include <malloc.h>
#include <assert.h>

//查找指定fd在数组中的索引值
static int indexof(VectorFD* vfd, int fd)
{
    int ret = -1;

    int i=0;
    for(; i<vfd->counter; i++){
        if(vfd->fd[i] == fd){
            ret = i;
            break;
        }
    }

    return ret;
}

//数组空间的动态增长
static void encapacity(VectorFD* vfd)
{
    if(vfd->counter >=vfd->max_counter){
        int* fds = (int*)calloc(vfd->counter + 5, sizeof(int));
        assert(fds != NULL);
        memcpy(fds, vfd->fd, sizeof(int) * vfd->counter);

        free(vfd->fd);
        vfd->fd = fds;
        vfd->max_counter += 5;
    }
}

//动态数组相关的操作
VectorFD*  create_vector_fd(void)
{
    VectorFD* vfd = (VectorFD*)calloc(1, sizeof(VectorFD));
    assert(vfd != NULL);

    //分配存放fd的数组空间
    vfd->fd = (int*)calloc(5, sizeof(int));
    assert(vfd->fd != NULL);

    vfd->counter = 0;
    vfd->max_counter = 0;

    //对互斥锁进行初始化
    pthread_mutex_init(&vfd->mutex, NULL);

    return vfd;
}

void  destroy_vector_fd(VectorFD* vfd)
{
    assert(vfd != NULL);
    //销毁互斥锁
    pthread_mutex_destroy(&vfd->mutex);

    free(vfd->fd);
    free(vfd);
}

int  get_fd(VectorFD* vfd, int index)
{
    int ret = 0;
    assert(vfd != NULL);

    pthread_mutex_lock(&vfd->mutex);

    if((0 <= index) && (index < vfd->counter)){
        ret = vfd->fd[index];
    }

    pthread_mutex_unlock(&vfd->mutex);

    return ret;
}

void  remove_fd(VectorFD* vfd, int fd)
{
    assert(vfd != NULL);

    pthread_mutex_lock(&vfd->mutex);

    int index = indexof(vfd, fd);

    if(index >= 0){
        int i = index;
        for(; i<vfd->counter-1; i++){
             vfd->fd[i] = vfd->fd[i+1];
        }

        vfd->counter--;
    }

    pthread_mutex_unlock(&vfd->mutex);
}

void  add_fd(VectorFD* vfd, int fd)
{
    assert(vfd != NULL);

    encapacity(vfd);
    vfd->fd[vfd->counter++] = fd;
}

//echo_tcp_server_select.c(与上一例相同)

#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <time.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
#include "vector_fd.h"
#include <fcntl.h>

/*基于I/O多路复用的高并发服务器编程
测试:telnet 127.0.0.1 xxxx
      http://xxx.xxx.xxx.xxx:端口号
注意:演示时可关闭服务器的防火墙,防火墙口被过滤
      #service iptables status     查看防火墙
      #service iptables stop       关闭防火墙
*/

VectorFD* vfd;
int sockfd;
int bStop = 0;

void sig_handler(int signo)
{
    if(signo == SIGINT){
        bStop = 1;
        printf("server close\n");

        close(sockfd);
        destroy_vector_fd(vfd);

        exit(1);
    }
}

void out_addr(struct sockaddr_in* clientAddr)
{
    char ip[16];
    memset(ip, 0, sizeof(ip));
    int port = ntohs(clientAddr->sin_port);
    inet_ntop(AF_INET, &clientAddr->sin_addr.s_addr, ip, sizeof(ip));

    printf("%s(%d) connnected!\n", ip, port);
}

/*服务程序
 *  fd对应于某个连接的客户端,和某一个连接的客户端进行双向通信
 */
void do_service(int fd)
{
    /*服务端和客户端进行读写操作(双向通信)*/
    char buff[512];

    memset(buff, 0, sizeof(buff));
    size_t size = read(fd, buff, sizeof(buff));

    //读取客户端发送过来的消息
    //若读不到数据直接返回了,直接服务于下一个客户端
    //因此不需要判断size小于0的情况。
    if(size == 0){  //客户端己关闭连接
        char info[] = "client close\n";
        write(STDOUT_FILENO, info, sizeof(info));

        //将fd从动态数组中删除
        remove_fd(vfd, fd);
        close(fd);
    }else if(size > 0){
        write(STDOUT_FILENO, buff, sizeof(buff));//显示客户端发送的消息
        //写回客户端(回显功能)
        if(write(fd, buff, sizeof(buff)) != size){
            if(errno == EPIPE){
                //如果客户端己被关闭(相当于管道的读端关闭),会产生SIGPIPE信号
                //并将errno设置为EPIPE
                perror("write error");
                remove_fd(vfd, fd);
                close(fd);
            }
        }
    }
}

//遍历动态数组中所有的socket描述符,并将之加入到fd_set中。
//同时此函数返回动态数组中最大的那个描述符
static int add_set(fd_set* set)
{
    FD_ZERO(set);  //清空描述符集
    int max_fd = vfd->fd[0];

    int i=0;
    for(; i<vfd->counter; i++){
        int fd = get_fd(vfd, i);
        if(fd > max_fd)
            max_fd = fd;
        FD_SET(fd, set); //将fd加入到fd_set中
    }

    return max_fd;
}

//线程函数
void* th_fn(void* arg)
{
    struct timeval t;
    t.tv_sec = 2;
    t.tv_usec = 0;
    int n = 0; //返回select返回的准备好的socket数量
    int maxfd; //所有socket描述符的最大值
    fd_set set;
    maxfd = add_set(&set);
    /*
     * 调用select函数会阻塞,委托内核去检查传入的描述符集是否有socket己准备好,
     * 若有,则返回准备好的socket数量,超时则返回0
     * 第1个参数为fd_set中socket的范围(最大描述符+1)
     */
    while(((n = select(maxfd + 1, &set, NULL, NULL, &t)) >=0) && (!bStop)){
        if(n > 0){
            int i = 0;
            //检测哪些socket准备好,并和这些准备好的socket对应的客户端进行双向通信
            for(; i<vfd->counter; i++){
                int fd = get_fd(vfd, i);
                if(FD_ISSET(fd, &set)){
                    do_service(fd);
                }
            }
        }

        //重新设置时间
        t.tv_sec = 2;
        t.tv_usec = 0;

        //清空描述符集
        //重新遍历动态数组中最新的描述符,并放置到fd_set
        maxfd = add_set(&set);
    }

    return (void*)0;
}

int main(int argc, char* argv[])
{
    if(argc < 2){
        printf("usage: %s port\n", argv[0]);
        exit(1);
    }

    //按ctrl-c时中止服务端程序
    if(signal(SIGINT, sig_handler) == SIG_ERR){
        perror("signal sigint error");
        exit(1);
    }

    /*步骤1:创建socket(套接字)
     *注:socket创建在内核中,是一个结构体
     *AF_INET:IPv4
     *SOCK_STREAM:tcp协议
     */
    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    /*步骤2:将sock和地址(包括ip、port)进行绑定*/
    struct sockaddr_in servAddr; //使用专用地址结构体
    memset(&servAddr, 0, sizeof(servAddr));
    //往地址中填入ip、port和Internet地址族类型
    servAddr.sin_family = AF_INET;//IPv4
    servAddr.sin_port = htons(atoi(argv[1])); //port
    servAddr.sin_addr.s_addr = INADDR_ANY; //任一可用的IP

    if(bind(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) <0 ){
        perror("bind error");
        exit(1);
    }

    /*步骤3:调用listen函数启动监听
     *       通知系统去接受来自客户端的连接请求
     */
    if(listen(sockfd, 10) < 0){  //队列中最多允许10个连接请求
        perror("listen error");
        exit(1);
    }

    //创建放置套接字描述符的动态数组
    vfd = create_vector_fd();

    //设置线程的分离属性
    pthread_t  th;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //启动子线程
    int err;
    if((err = pthread_create(&th, &attr, th_fn, (void*)0)) != 0){
        perror("pthread create error");
        exit(1);
    }
    pthread_attr_destroy(&attr);

    /*(1)主线程获得客户端连接,将新的socket描述符放置到动态数组中
     *(2)子线程的任务
         A.调用select委托内核去检查传入到select中的描述符是否准备好
     *   B.利用FD_ISSET来找出准备好的那些描述符并和对应的客户端进行双向通信
     */

    struct sockaddr_in clientAddr;
    socklen_t len = sizeof(clientAddr);

    while(!bStop){
        /*步骤4:调用accept函数,从请求队列中获取一个连接
         *       并返回新的socket描述符
         * */
        int fd = accept(sockfd, (struct sockaddr*)&clientAddr, &len);

        if(fd < 0){
            perror("accept error");
            continue;
        }

        //输出客户端信息
        out_addr(&clientAddr);

        //将返回的新socket描述符加入到动态数组中
        add_fd(vfd, fd);
    }

    close(sockfd);
    destroy_vector_fd(vfd);
    return 0;
}
/*输出结果
 * [[email protected] 15.AdvNet]# gcc -o bin/echo_tcp_server_select -Iinclude bin/vector_fd.o src/echo_tcp_server_select.c -lpthread
 * [[email protected] 15.AdvNet]# bin/echo_tcp_server_select 8888
 * 127.0.0.1(40695) connnected!
 * abcdefaaabbbcdef^Cserver close
 */

//echo_tcp_client.c(与上一例相同)

#include <netdb.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>

int main(int argc, char* argv[])
{
    if(argc < 3){
        printf("usage: %s ip port\n", argv[0]);
        exit(1);
    }

    /*步骤1: 创建socket(套接字)*/
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if(sockfd < 0){
        perror("socket error");
    }

    //往servAddr中填入ip、port和地址族类型
    struct sockaddr_in servAddr;
    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_port = htons(atoi(argv[2]));
    //将ip地址转换成网络字节序后填入servAdd中
    inet_pton(AF_INET, argv[1], &servAddr.sin_addr.s_addr);

    /*步骤2: 客户端调用connect函数连接到服务器端*/
    if(connect(sockfd, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0){
        perror("connect error");
        exit(1);
    }

    /*步骤3: 调用自定义的协议处理函数和服务端进行双向通信*/
    char buff[512];
    size_t size;
    char* prompt = ">";

    while(1){
        memset(buff, 0, sizeof(buff));
        write(STDOUT_FILENO, prompt, 1);
        size = read(STDIN_FILENO, buff, sizeof(buff));
        if(size < 0) continue;

        buff[size-1] = ‘\0‘;
        //将键盘输入的内容发送到服务端
        if(write(sockfd, buff, sizeof(buff)) < 0){
            perror("write error");
            continue;
        }else{
            memset(buff, 0, sizeof(buff));
            //读取来自服务端的消息
            if(read(sockfd, buff, sizeof(buff)) < 0){
                perror("read error");
                continue;
            }else{
                printf("%s\n", buff);
            }
        }
    }

    /*关闭套接字*/
    close(sockfd);
}
时间: 2024-12-23 10:44:01

第15章 高并发服务器编程(2)_I/O多路复用的相关文章

Linux 高并发服务器

高并发服务器 一.多进程并发服务器 1. 实现示意图 2. 使用多进程并发服务器时要考虑以下几点: 父进程最大文件描述个数(父进程中需要close关闭accept返回的新文件描述符) 系统内创建进程个数(与内存大小相关) 进程创建过多是否降低整体服务性能(进程调度) 3. 使用多进程的方式, 解决服务器处理多连接的问题:     (1)共享 读时共享, 写时复制 文件描述符 内存映射区 -- mmap     (2)父进程 的角色是什么? 等待接受客户端连接 -- accept 有链接: 创建一

JAVA NIO non-blocking模式实现高并发服务器

JAVA NIO non-blocking模式实现高并发服务器 分类: JAVA NIO2014-04-14 11:12 1912人阅读 评论(0) 收藏 举报 目录(?)[+] Java自1.4以后,加入了新IO特性,NIO. 号称new IO. NIO带来了non-blocking特性. 这篇文章主要讲的是如何使用NIO的网络新特性,来构建高性能非阻塞并发服务器. 文章基于个人理解,我也来搞搞NIO.,求指正. 在NIO之前 服务器还是在使用阻塞式的java socket. 以Tomcat最

为一个支持GPRS的硬件设备搭建一台高并发服务器用什么开发比较容易?

高并发服务器开发,硬件socket发送数据至服务器,服务器对数据进行判断,需要实现心跳以保持长连接. 同时还要接收另外一台服务器的消支付成功消息,接收到消息后控制硬件执行操作. 查了一些资料,java的netty,go,或者是用C/C++不知道该用哪个,想问一下哪个比较适合,学习更容易一些. 为一个支持GPRS的硬件设备搭建一台高并发服务器用什么开发比较容易? >> golang 这个答案描述的挺清楚的:http://www.goodpm.net/postreply/golang/101000

linux僵死进程与并发服务器编程

序 僵死(zombie)进程简而言之就是:子进程退出时,父进程并未对其发出的SIGCHILD信号进行适当处理,导致子进程停留在僵死状态等待其父进程为其收尸,这个状态下的子进程就是僵死进程. 因为并发服务器常常fork很多子进程,子进程终结之后需要服务器进程去wait清理资源.对于某些进程,特别是服务器进程往往在请求到来时生成子进程处理请求.如果父进程不等待子进程结束,子进程将成为僵尸进程(zombie)从而占用系统资源.如果父进程等待子进程结束,将增加父进程的负担,影响服务器进程的并发性能. 查

linux学习之多高并发服务器篇(一)

高并发服务器 高并发服务器 并发服务器开发 1.多进程并发服务器 使用多进程并发服务器时要考虑以下几点: 父最大文件描述个数(父进程中需要close关闭accept返回的新文件描述符) 系统内创建进程个数(内存大小相关) 进程创建过多是否降低整体服务性能(进程调度) server /* server.c */ #include <stdio.h> #include <string.h> #include <netinet/in.h> #include <arpa/

linux学习之高并发服务器篇(二)

高并发服务器 1.线程池并发服务器 两种模型: 预先创建阻塞于accept多线程,使用互斥锁上锁保护accept(减少了每次创建线程的开销) 预先创建多线程,由主线程调用accept 线程池 3.多路I/O转接服务器 三种模型性能分析 select模型 select用来阻塞监听4,5,6,7是否有数据传入,若7这个文件描述符有数据到达,select返回就绪文件描述符个数,若检测到7有数据接收,accept接收客户链接请求,创建一个新的文件描述符. select (1)select能监听的文件描述

高并发服务器开发与配置

一.4大具有代表性的并发模型及其优缺点        4大具有代表性的并发模型:Apache模型(Process Per Connection,简称PPC),TPC(Thread PerConnection)模型,select模型和poll模型.Epoll模型.        Apache(PPC)模型和TPC模型是最容易理解的,Apache模型在并发上是通过多进程实现的,而TPC模型是通过多线程实现的,但是这2种方式在大量进程/线程切换时会造成大量的开销.        select模型是通过

2高并发服务器:多线程服务器

 1多进程并发服务器 在使用线程模型开发服务器时需要考虑以下问题: A 调整进程最大文件描述符上限 B 线程如有共享数据,考虑线程同步 C 服务于客户端线程退出时,退出处理 D 2.案例说明 server.c,代码如下: /* server.c */ #include <stdio.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <pthr

Linux下高并发网络编程

1.修改用户进程可打开文件数限制 在Linux平台上,无论编写客户端程序还是服务端程序,在进行高并发TCP连接处理时, 最高的并发数量都要受到系统对用户单一进程同时可打开文件数量的限制(这是因为系统 为每个TCP连接都要创建一个socket句柄,每个socket句柄同时也是一个文件句柄). 可使用ulimit命令查看系统允许当前用户进程打开的文件数限制: [[email protected] ~]$ ulimit -n 1024 这表示当前用户的每个进程最多允许同时打开1024个文件,这1024