LINUX网络编程 IO 复用

参考《linux高性能服务器编程》

LINUX下处理多个连接时候,仅仅使用多线程和原始socket函数,效率十分低下

于是就出现了selelct poll  epoll等IO复用函数。

这里讨论性能最优的epoll IO复用

用户将需要关注的socket连接使用IO复用函数放进一个事件表中,每当事件表中有一个或者多个SOCKET连接出现读写请求时候,则进行处理

事件表使用一个额外的文件描述符来标识。文件描述符使用 epoll_create函数创建

#inlclude <sys/epoll.h>

int epoll_create(int size); size参数目前不起作用

操作事件表使用epoll_ctl函数进行控制

操作类型有以下3中

EPOLL_CTL_ADD  EPOLL_CTL_MOD  EPOLL_CTL_DEL

在一段超时时间内等待事件表上的事件 使用epoll_wait();

函数范例如下:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd, bool enable_et )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if( enable_et )
    {
        event.events |= EPOLLET;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

void et( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "event trigger once\n" );
            while( 1 )
            {
                memset( buf, ‘\0‘, BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later\n" );
                        break;
                    }
                    close( sockfd );
                    break;
                }
                else if( ret == 0 )
                {
                    close( sockfd );
                }
                else
                {
                    printf( "get %d bytes of content: %s\n", ret, buf );
                }
            }
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

int main( int argc, char* argv[] )
{
    if( argc <= 2 )
    {
        printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi( argv[2] );

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    inet_pton( AF_INET, ip, &address.sin_addr );
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }

        et( events, ret, epollfd, listenfd );
    }

    close( listenfd );
    return 0;
}

  

在尝试一个多线程代码;

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

struct fds
{
   int epollfd;
   int sockfd;
};

void* worker( void* arg )
{
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, ‘\0‘, BUFFER_SIZE );
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 )
        {
            if( errno == EAGAIN )
            {
               // reset_oneshot( epollfd, sockfd );
                printf( "read later\n" );
                break;
            }
        }
        else
        {
            printf( "get content: %s\n", buf );
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
}

int setnonblocking(int fd)
{
	int old_option = fcntl(fd,F_GETFL);
	int new_option = old_option | O_NONBLOCK;
	fcntl(fd,F_SETFL,new_option);
	return old_option;
}

void addfd(int epollfd,int fd,bool enable_et)
{
	epoll_event event;
	event.data.fd = fd;
	event.events = EPOLLIN;

	if(enable_et){
		event.events |= EPOLLET;
	}
	epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
	setnonblocking(fd);
}

void et(epoll_event* events,int number,int epollfd,int listenfd){
	char buf[BUFFER_SIZE];
	for(int i = 0; i < number;i++){
		int sockfd = events[i].data.fd;
		if(sockfd == listenfd){
			struct sockaddr_in client_address;
			socklen_t client_addrlength = sizeof(client_address);
			int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
			addfd(epollfd,connfd,true);
		}else if(events[i].events & EPOLLIN){
			/*
			printf("event trigger once\n");
			while(1){
				memset(buf,‘\0‘,BUFFER_SIZE);
				int ret = recv(sockfd,buf,BUFFER_SIZE-1,0);
				if(ret < 0){
					if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
						printf("read later\n");
						break;
					}
					close(sockfd);
					break;
				}else if(ret == 0){
					close(sockfd);
				}else{
					printf("get %d bytes of content: %s \n",ret,buf);
				}
			}*/
			 pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
		}else{
			printf("something else happened \n");
		}
	}
}

int main()
{
	int ret = 0;
	struct sockaddr_in address;
	bzero(&address,sizeof(address));
	address.sin_family = AF_INET;
	inet_pton(AF_INET,"127.0.0.1",&address.sin_addr);
	address.sin_port = htons(1134);

	int listenfd = socket(AF_INET,SOCK_STREAM,0);
	assert(listenfd >= 0);

	ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address));
	assert(ret != -1);

	ret = listen(listenfd,5);
	assert(ret != -1);

	epoll_event events[MAX_EVENT_NUMBER];
	int epollfd = epoll_create(5);
	assert(epollfd != -1);
	addfd(epollfd,listenfd,true);

	while(1){
		int ret = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
		if(ret < 0){
			printf("epoll failure\n");
			break;
		}
		et(events,ret,epollfd,listenfd);
	}

	close(listenfd);
	return 0;
}

  

