epoll中et+多线程模式中很重要的EPOLL_ONESHOT实验

因为et模式需要循环读取,但是在读取过程中,如果有新的事件到达,很可能触发了其他线程来处理这个socket,那就乱了。

EPOLL_ONESHOT就是用来避免这种情况。注意在一个线程处理完一个socket的数据,也就是触发EAGAIN errno时候,就应该重置EPOLL_ONESHOT的flag,这时候,新到的事件,就可以重新进入触发流程了。

服务器代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <assert.h>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024
struct fds {
        int epollfd;
        int sockfd;
};

int setnonblocking(int fd) {
        int old_option = fcntl(fd, F_GETFL);
        int new_option = old_option | O_NONBLOCK;
        fcntl(fd, F_SETFL, new_option);
        return old_option;
}

void addfd(int epollfd, int fd, bool oneshot) {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET;
        if (oneshot) {
                event.events |= EPOLLONESHOT;
        }
        epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
        setnonblocking(fd);
}

void reset_oneshot(int epollfd, int fd) {
        epoll_event event;
        event.data.fd = fd;
        event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
        epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

void* worker(void *arg) {
        int sockfd = ((fds*)arg)->sockfd;
        int epollfd = ((fds*)arg)->epollfd;
        pthread_t pid = pthread_self();

        printf("start new thread %u to recv data on fd: %d\n", pid, sockfd);
        char buf[BUFFER_SIZE];
        memset(buf, ‘\0‘, BUFFER_SIZE);

        while(1) {
                int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
                if (ret == 0) {
                        close(sockfd);
                        printf("foreiner closed the connection\n");
                        break;
                }
                else if (ret < 0) {
                        if (errno == EAGAIN) {
                                reset_oneshot(epollfd, sockfd);
                                printf("EAGAIN read later\n");
                                break;
                        }
                }
                else {
                        buf[ret] = ‘\0‘;
                        printf("thread %u get content: %s\n", pid, buf);
                        printf("thread %u about to sleep\n", pid);
                        sleep(5);
                        printf("thread %u back from sleep\n", pid);
                }
        }
        //printf("end thread %u receiving data on fd: %d\n", pid, sockfd);

}

int main(int argc, char *argv[]) {
        if (argc <= 1) {
                printf("usage: %s port_number [ip_address]\n", basename(argv[0]));
                return 1;
        }
        int port = atoi(argv[1]);

        int ret = 0;
        sockaddr_in address;
        bzero(&address, sizeof(address));
        address.sin_family = AF_INET;
        if (argc >= 3) {
                const char *ip =argv[2];
                inet_pton(AF_INET, ip, &address.sin_addr);
        }
        else {
                address.sin_addr.s_addr = INADDR_ANY;
        }
        address.sin_port = htons(port);
        int listenfd = socket(PF_INET, SOCK_STREAM, 0);
        assert(listenfd >= 0);

        ret = bind(listenfd, (sockaddr*)&address, sizeof(address));
        assert(ret != -1);

        ret = listen(listenfd, 5);
        assert(ret != -1);

        epoll_event events[MAX_EVENT_NUMBER];
        int epollfd = epoll_create(5);
        assert(epollfd != -1);

        addfd(epollfd, listenfd, false);

        while(1) {
                int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
                if (ret < 0) {
                        printf("epoll failure\n");
                        break;
                }

                for (int i=0; i<ret; i++) {
                        int sockfd = events[i].data.fd;
                        if (sockfd == listenfd) {
                                sockaddr_in client_address;
                                socklen_t client_addrlength = sizeof(client_address);
                                int connfd = accept(listenfd, (sockaddr*)&client_address,
                                                        &client_addrlength);

                                addfd(epollfd, connfd, true);
                                printf("new connection is added to epollfd\n");
                        }
                        else if (events[i].events & EPOLLIN) {
                                pthread_t thread;
                                fds fds_for_new_worker;
                                fds_for_new_worker.epollfd = epollfd;
                                fds_for_new_worker.sockfd = sockfd;
                                // new thread
                                pthread_create(&thread, NULL, worker,
                                                (void*)&fds_for_new_worker);
                        }
                        else {
                                printf("something else happened\n");
                        }
                }
        }

        close(listenfd);
        return 0;

}

以下是使用telnet客户端发送的文本,匀速敲入代码:

$telnet 127.0.0.1 12346
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is ‘^]‘.
hi1
hi2
hi3
hi4
hi5
hi6
hi7
hi8
Connection closed by foreign host.

以下是服务器的运行和输出:

$./epoll_oneshot 12346
new connection is added to epollfd
start new thread 1734051584 to recv data on fd: 5
thread 1734051584 get content: hi1

thread 1734051584 about to sleep
thread 1734051584 back from sleep
thread 1734051584 get content: hi2
hi3

thread 1734051584 about to sleep
thread 1734051584 back from sleep
thread 1734051584 get content: hi4
hi5
hi6

thread 1734051584 about to sleep
thread 1734051584 back from sleep
thread 1734051584 get content: hi7

thread 1734051584 about to sleep
thread 1734051584 back from sleep
EAGAIN read later
start new thread 1723561728 to recv data on fd: 5
thread 1723561728 get content: hi8

thread 1723561728 about to sleep
thread 1723561728 back from sleep
EAGAIN read later
^C

