【网络组件】事件循环

本节研究事件循环EventLoop以及EventLoopPool,并给出C++实现;

线程模型

本网络组件的线程模型是一个master线程和多个IO线程模型;master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程;每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是说TCP连接的fd也只能由这个IO线程去读写;其中EventLoop是要求线程安全的;具体示意图如下:

事件循环

(1)每一个IO线程一个EventLoop对象,并拥有它;

(2)在loop循环中,将会执行_epoller->pollAndHandleEvent(seconds)来处理到期的事件,以及会执行_doPendingFunctors()来处理提交给本IO线程的回调函数Functor,这些回调函数不应该阻塞当前IO线程,否则会造成当前IO线程无法及时处理到来的IO事件;

事件循环时序图如下:

EventLoop

EventLoop声明

class Epoller;
class Event;
class TimerQueue;

class EventLoop final
{
public:
  EventLoop(const EventLoop&) = delete;
  EventLoop& operator=(const EventLoop&) = delete;

  EventLoop();

  ~EventLoop();
  bool isInThreadLoop();

  void assertInThreadLoop();

  void loop();

  void stop();

  void updateEvent(Event* event);

  void removeEvent(Event* event);

  typedef std::function<void()> Functor;
  void runInLoop(const Functor& cb);

  void queueInLoop(const Functor& cb);

  typedef std::function<void()> TimerCallback;
  void addSingleTimer(const TimerCallback& cb, uint32_t interval);

private:
  void _handleRead();
  void _wakeup();
  void _doPendingFunctors();

  int _wakeupFd;

  std::unique_ptr<Event> _wakeupEvent;

  bool _loop;

  std::unique_ptr<Epoller> _epoller;
  std::unique_ptr<TimerQueue> _timerQueue;

  std::vector<Functor> _functorLists;

  pid_t _tid;
  Base::Mutex _mutex;
};

说明几点:

(1)updateEvent,removeEvent来使用epoller来更新或删除相关事件;

(2)_doPendingFunctors为用户提交的需要处理的回调函数Functor;用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,来唤醒当前的IO线程来及时处理Functor;

(3)_loop变量控制loop循环的退出,通过stop()函数,可以让loop循环退出;stop()主要由其他线程调用;

(4)_epoller,_timerQueue,_wakeupEvent都是EventLoop的成员,生命周期由其控制;

构造函数

namespace
{
const int loopSeconds = 10;

int createEventFd()
{
  int fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (fd < 0)
    {
      LOG_SYSERR << "eventfd system error";
    }

  return fd;
}

}

EventLoop::EventLoop():
    _wakeupFd(createEventFd()),
    _wakeupEvent(new Event(_wakeupFd, this)),
    _loop(false),
    _epoller(new Epoller(this)),
    _timerQueue(new TimerQueue(this)),
    _tid(CurrentThread::tid()),
    _mutex()
{
  assert(_wakeupFd >= 0);
  assert(!_loop);
  assertInThreadLoop();

  LOG_TRACE << "eventfd fd: " << _wakeupFd;
  LOG_INFO << "Thread tid: " << _tid << " run this EventLoop: " << this;

  _wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this));
  _wakeupEvent->enableReading();
}

说明几点:

(1)_wakeupEvent->setReadCallback(std::bind(&EventLoop::_handleRead, this));设置读事件的处理回调函数,仅仅会读取_wakeup()写入的数值;

(2)_wakeupEvent->enableReading()会将_wakeupFd加入到本epoll的监听事件中去;

(3)assertInThreadLoop();保证执行的线程为拥有EventLoop的IO线程;

析构函数

EventLoop::~EventLoop()
{
  if (_wakeupFd >= 0)
    ::close(_wakeupFd);
}

Event操作

void EventLoop::updateEvent(Event* event)
{
  assertInThreadLoop();
  _epoller->updateEvent(event);
}

void EventLoop::removeEvent(Event* event)  //only invoke by _handclose
{
  assertInThreadLoop();
  _epoller->removeEvent(event);
}

提交Functor

void EventLoop::runInLoop(const Functor& cb)    //another thread can invoke
{
  if (isInThreadLoop())
    {
      if (cb)
        cb();
    }
  else
    {
      queueInLoop(cb);
    }
}

