【网络组件】事件和IO复用

本节研究事件和IO复用的实现,主要是Event和Epoller并给出C++实现;

线程模型

本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:

事件循环

(1)每一个IO线程一个EventLoop对象,并拥有它,其主要功能就是loop循环;

(2)Event对象始终负责一个fd的IO事件分发,但是它不拥有该fd,析构时也不会关闭这个fd,Event会根据时间类型的不同分发为不同的回调,如ReadCallback、WriteCallback等;回调对象为C++中的function表示;

(3)Epoller是IO复用(epoll机制)的实现,它是EventLoop对象的成员,只供EventLoop调用;

事件循环时序图如下:

事件操作

(1)_update主要包括两个操作,添加和更新事件;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_ADD,EPOLL_CTL_MOD;

(2)_remove主要删除事件操作;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_DEL;

事件操作时序图如下:

Event

Event声明

class EventLoop;

class Event final
{
public:
  Event(const Event&) = delete;
  Event& operator=(const Event&) = delete;

  Event(int fd, EventLoop *loop);

  void setReadCallback(const ReadCallback& cb)
  {
    _readCallback = cb;
  }

  void setWriteCallback(const WriteCallback& cb)
  {
    _writeCallback = cb;
  }

  void setErrorCallback(const ErrorCallback& cb)
  {
    _errorCallback = cb;
  }

  void setCloseCallback(const CloseCallback& cb)
  {
    _closeCallback = cb;
  }

  bool isWriting() const
  {
    return _events & _cWriteEvent;
  }

  bool isReading() const
  {
    return _events & _cReadEvent;
  }

  void disableWriting()
  {
    _events &= ~_cWriteEvent;
    _update();
  }

  void enableWriting()
  {
    _events |= _cWriteEvent;
    _update();
  }

  void disableAll()
  {
    _events = 0;
    _update();
  }

  void enableReading()
  {
    _events |= _cReadEvent;
    _update();
  }

  int fd() const
  {
    return _fd;
  }

  uint32_t events() const
  {
    return _events;
  }

  void setRevents(uint32_t revents)
  {
    _revents = revents;
  }

  void handleEvent();

  void remove();

private:
  void _update();

  uint32_t _events;
  uint32_t _revents;

  int _fd;
  EventLoop* _loop;

  ReadCallback _readCallback;
  WriteCallback _writeCallback;
  ErrorCallback _errorCallback;
  CloseCallback _closeCallback;

  static uint32_t _cReadEvent;
  static uint32_t _cWriteEvent;
};

说明几点

(1)_events为需要关心的事件类型,而_revents为epoll_wait返回后发生的事件类型,需要根据_revents进行相应的分发操作;

(2)具体的分发操作为  ReadCallback _readCallback、WriteCallback _writeCallback、ErrorCallback _errorCallback、CloseCallback _closeCallback;

Event实现

uint32_t Event::_cReadEvent = EPOLLIN;
uint32_t Event::_cWriteEvent = EPOLLOUT;

Event::Event(int efd, EventLoop *loop):
    _events(0),       //impotant
    _fd(efd),
    _loop(loop)
{

}

void Event::remove()
{
  _loop->removeEvent(this);
}

void Event::_update()
{
  _loop->updateEvent(this);
}

void Event::handleEvent()
{
  if ((_revents & EPOLLHUP) && !(_revents & EPOLLIN))
  //do not handle HUP about socket, we should handle before EPOLLIN,because if readCallback handclose ,the event will destructor, the _revents will random
      {
       LOG_TRACE << "_closeCallback, fd: " << fd();
        if (_closeCallback)
          _closeCallback();
      }

  if (_revents & (EPOLLIN | EPOLLRDHUP))
    {
      LOG_TRACE << "_readCallback, fd: " << fd();

      if (_readCallback)
        _readCallback(TimeStamp::now());
    }

  if (_revents & EPOLLOUT)
    {
     LOG_TRACE << "_writeCallback, fd: " << fd();
      if (_writeCallback)
        _writeCallback();
    }

  if (_revents & EPOLLERR)
    {
     LOG_TRACE << "_errorCallback, fd: " << fd();
      if (_errorCallback)
        _errorCallback();
    }
}

说明几点:

(1)handleEvent()就是根据_revents不同的事件类型进行相应不同的回调操作;

(2)_update()和remove即为相应的事件操作;

Epoller

Epoller声明

class Event;
class EventLoop;

