本节研究事件循环EventLoop以及EventLoopPool,并给出C++实现;
线程模型
本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:
事件循环
(1)每一个IO线程一个EventLoop对象,并拥有它;
(2)在loop循环中,将会执行_epoller->pollAndHandleEvent(seconds)来处理到期的事件,以及会执行_doPendingFunctors()来处理提交给本IO线程的回调函数Functor,这些回调函数不应该阻塞当前IO线程,否则会造成当前IO线程无法及时处理到来的IO事件;
事件循环时序图如下:
EventLoop
EventLoop声明
class Epoller; class Event; class TimerQueue; class EventLoop final { public: EventLoop(const EventLoop&) = delete; EventLoop& operator=(const EventLoop&) = delete; EventLoop(); ~EventLoop(); bool isInThreadLoop(); void assertInThreadLoop(); void loop(); void stop(); void updateEvent(Event* event); void removeEvent(Event* event); typedef std::function<void()> Functor; void runInLoop(const Functor& cb); void queueInLoop(const Functor& cb); typedef std::function<void()> TimerCallback; void addSingleTimer(const TimerCallback& cb, uint32_t interval); private: void _handleRead(); void _wakeup(); void _doPendingFunctors(); int _wakeupFd; std::unique_ptr<Event> _wakeupEvent; bool _loop; std::unique_ptr<Epoller> _epoller; std::unique_ptr<TimerQueue> _timerQueue; std::vector<Functor> _functorLists; pid_t _tid; Base::Mutex _mutex; };
说明几点:
(1)updateEvent,removeEvent来使用epoller来更新或删除相关事件;
(2)_doPendingFunctors为用户提交的需要处理的回调函数Functor;用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,来唤醒当前的IO线程来及时处理Functor;
(3)_loop变量控制loop循环的退出,通过stop()函数,可以让loop循环退出;stop()主要由其他线程调用;
(4)_epoller,_timerQueue,_wakeupEvent都是EventLoop的成员,生命周期由其控制;
构造函数
namespace { const int loopSeconds = 10; int createEventFd() { int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (fd < 0) { LOG_SYSERR << "eventfd system error"; } return fd; } } EventLoop::EventLoop(): _wakeupFd(createEventFd()), _wakeupEvent(new Event(_wakeupFd, this)), _loop(false), _epoller(new Epoller(this)), _timerQueue(new TimerQueue(this)), _tid(CurrentThread::tid()), _mutex() { assert(_wakeupFd >= 0); assert(!_loop); assertInThreadLoop(); LOG_TRACE << "eventfd fd: " << _wakeupFd; LOG_INFO << "Thread tid: " << _tid << " run this EventLoop: " << this; _wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this)); _wakeupEvent->enableReading(); }
说明几点:
(1)_wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this));设置读事件的处理回调函数,仅仅会读取_wakeup()写入的数值;
(2)_wakeupEvent->enableReading()会将_wakeupFd加入到本epoll的监听事件中去;
(3)assertInThreadLoop();保证执行的线程为拥有EventLoop的IO线程;
析构函数
EventLoop::~EventLoop() { if (_wakeupFd >= 0) ::close(_wakeupFd); }
Event操作
void EventLoop::updateEvent(Event* event) { assertInThreadLoop(); _epoller->updateEvent(event); } void EventLoop::removeEvent(Event* event) //only invoke by _handclose { assertInThreadLoop(); _epoller->removeEvent(event); }
提交Functor
void EventLoop::runInLoop(const Functor& cb) //another thread can invoke { if (isInThreadLoop()) { if (cb) cb(); } else { queueInLoop(cb); } } void EventLoop::queueInLoop(const Functor& cb) //another thread can invoke { { MutexLockGuard lock(_mutex); _functorLists.push_back(cb); } if (!isInThreadLoop()) { _wakeup(); } } void EventLoop::_wakeup() //other thread wake up loop thread { uint64_t value = 1; auto s = ::write(_wakeupFd, &value, sizeof(uint64_t)); if (s != sizeof(uint64_t)) LOG_SYSERR << "write system error"; } void EventLoop::_handleRead() { assertInThreadLoop(); uint64_t value; auto s = ::read(_wakeupFd, &value, sizeof(uint64_t)); //when value is non-zero, when write more times, it only read one time by add the read value if (s != sizeof(uint64_t)) LOG_SYSERR << "read system error"; LOG_TRACE << "eventfd read times: " << value; }
说明几点:
(1)用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,若不是当前IO线程的提交,将使用 _wakeup()来唤醒当前的IO线程来,及时处理Functor;
(2)_handleRead()为_wakeupEvent读事件的处理回调函数;
(3)queueInLoop要保证线程安全,因此加入到_functorLists,使用互斥量来保护临界区;
执行Functor
void EventLoop::_doPendingFunctors() { assertInThreadLoop(); std::vector<Functor> _functors; { MutexLockGuard lock(_mutex); _functors.swap( _functorLists); } for (auto& cb : _functors) { if (cb) cb(); } }
说明几点:
(1)使用_functors栈对象来和当前的_functorLists交换,减小操作_functorLists的时间,这样不会过多的阻塞其他线程向_functorLists提交Functor回调;
loop函数
bool EventLoop::isInThreadLoop() { return _tid == CurrentThread::tid(); } void EventLoop::assertInThreadLoop() { //LOG_TRACE << "Current tid: " << CurrentThread::tid() << " , loop tid: " << _tid; assert(isInThreadLoop()); } void EventLoop::loop() //only invoke by loop thread { assertInThreadLoop(); assert(!_loop); _loop = true; while (_loop) { int seconds = _timerQueue->minUpdateSeconds() - TimeStamp::now().secondsSinceEpoch(); if (seconds <= 0) { seconds = loopSeconds; } _epoller->pollAndHandleEvent(seconds); _doPendingFunctors(); } } void EventLoop::stop() { _loop = false; }
说明几点:
(1)loop函数中,使用 _epoller->pollAndHandleEvent(seconds)来处理epoll监听的事件,使用 _epoller->pollAndHandleEvent(seconds),其中seconds为需要阻塞的时间,一般是_timerQueue到期的第一个Timer的时间(绝对时间)减去当前的时间,但是如果_timerQueue没有Timer,那么seconds为周期性的时间loopSeconds
(2)_doPendingFunctors()为执行用户提交的Functor;
提交非周期性定时函数
void EventLoop::addSingleTimer(const TimerCallback& cb, uint32_t interval) { _timerQueue->addTimer(cb, interval, false); }
EventLoopPool
每一个IO线程都拥有一个EventLoop对象,当主线程派发socket连接时,将使用轮转法从EventLoopPool获得某IO线程的EventLoop对象,该socket连接后面事件的监听将会由该IO线程负责;
EventLoopPool声明
class EventLoop; class EventLoopPool final { public: EventLoopPool(const EventLoopPool&) = delete; EventLoopPool& operator=(const EventLoopPool&) = delete; explicit EventLoopPool(size_t loopNums); ~EventLoopPool(); void start(); void stop(); EventLoop* getNextLoop(); private: void _run(); size_t _loopNums; bool _running; size_t _curIndex; std::vector<EventLoop*> _loops; std::vector<std::shared_ptr<Base::Thread>> _threads; Base::Mutex _mutex; Base::CountDownLatch _countDownLatch; };
说明几点:
(1)master线程调用getNextLoop()可按照轮转法来获得EventLoop对象;
(2)CountDownLatch是倒计时,当所有的IO线程初始化好EventLoop对象后,master线程才会继续执行,防止master线程已经需要派发连接socket,而getNextLoop()时,对应的IO线程尚初始化好EventLoop对象(尚未放入_loops)而造成错误;
构造与析构函数
EventLoopPool::EventLoopPool(size_t loopNums) : _loopNums(loopNums), _running(false), _curIndex(0), _mutex(), _countDownLatch(loopNums) { assert(_loopNums > 0); _loops.reserve(loopNums); _threads.reserve(loopNums); } EventLoopPool::~EventLoopPool() { if (_running) stop(); }
启动和停止IO线程
void EventLoopPool::start() { assert(!_running); _running = true; for (size_t i = 0; i < _loopNums; ++i) { std::shared_ptr<Thread> thread(new Thread(std::bind(&EventLoopPool::_run, this))); //impotant _threads.push_back(thread); thread->start(); } LOG_TRACE << "wait , Current tid: " << CurrentThread::tid(); _countDownLatch.wait(); } void EventLoopPool::stop() { assert(_running); _running = false; for (auto& loop : _loops) { loop->stop(); } for (auto& thread : _threads) { thread->join(); } _loops.clear(); _threads.clear(); }
说明几点:
(1)在master线程start后,产生多个IO线程后,将会执行_countDownLatch.wait()来等待所有的IO线程初始化好EventLoop对象,各个IO线程将会调用_countDownLatch.countDown();
(2)在stop函数中,首先会停止各个EventLoop继续loop,此后这些线程将会退出,此时使用thread->join()来依次等待各个IO线程结束;
轮转法派发EventLoop
EventLoop* EventLoopPool::getNextLoop() { _curIndex %= _loopNums; EventLoop* loop = _loops[_curIndex]; ++_curIndex; return loop; }
生成EventLoop对象
void EventLoopPool::_run() { EventLoop loop; //一个loop std::function<void()> fun = std::bind(&EventLoop::loop, &loop); { MutexLockGuard lock(_mutex); _loops.push_back(&loop); //放入到loop缓冲池中 } LOG_TRACE << "countDown , Current tid: " << CurrentThread::tid(); _countDownLatch.countDown(); //loop should in the _loops, otherwise getNextLoop will have error if (fun) { while (_running) fun(); } }
说明几点:
(1)EventLoop为栈对象,当加入到_loops后说明EventLoop已经完整初始化,此时调用_countDownLatch.countDown(),倒计时计数器将会减1;