本节研究事件和IO复用的实现,主要是Event和Epoller并给出C++实现;
线程模型
本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:
事件循环
(1)每一个IO线程一个EventLoop对象,并拥有它,其主要功能就是loop循环;
(2)Event对象始终负责一个fd的IO事件分发,但是它不拥有该fd,析构时也不会关闭这个fd,Event会根据时间类型的不同分发为不同的回调,如ReadCallback、WriteCallback等;回调对象为C++中的function表示;
(3)Epoller是IO复用(epoll机制)的实现,它是EventLoop对象的成员,只供EventLoop调用;
事件循环时序图如下:
事件操作
(1)_update主要包括两个操作,添加和更新事件;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_ADD,EPOLL_CTL_MOD;
(2)_remove主要删除事件操作;最终有epoll机制的epoll_ctl来操作,对应的类型EPOLL_CTL_DEL;
事件操作时序图如下:
Event
Event声明
class EventLoop; class Event final { public: Event(const Event&) = delete; Event& operator=(const Event&) = delete; Event(int fd, EventLoop *loop); void setReadCallback(const ReadCallback& cb) { _readCallback = cb; } void setWriteCallback(const WriteCallback& cb) { _writeCallback = cb; } void setErrorCallback(const ErrorCallback& cb) { _errorCallback = cb; } void setCloseCallback(const CloseCallback& cb) { _closeCallback = cb; } bool isWriting() const { return _events & _cWriteEvent; } bool isReading() const { return _events & _cReadEvent; } void disableWriting() { _events &= ~_cWriteEvent; _update(); } void enableWriting() { _events |= _cWriteEvent; _update(); } void disableAll() { _events = 0; _update(); } void enableReading() { _events |= _cReadEvent; _update(); } int fd() const { return _fd; } uint32_t events() const { return _events; } void setRevents(uint32_t revents) { _revents = revents; } void handleEvent(); void remove(); private: void _update(); uint32_t _events; uint32_t _revents; int _fd; EventLoop* _loop; ReadCallback _readCallback; WriteCallback _writeCallback; ErrorCallback _errorCallback; CloseCallback _closeCallback; static uint32_t _cReadEvent; static uint32_t _cWriteEvent; };
说明几点
(1)_events为需要关心的事件类型,而_revents为epoll_wait返回后发生的事件类型,需要根据_revents进行相应的分发操作;
(2)具体的分发操作为 ReadCallback _readCallback、WriteCallback _writeCallback、ErrorCallback _errorCallback、CloseCallback _closeCallback;
Event实现
uint32_t Event::_cReadEvent = EPOLLIN; uint32_t Event::_cWriteEvent = EPOLLOUT; Event::Event(int efd, EventLoop *loop): _events(0), //impotant _fd(efd), _loop(loop) { } void Event::remove() { _loop->removeEvent(this); } void Event::_update() { _loop->updateEvent(this); } void Event::handleEvent() { if ((_revents & EPOLLHUP) && !(_revents & EPOLLIN)) //do not handle HUP about socket, we should handle before EPOLLIN,because if readCallback handclose ,the event will destructor, the _revents will random { LOG_TRACE << "_closeCallback, fd: " << fd(); if (_closeCallback) _closeCallback(); } if (_revents & (EPOLLIN | EPOLLRDHUP)) { LOG_TRACE << "_readCallback, fd: " << fd(); if (_readCallback) _readCallback(TimeStamp::now()); } if (_revents & EPOLLOUT) { LOG_TRACE << "_writeCallback, fd: " << fd(); if (_writeCallback) _writeCallback(); } if (_revents & EPOLLERR) { LOG_TRACE << "_errorCallback, fd: " << fd(); if (_errorCallback) _errorCallback(); } }
说明几点:
(1)handleEvent()就是根据_revents不同的事件类型进行相应不同的回调操作;
(2)_update()和remove即为相应的事件操作;
Epoller
Epoller声明
class Event; class EventLoop; class Epoller final { public: Epoller(const Epoller&) = delete; Epoller& operator=(const Epoller&) = delete; Epoller(EventLoop* loop); ~Epoller(); void removeEvent(Event* event); void updateEvent(Event* event); void pollAndHandleEvent(int seconds); private: int _epollfd; EventLoop* _loop; std::vector<struct epoll_event> _activeEvents; std::map<int, Event*> _eventMap; static const int cMaxEventNumber = 1000; }; }
说明几点:
(1)pollAndHandleEvent主要为调用epoll_wait和事件处理函数;
(2)removeEvent,updateEvent()分别为删除事件和更新事件;
(3)_eventMap为建立fd和Event建立的map,红黑树高效查找;_activeEvents为内核epoll_wait填写的活动事件列表;
Epoller实现
Epoller::Epoller(EventLoop* loop) : _epollfd(epoll_create(100)), _loop(loop) { assert(_epollfd >= 0); _activeEvents.resize(cMaxEventNumber); LOG_TRACE << "epollfd fd: " << _epollfd; } Epoller::~Epoller() { ::close(_epollfd); } void Epoller::updateEvent(Event* event) { struct epoll_event epollEvent; epollEvent.data.fd = event->fd(); epollEvent.events = event->events(); int err; if (_eventMap.find(event->fd()) == _eventMap.end()) { //LOG_TRACE << "fd: [" << event->fd() << "] add to epoll"; err = epoll_ctl(_epollfd, EPOLL_CTL_ADD, event->fd(), &epollEvent); _eventMap[event->fd()] = event; } else { //LOG_TRACE << "fd: [" << event->fd() << "] update from epoll"; err = epoll_ctl(_epollfd, EPOLL_CTL_MOD, event->fd(), &epollEvent); } if (err != 0) { LOG_SYSERR << "epoll_ctl error"; } } void Epoller::removeEvent(Event* event) { if (epoll_ctl(_epollfd, EPOLL_CTL_DEL, event->fd(), NULL) != 0) { LOG_SYSERR << "epoll_ctl error"; } assert(_eventMap.find(event->fd()) != _eventMap.end()); if (_eventMap.find(event->fd()) != _eventMap.end()) { _eventMap.erase(event->fd()); } } void Epoller::pollAndHandleEvent(int seconds) { int num = ::epoll_wait(_epollfd, &(*_activeEvents.begin()), _activeEvents.size(), seconds * 1000); if (num == static_cast<int>(_activeEvents.size())) { _activeEvents.resize(2 * num); } for (int i = 0; i < num; ++i) { int fd = _activeEvents[i].data.fd; Event* event = _eventMap[fd]; event->setRevents(_activeEvents[i].events); event->handleEvent(); } }
EventLoop相关
EventLoop和本节相关的函数,完整介绍请见后面的博客;
更新和删除事件
void EventLoop::updateEvent(Event* event) { assertInThreadLoop(); _epoller->updateEvent(event); } void EventLoop::removeEvent(Event* event) //only invoke by _handclose { assertInThreadLoop(); _epoller->removeEvent(event); }
loop中调用_epoller的pollAndHandleEvent
void EventLoop::loop() //only invoke by loop thread { assertInThreadLoop(); assert(!_loop); _loop = true; while (_loop) { int seconds = _timerQueue->minUpdateSeconds(); if (seconds == 0) { seconds = loopSeconds; } _epoller->pollAndHandleEvent(seconds); _doPendingFunctors(); } }