class Epoller final
{
public:
  Epoller(const Epoller&) = delete;
  Epoller& operator=(const Epoller&) = delete;

  Epoller(EventLoop* loop);

  ~Epoller();

  void removeEvent(Event* event);

  void updateEvent(Event* event);

  void pollAndHandleEvent(int seconds);

private:
  int _epollfd;
  EventLoop* _loop;

  std::vector<struct epoll_event>  _activeEvents;
  std::map<int, Event*> _eventMap;

  static const int cMaxEventNumber = 1000;
};

}

说明几点:

(1)pollAndHandleEvent主要为调用epoll_wait和事件处理函数;

(2)removeEvent,updateEvent()分别为删除事件和更新事件;

(3)_eventMap为建立fd和Event建立的map,红黑树高效查找;_activeEvents为内核epoll_wait填写的活动事件列表;

Epoller实现

Epoller::Epoller(EventLoop* loop) :
    _epollfd(epoll_create(100)),
    _loop(loop)
{
  assert(_epollfd >= 0);
  _activeEvents.resize(cMaxEventNumber);

  LOG_TRACE << "epollfd fd: " << _epollfd;
}

Epoller::~Epoller()
{
  ::close(_epollfd);
}

void Epoller::updateEvent(Event* event)
{
  struct epoll_event epollEvent;
  epollEvent.data.fd = event->fd();
  epollEvent.events = event->events();

  int err;
  if (_eventMap.find(event->fd()) == _eventMap.end())
    {
      //LOG_TRACE << "fd: [" <<  event->fd() << "] add to epoll";
      err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, event->fd(), &epollEvent);
      _eventMap[event->fd()] = event;
    }
  else
    {
      //LOG_TRACE << "fd: [" <<  event->fd() << "] update from epoll";
      err = epoll_ctl(_epollfd, EPOLL_CTL_MOD, event->fd(), &epollEvent);
    }

  if (err != 0)
    {
      LOG_SYSERR << "epoll_ctl error";
    }
}

void Epoller::removeEvent(Event* event)
{
  if (epoll_ctl(_epollfd, EPOLL_CTL_DEL, event->fd(), NULL) != 0)
    {
      LOG_SYSERR << "epoll_ctl error";
    }

  assert(_eventMap.find(event->fd()) != _eventMap.end());
  if (_eventMap.find(event->fd()) != _eventMap.end())
    {
      _eventMap.erase(event->fd());
    }
}

void Epoller::pollAndHandleEvent(int seconds)
{
  int num = ::epoll_wait(_epollfd, &(*_activeEvents.begin()), _activeEvents.size(), seconds * 1000);

  if (num == static_cast<int>(_activeEvents.size())) {
    _activeEvents.resize(2 * num);
  }

  for (int i = 0; i < num; ++i)
    {
      int fd = _activeEvents[i].data.fd;
      Event* event = _eventMap[fd];
      event->setRevents(_activeEvents[i].events);
      event->handleEvent();
    }
}

EventLoop相关

EventLoop和本节相关的函数,完整介绍请见后面的博客;

更新和删除事件

void EventLoop::updateEvent(Event* event)
{
  assertInThreadLoop();
  _epoller->updateEvent(event);
}

void EventLoop::removeEvent(Event* event)  //only invoke by _handclose
{
  assertInThreadLoop();
  _epoller->removeEvent(event);
}

loop中调用_epoller的pollAndHandleEvent

void EventLoop::loop()    //only invoke by loop thread
{
  assertInThreadLoop();

  assert(!_loop);
  _loop = true;
  while (_loop)
    {
      int seconds = _timerQueue->minUpdateSeconds();
      if (seconds == 0) {
        seconds = loopSeconds;
      }

      _epoller->pollAndHandleEvent(seconds);

      _doPendingFunctors();
    }
}

时间: 2024-10-31 13:37:40

【网络组件】事件和IO复用的相关文章

【Unix网络编程】chapter6 IO复用:select和poll函数

chapter6 6.1 概述 I/O复用典型使用在下列网络应用场合. (1):当客户处理多个描述符时,必须使用IO复用 (2):一个客户同时处理多个套接字是可能的,不过不叫少见. (3):如果一个TCP服务器既要处理监听套接字,又要处理已连接套接字. (4):如果一个服务器既要处理TCP,又要处理UDP (5):如果一个服务器要处理多个服务或多个协议 IO复用并非只限于网络,许多重要的应用程序也需要使用这项技术. 6.2 I/O模型 在Unix下可用的5种I/O模型的基本区别: (1)阻塞式I