最后用Ctrl+C来结束服务器。

可以看出,在hi7文本和hi8文本之间,服务器收到了EAGAIN,表示读取告一段落。而之后的线程id页换成了线程id。在hi7之前,因为每次服务器sleep结束之后,都还有没有读完的数据,所以线程id始终没有变,始终是同一个线程处理数据。

时间: 2025-01-15 04:40:08

epoll中et+多线程模式中很重要的EPOLL_ONESHOT实验的相关文章

(原创)拨开迷雾见月明-剖析asio中的proactor模式(二)

在上一篇博文中我们提到异步请求是从上层开始,一层一层转发到最下面的服务层的对象win_iocp_socket_service,由它将请求转发到操作系统(调用windows api),操作系统处理完异步请求之后又是如何返回给应用程序的呢,这里是通过iocp(完成端口)来实现的.让我们先来简要的看看iocp的基本步骤: 创建IOCP对象: 创建io object对象: 将io object IOCP对象绑定: 4.进行异步调用: 创建线程或者由线程池等待完成事件的到来: asio实际上也是按照这个步

Java多线程编程中Future模式的详解&lt;转&gt;

Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker模式.Guarded Suspeionsion模式.不变模式和生产者-消费者模式等.这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:关于其他多线程设计模式的地址如下:关于Master-Worker模式的详解: Java多线程编程中Master-Worker模式的详解关于Guarded Suspeionsion模式的详解: Java多线程编程中Guarded Suspeionsion模式

细说.NET 中的多线程 (一 概念)

为什么使用多线程 使用户界面能够随时相应用户输入 当某个应用程序在进行大量运算时候,为了保证应用程序能够随时相应客户的输入,这个时候我们往往需要让大量运算和相应用户输入这两个行为在不同的线程中进行. 效率原因 应用程序经常需要等待一些资源,如等待网络资源,等待io资源,等待用户输入等等.这种情况下使用多线程可以避免CPU长时间处于闲置状态. 用户态,内核态 线程内的资源有两种运行态,即用户态和内核态.某些运算可以在堆栈上进行,这种情况线程是在用户态运行的,某些需要高权限运行的指令,或者某些优先级

Java中的多线程你只要看这一篇就够了

Java中的多线程你只要看这一篇就够了 引 如果对什么是线程.什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内. 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现.说这个话其实只有一半对,因为反应"多角色"的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的"生产者,消费者模型". 很多人都对其中的一些概念不够明确,如同步.并发等等,让我

C#中的多线程 - 同步基础

原文:http://www.albahari.com/threading/part2.aspx 1同步概要 在第 1 部分:基础知识中,我们描述了如何在线程上启动任务.配置线程以及双向传递数据.同时也说明了局部变量对于线程来说是私有的,以及引用是如何在线程之间共享,允许其通过公共字段进行通信. 下一步是同步(synchronization):为期望的结果协调线程的行为.当多个线程访问同一个数据时,同步尤其重要,但是这是一件非常容易搞砸的事情. 同步构造可以分为以下四类: 简单的阻塞方法 这些方法

C#中的多线程 - 基础知识

原文:http://www.albahari.com/threading/ 1简介及概念 C# 支持通过多线程并行执行代码,线程有其独立的执行路径,能够与其它线程同时执行. 一个 C# 客户端程序(Console 命令行.WPF 以及 Windows Forms)开始于一个单线程,这个线程(也称为“主线程”)是由 CLR 和操作系统自动创建的,并且也可以再创建其它线程.以下是一个简单的使用多线程的例子: 所有示例都假定已经引用了以下命名空间: using System; using System

C#中的多线程-入门

概述与概念C#支持通过多线程并行地执行代码,一个线程有它独立的执行路径,能够与其它的线程同时地运行.一个C#程序开始于一个单线程,这个单线程是被CLR和操作系统(也称为“主线程”)自动创建的,并具有多线程创建额外的线程.这里的一个简单的例子及其输出: 除非被指定,否则所有的例子都假定以下命名空间被引用了: using System; using System.Threading; class ThreadTest {static void Main() {Thread t = new Threa

android中的多线程机制

Google参考了Windows的消息处理机制,在Android系统中实现了一套类似的消息处理机制.学习Android的消息处理机制,有几个概念(类)必须了解: 1.       Message 消息,理解为线程间通讯的数据单元.例如后台线程在处理数据完毕后需要更新UI,则可发送一条包含更新信息的Message给UI线程. 2.       Message Queue 消息队列,用来存放通过Handler发布的消息,按照先进先出执行. 3.       Handler Handler是Messa

NET 中的多线程

NET 中的多线程 为什么使用多线程 使用户界面能够随时相应用户输入 当某个应用程序在进行大量运算时候,为了保证应用程序能够随时相应客户的输入,这个时候我们往往需要让大量运算和相应用户输入这两个行为在不同的线程中进行. 效率原因 应用程序经常需要等待一些资源,如等待网络资源,等待io资源,等待用户输入等等.这种情况下使用多线程可以避免CPU长时间处于闲置状态. 用户态,内核态 线程内的资源有两种运行态,即用户态和内核态.某些运算可以在堆栈上进行,这种情况线程是在用户态运行的,某些需要高权限运行的