多线程版本中 如果我们 连续多次输入会出现一个问题

如果我们连续多次在telnet的客户端输入

服务端会开启多个线程 接受这个socket的输入

显示如下:

fd: 5, sockfdget content: dfdsf
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd
...
start new thread to receive data on fd: 5
fd: 5, sockfdget content: sd

如果多个线程同时操作一个socket 可能出现各种意外情况

那么我们就需要设置socket的EPOLLONESHOT标志

对于连续操作,操作系统最多触发该socket上一次读写异常事件。

代码如下:

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10

struct fds
{
   int epollfd;
   int sockfd;
};

void reset_oneshot( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}

void* worker( void* arg )
{
    int sockfd = ( (fds*)arg )->sockfd;
    int epollfd = ( (fds*)arg )->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );
    char buf[ BUFFER_SIZE ];
    memset( buf, ‘\0‘, BUFFER_SIZE );
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 )
        {
            if( errno == EAGAIN )
            {
                reset_oneshot( epollfd, sockfd );
                printf( "read later\n" );
                break;
            }
        }
        else
        {
            printf( "get content: %s\n", buf );
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );
}

int setnonblocking(int fd)
{
	int old_option = fcntl(fd,F_GETFL);
	int new_option = old_option | O_NONBLOCK;
	fcntl(fd,F_SETFL,new_option);
	return old_option;
}

void addfd(int epollfd,int fd,bool setOneShot)
{
	epoll_event event;
	event.data.fd = fd;
	event.events = EPOLLIN;

	event.events |= EPOLLET;
	if(setOneShot)
		event.events |= EPOLLONESHOT;
	epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
	setnonblocking(fd);
}

void et(epoll_event* events,int number,int epollfd,int listenfd){
	char buf[BUFFER_SIZE];
	for(int i = 0; i < number;i++){
		int sockfd = events[i].data.fd;
		if(sockfd == listenfd){
			struct sockaddr_in client_address;
			socklen_t client_addrlength = sizeof(client_address);
			int connfd = accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
			addfd(epollfd,connfd,true);
		}else if(events[i].events & EPOLLIN){
			/*
			printf("event trigger once\n");
			while(1){
				memset(buf,‘\0‘,BUFFER_SIZE);
				int ret = recv(sockfd,buf,BUFFER_SIZE-1,0);
				if(ret < 0){
					if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
						printf("read later\n");
						break;
					}
					close(sockfd);
					break;
				}else if(ret == 0){
					close(sockfd);
				}else{
					printf("get %d bytes of content: %s \n",ret,buf);
				}
			}*/
			 pthread_t thread;
                fds fds_for_new_worker;
                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
		}else{
			printf("something else happened \n");
		}
	}
}

int main()
{
	int ret = 0;
	struct sockaddr_in address;
	bzero(&address,sizeof(address));
	address.sin_family = AF_INET;
	inet_pton(AF_INET,"127.0.0.1",&address.sin_addr);
	address.sin_port = htons(1134);

	int listenfd = socket(AF_INET,SOCK_STREAM,0);
	assert(listenfd >= 0);

	ret = bind(listenfd,(struct sockaddr*)&address,sizeof(address));
	assert(ret != -1);

	ret = listen(listenfd,5);
	assert(ret != -1);

	epoll_event events[MAX_EVENT_NUMBER];
	int epollfd = epoll_create(5);
	assert(epollfd != -1);
	addfd(epollfd,listenfd,false);

	while(1){
		int ret = epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
		if(ret < 0){
			printf("epoll failure\n");
			break;
		}
		et(events,ret,epollfd,listenfd);
	}

	close(listenfd);
	return 0;
}

  

输入

Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is ‘^]‘.
1
2
3
4
5
sdfufghbskdjhkbaskjhbkjlsadvfblkajwsvdfbljkashdbkfjhasdfhasjkhD

输出

start new thread to receive data on fd: 5
get content: 1

get content: 2
3
4

get content: 5
sdfufg
get content: hbskdjhkb
get content: askjhbkjl
get content: sadvfblka
get content: jwsvdfblj
get content: kashdbkfj
get content: hasdfhasj
get content: khD
hasj
read later
end thread receiving data on fd: 5

时间: 2024-12-14 19:35:26