Libevent的IO复用技术和定时事件原理

Libevent 是一个用C语言编写的.轻量级的开源高性能网络库,主要有以下几个亮点:事件驱动( event-driven),高性能;轻量级,专注于网络,不如 ACE 那么臃肿庞大:源代码相当精炼.易读:跨平台,支持 Windows. Linux. *BSD 和 Mac Os:支持多种 I/O 多路复用技术, epoll. poll. dev/poll. select 和 kqueue 等:支持 I/O,定时器和信号等事件:注册事件优先级. 1 Libevent中的epoll Libevent重

Linux网络编程-IO复用技术

IO复用是Linux中的IO模型之一,IO复用就是进程预先告诉内核需要监视的IO条件,使得内核一旦发现进程指定的一个或多个IO条件就绪,就通过进程进程处理,从而不会在单个IO上阻塞了.Linux中,提供了select.poll.epoll三种接口函数来实现IO复用. 1.select函数 #include <sys/select.h> #include <sys/time.h> int select(int nfds, fd_set *readfds, fd_set *writef

Unix网络编程之IO复用

上篇存在的问题 在上一篇TCP套接字中,还存在着一些问题. 当客户端连接上服务器后,阻塞于从标准输入读入信息的状态,若此时服务器进程被杀死,即使给客户TCP发来一个 FIN结束分节,但是由于客户处于阻塞状态,它将看不到这个EOF,直到读取之后,此时可能已经过去了很长时间. 因此进程需要一种能力,让内核同时检测多个IO口是否就绪,这个能力就称为IO复用.这是由select和poll两个函数 支持的. select函数 作用 允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或

【网络组件】事件循环

本节研究事件循环EventLoop以及EventLoopPool,并给出C++实现: 线程模型 本网络组件的线程模型是一个master线程和多个IO线程模型:master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程:每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是说TCP连接的fd也只能由这个IO线程去读写:其中EventL

LINUX网络编程 IO 复用

参考<linux高性能服务器编程> LINUX下处理多个连接时候,仅仅使用多线程和原始socket函数,效率十分低下 于是就出现了selelct poll  epoll等IO复用函数. 这里讨论性能最优的epoll IO复用 用户将需要关注的socket连接使用IO复用函数放进一个事件表中,每当事件表中有一个或者多个SOCKET连接出现读写请求时候,则进行处理 事件表使用一个额外的文件描述符来标识.文件描述符使用 epoll_create函数创建 #inlclude <sys/epoll

TCP/IP 网络编程 (抄书笔记 5) -- select 和 IO 复用

TCP/IP 网络编程 (抄书笔记 5) – select 和 IO 复用 TCP/IP 网络编程 (抄书笔记 5) – select 和 IO 复用 利用 fork() 生成子进程 可以达到 服务器端可以同时响应多个 客户端的请求, 但是这样做有缺点: 需要大量的运算和内存空间, 每个进程都要有独立的内存空间, 数据交换也很麻烦 (IPC, 如管道) IO 复用: 以太网的总线结构也是采用了 复用技术, 如果不采用, 那么两两之间就要直接通信 网络知识 int server_sock; int

&lt;网络编程&gt;IO复用

IO复用是一种机制,一个进程可以监听多个描述符,一旦某个描述符就绪(读就绪和写就绪),能够同志程序进行相应的读写操作. 目前支持I/O复用的系统调用有select,poll,pselect,epoll,本质上这些I/O复用技术是同步I/O技术.在读写事件就绪后需要进程自己负责进行读写,即读写过程是进程阻塞的. 与多进程和多线程相比,I/O复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销. 同步I/O操作导致请求进程阻塞,直到I/O操作完成

IO复用之——poll

一. 关于poll 对于IO复用模型,其优点无疑是免去了对一个个IO事件就绪的等待,转而代之的是同时对多个IO数据的检测,当检测等待的事件中至少有一个就绪的时候,就会返回告诉用户进程"已经有数据准备好了,快看看是哪个赶紧处理",而对于IO复用的实现,除了可以用select函数,另外一个函数仍然支持这种复用IO模型,就是poll函数: 二. poll函数的用法 虽然同样是对多个IO事件进行检测等待,但poll和select多少还是有些不同的: 函数参数中, 先来说nfds,这个是和sel