void EventLoop::queueInLoop(const Functor& cb)  //another thread can invoke
{
  {
    MutexLockGuard lock(_mutex);
    _functorLists.push_back(cb);
  }

  if (!isInThreadLoop())
    {
      _wakeup();
    }
}

void EventLoop::_wakeup()         //other thread wake up loop thread
{
   uint64_t value = 1;
   auto s = ::write(_wakeupFd, &value, sizeof(uint64_t));

   if (s != sizeof(uint64_t))
      LOG_SYSERR << "write system error";
}

void EventLoop::_handleRead()
{
  assertInThreadLoop();

  uint64_t value;
  auto s = ::read(_wakeupFd, &value, sizeof(uint64_t));
  //when value is non-zero, when write more times, it only read one time by add the read value

 if (s != sizeof(uint64_t))
     LOG_SYSERR << "read system error"; 

  LOG_TRACE << "eventfd read times: " << value;
}

说明几点:

(1)用户的Functor主要通过queueInLoop和runInLoop来提交;当提交的线程与IO线程不是同一个线程时,为了保持线程安全,runInLoop将会间接调用queueInLoop;_wakeupEvent用来queueInLoop提交Functor后,若不是当前IO线程的提交,将使用 _wakeup()来唤醒当前的IO线程来,及时处理Functor;

(2)_handleRead()为_wakeupEvent读事件的处理回调函数;

(3)queueInLoop要保证线程安全,因此加入到_functorLists,使用互斥量来保护临界区;

执行Functor

void EventLoop::_doPendingFunctors()
{
  assertInThreadLoop();

  std::vector<Functor> _functors;
  {
    MutexLockGuard lock(_mutex);
    _functors.swap( _functorLists);
  }

  for (auto& cb : _functors)
    {
      if (cb)
        cb();
    }
}

说明几点:

(1)使用_functors栈对象来和当前的_functorLists交换,减小操作_functorLists的时间,这样不会过多的阻塞其他线程向_functorLists提交Functor回调

loop函数

bool EventLoop::isInThreadLoop()
{
  return _tid == CurrentThread::tid();
}

void EventLoop::assertInThreadLoop()
{
  //LOG_TRACE << "Current tid: " << CurrentThread::tid() << " , loop tid: " << _tid;
  assert(isInThreadLoop());
}

void EventLoop::loop()    //only invoke by loop thread
{
  assertInThreadLoop();

  assert(!_loop);
  _loop = true;
  while (_loop)
    {
      int seconds = _timerQueue->minUpdateSeconds() - TimeStamp::now().secondsSinceEpoch();
      if (seconds <= 0) {
        seconds = loopSeconds;
      }

      _epoller->pollAndHandleEvent(seconds);

      _doPendingFunctors();
    }
}

void EventLoop::stop()
{
  _loop = false;
}

说明几点:

(1)loop函数中,使用 _epoller->pollAndHandleEvent(seconds)来处理epoll监听的事件,使用 _epoller->pollAndHandleEvent(seconds),其中seconds为需要阻塞的时间,一般是_timerQueue到期的第一个Timer的时间(绝对时间)减去当前的时间,但是如果_timerQueue没有Timer,那么seconds为周期性的时间loopSeconds

(2)_doPendingFunctors()为执行用户提交的Functor;

提交非周期性定时函数

void EventLoop::addSingleTimer(const TimerCallback& cb, uint32_t interval)
{
  _timerQueue->addTimer(cb, interval, false);
}

EventLoopPool

每一个IO线程都拥有一个EventLoop对象,当主线程派发socket连接时,将使用轮转法从EventLoopPool获得某IO线程的EventLoop对象,该socket连接后面事件的监听将会由该IO线程负责;

EventLoopPool声明

class EventLoop;

class EventLoopPool final
{
public:
  EventLoopPool(const EventLoopPool&) = delete;
  EventLoopPool& operator=(const EventLoopPool&) = delete;

  explicit EventLoopPool(size_t loopNums);

  ~EventLoopPool();

  void start();

  void stop();

  EventLoop* getNextLoop();

private:
  void _run();

  size_t _loopNums;
  bool _running;

  size_t _curIndex;

  std::vector<EventLoop*> _loops;
  std::vector<std::shared_ptr<Base::Thread>> _threads;