LINUX网络编程 IO 复用的相关文章

Linux网络编程-IO复用技术

IO复用是Linux中的IO模型之一,IO复用就是进程预先告诉内核需要监视的IO条件,使得内核一旦发现进程指定的一个或多个IO条件就绪,就通过进程进程处理,从而不会在单个IO上阻塞了.Linux中,提供了select.poll.epoll三种接口函数来实现IO复用. 1.select函数 #include <sys/select.h> #include <sys/time.h> int select(int nfds, fd_set *readfds, fd_set *writef

Linux网络编程——端口复用(多个套接字绑定同一个端口)

在<绑定( bind )端口需要注意的问题>提到:一个网络应用程序只能绑定一个端口( 一个套接字只能绑定一个端口 ). 实际上,默认的情况下,如果一个网络应用程序的一个套接字 绑定了一个端口( 占用了 8000 ),这时候,别的套接字就无法使用这个端口( 8000 ), 验证例子如下: [objc] view plaincopy #include <stdio.h> #include <stdlib.h> #include <string.h> #inclu

&lt;网络编程&gt;IO复用

IO复用是一种机制,一个进程可以监听多个描述符,一旦某个描述符就绪(读就绪和写就绪),能够同志程序进行相应的读写操作. 目前支持I/O复用的系统调用有select,poll,pselect,epoll,本质上这些I/O复用技术是同步I/O技术.在读写事件就绪后需要进程自己负责进行读写,即读写过程是进程阻塞的. 与多进程和多线程相比,I/O复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销. 同步I/O操作导致请求进程阻塞,直到I/O操作完成

Linux网络编程--IO函数以及示例

网络数据能够正确到达用户并被用户接收是进行网络数据传输的基本目的, 网络数据的接受和发送有很多种方案,例如:直接发送和接收,通过向量发送和接收,使用消息发送和接收等.本篇文章主要介绍常用的IO函数以及用法,如:最常用的read()/write()函数,和其他标准的套接字专用函数recv()/send(),readv()/writev(),recvmsg()/sendmsg(). 各个函数原型以及介绍如下: ssize_t read(int fd, void *buf, size_t count)

嵌入式 Linux网络编程(二)——TCP编程模型

嵌入式 Linux网络编程(二)--TCP编程模型 一.TCP编程模型 TCP编程的一般模型如下图: TCP编程模型分为客户端和服务器端编程,两者编程流程如下: TCP服务器端编程流程: A.创建套接字: B.绑定套接字: C.设置套接字为监听模式,进入被动接受连接状态: D.接受请求,建立连接: E.读写数据: F.终止连接. TCP客户端编程流程: A.创建套接字: B.与远程服务器建立连接: C.读写数据: D.终止连接. 二.TCP迭代服务器编程模型 TCP循环服务器接受一个客户端的连接

linux网络编程基础--(转自网络)

转自 http://www.cnblogs.com/MyLove-Summer/p/5215287.html Linux下的网络编程指的是socket套接字编程,入门比较简单. 1. socket套接字介绍 socket机制其实就是包括socket, bind, listen, connect, accept等函数的方法,其通过指定的函数实现不同的协议(IP4,IP6等)的数据在不同层之间的传输和获取等处理.其实个人理解socket就是处于应用层和TCP/IP协议之间的一个中间层,具体的数据分析

Linux网络编程——多路复用之epoll

目录 Linux网络编程--多路复用之epoll 基础API 实例一.epoll实现在线聊天 实例二.epoll实现在客户端断开后服务端能一直运行,客户端可以多次重连 Linux网络编程--多路复用之epoll ? epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,

Linux网络编程——tcp并发服务器(poll实现)

想详细彻底地了解poll或看懂下面的代码请参考<Linux网络编程--I/O复用之poll函数> 代码: #include <string.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/select.h> #include <sys/time.h> #include <sys/socket.h> #incl

Linux网络编程 五种I/O 模式及select、epoll方法的理解

Linux网络编程 五种I/O 模式及select.epoll方法的理解 web优化必须了解的原理之I/o的五种模型和web的三种工作模式 五种I/O 模式--阻塞(默认IO模式),非阻塞(常用语管道),I/O多路复用(IO多路复用的应用场景),信号I/O,异步I/OLinux网络编程 五种I/O 模式及select.epoll方法的理解