  Base::Mutex _mutex;
  Base::CountDownLatch _countDownLatch;
};

说明几点:

(1)master线程调用getNextLoop()可按照轮转法来获得EventLoop对象;

(2)CountDownLatch是倒计时,当所有的IO线程初始化好EventLoop对象后,master线程才会继续执行,防止master线程已经需要派发连接socket,而getNextLoop()时,对应的IO线程尚初始化好EventLoop对象(尚未放入_loops)而造成错误;

构造与析构函数

EventLoopPool::EventLoopPool(size_t loopNums) :
    _loopNums(loopNums),
    _running(false),
    _curIndex(0),
    _mutex(),
    _countDownLatch(loopNums)
{
  assert(_loopNums > 0);
  _loops.reserve(loopNums);
  _threads.reserve(loopNums);
}

EventLoopPool::~EventLoopPool()
{
  if (_running)
    stop();
}

启动和停止IO线程

void EventLoopPool::start()
{
  assert(!_running);

  _running = true;
  for (size_t i = 0; i < _loopNums; ++i)
    {
      std::shared_ptr<Thread> thread(new Thread(std::bind(&EventLoopPool::_run, this)));    //impotant
      _threads.push_back(thread);

      thread->start();
    }

  LOG_TRACE << "wait , Current tid: " << CurrentThread::tid();
  _countDownLatch.wait();
}

void  EventLoopPool::stop()
{
  assert(_running);
  _running = false;

for (auto& loop : _loops)
    {
      loop->stop();
    }

for (auto& thread : _threads)
    {
      thread->join();
    }

  _loops.clear();
  _threads.clear();
}

说明几点:

(1)在master线程start后,产生多个IO线程后,将会执行_countDownLatch.wait()来等待所有的IO线程初始化好EventLoop对象,各个IO线程将会调用_countDownLatch.countDown();

(2)在stop函数中,首先会停止各个EventLoop继续loop,此后这些线程将会退出,此时使用thread->join()来依次等待各个IO线程结束;

轮转法派发EventLoop

EventLoop* EventLoopPool::getNextLoop()
{
  _curIndex %= _loopNums;

  EventLoop* loop = _loops[_curIndex];
  ++_curIndex;

  return loop;
}

生成EventLoop对象

void EventLoopPool::_run()
{
  EventLoop loop;     //一个loop
  std::function<void()> fun = std::bind(&EventLoop::loop, &loop);
  {
    MutexLockGuard lock(_mutex);
    _loops.push_back(&loop);        //放入到loop缓冲池中
  }

  LOG_TRACE << "countDown , Current tid: " << CurrentThread::tid();
  _countDownLatch.countDown();    //loop should in the _loops, otherwise getNextLoop will have error

  if (fun) {
    while (_running)
       fun();
  }
}

说明几点:

(1)EventLoop为栈对象,当加入到_loops后说明EventLoop已经完整初始化,此时调用_countDownLatch.countDown(),倒计时计数器将会减1;

时间: 2024-10-30 22:29:45

【网络组件】事件循环的相关文章

【网络组件】事件和IO复用

本节研究事件和IO复用的实现,主要是Event和Epoller并给出C++实现: 线程模型 本网络组件的线程模型是一个master线程和多个IO线程模型:master线程负责accept连接,然后会将该连接派发到IO线程,可以按照轮转的方式来派发到各IO线程:每一个IO线程有一个EventLoop(事件循环),一个TCP连接必须归这个EventLoop来管理,所有的IO都会转移到这个线程,其他线程是无法读写该TCP连接信息的,也是是说TCP连接的fd也只能由这个IO线程去读写:其中EventLo

Muduo网络库源码分析(一) EventLoop事件循环(Poller和Channel)

从这一篇博文起,我们开始剖析Muduo网络库的源码,主要结合<Linux多线程服务端编程>和网上的一些学习资料! (一)TCP网络编程的本质:三个半事件 1. 连接的建立,包括服务端接受(accept) 新连接和客户端成功发起(connect) 连接.TCP 连接一旦建立,客户端和服务端是平等的,可以各自收发数据. 2. 连接的断开,包括主动断开(close 或shutdown) 和被动断开(read(2) 返回0). 3. 消息到达,文件描述符可读.这是最为重要的一个事件,对它的处理方式决定

Qt 学习之路:线程和事件循环

前面一章我们简单介绍了如何使用QThread实现线程.现在我们开始详细介绍如何“正确”编写多线程程序.我们这里的大部分内容来自于Qt的一篇Wiki文档,有兴趣的童鞋可以去看原文. 在介绍在以前,我们要认识两个术语: 可重入的(Reentrant):如果多个线程可以在同一时刻调用一个类的所有函数,并且保证每一次函数调用都引用一个唯一的数据,就称这个类是可重入的(Reentrant means that all the functions in the referenced class can be

node事件循环和消息队列简单分析

node的好处毋庸置疑,事件驱动,异步非阻塞I/O,以及处理高并发的能力深入人心,因此大家喜欢用node做一些小型后台服务或者作为中间层和其他服务配合完成一些大型应用场景. 什么是异步? 异步和同步应该是经常谈的一个话题了.同步的概念很简单,自上而下依次执行,必须等上边执行完下边才会执行.而异步可以先提交一个命令,中间可以去执行别的事务,而当执行完之后回过头来返回之前的任务. 举个栗子: 你很幸运,找了一个漂亮的女朋友,有一天你的女朋友发短信问你晚上看什么电影?但你并不知道看什么,马上打开电脑查

redis源码分析(2)——事件循环

redis作为服务器程序,网络IO处理是关键.redis不像memcached使用libevent,它实现了自己的IO事件框架,并且很简单.小巧.可以选择select.epoll.kqueue等实现. 作为 IO事件框架,需要抽象多种IO模型的共性,将整个过程主要抽象为: 1)初始化 2)添加.删除事件 3)等待事件发生 下面也按照这个步骤分析代码. (1)初始化 回忆一下redis的初始化过程中,initServer函数会调用aeCreateEventLoop创建event loop对象,对事

定时器运行原理 &amp;&amp; javascript事件循环模型

定时器是我们经常使用的一个异步函数,它的用处十分广泛,比如图片轮播.各种小的动画.延时操作等等:定时器函数只有两个setTimeout.setInterval,这两个工作原理相同,唯一的区别是:setTimeout只执行一次,setInterval循环执行:通过以下实例看看对定时器原理掌握程度: 定时器3个实例 首先声明这三个实例输出皆不同,先思考输出结果,以及为何不同 实例一: console.log('test1') for(var i=0;i<10;i++){ setTimeout(()=

node.js的作用、回调、同步异步代码、事件循环

http://www.nodeclass.com/articles/39274 一.node.js的作用 I/O的意义,(I/O是输入/输出的简写,如:键盘敲入文本,输入,屏幕上看到文本显示输出.鼠标移动,在屏幕上看到鼠标的移动.终端的输入,和看到的输出.等等) node.js想解决的问题,(处理输入,输入,高并发 .如 在线游戏中可能会有上百万个游戏者,则有上百万的输入等等)(node.js适合的范畴:当应用程序需要在网络上发送和接收数据时Node.js最为适合.这可能是第三方的API,联网设

iOS 对比网络组件:AFNetworking 与 ASIHTTPRequest

在开发iOS应用过程中,如何高效的与服务端API进行数据交换,是一个常见问题.一般开发者都会选择一个第三方的网络组件作为服务,以提高开发效率和稳定性.这些组件把复杂的网络底层操作封装成友好的类和方法,并且加入异常处理等. 那么,大家最常用的组件是什么?这些组件是如何提升开发效率和稳定性的?哪一款组件适合自己,是 AFNetworking(AFN)还是 ASIHTTPRequest(ASI)?几乎每一个iOS互联网应用开发者都会面对这样的选择题,要从这两个最常用的组件里选出一个好的还真不是那么容易

js: 从setTimeout说事件循环模型

一.从setTimeout说起 setTimeout()方法不是ecmascript规范定义的内容,而是属于BOM提供的功能.查看w3school对setTimeout()方法的定义,setTimeout() 方法用于在指定的毫秒数后调用函数或计算表达式. 语法setTimeout(fn,millisec),其中fn表示要执行的代码,可以是一个包含javascript代码的字符串,也可以是一个函数.第二个参数millisec是以毫秒表示的时间,表示fn需推迟多长时间执行. 调用setTimeou