Muduo网络库源码分析(四)EventLoopThread和EventLoopThreadPool的封装

muduo的并发模型为one loop per thread+ threadpool。为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool。所以这篇博文并没有涉及到新鲜的技术,但是也有一些封装和逻辑方面的注意点需要我们去分析和理解。

EventLoopThread

任何一个线程,只要创建并运行了EventLoop,就是一个IO线程。 EventLoopThread类就是一个封装好了的IO线程。

EventLoopThread的工作流程为:

1、在主线程创建EventLoopThread对象。

2、主线程调用EventLoopThread.start(),启动EventLoopThread中的线程(称为IO线程),并且主线程要等待IO线程创建完成EventLoop对象。

3、IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完成。

4、主线程返回创建的EventLoop对象。

EventLoopThread.h

class EventLoopThread : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback());
  ~EventLoopThread();
  EventLoop* startLoop();	// 启动线程,该线程就成为了IO线程

 private:
  void threadFunc();		// 线程函数

  EventLoop* loop_;			// loop_指针指向一个EventLoop对象
  bool exiting_;
  Thread thread_;
  MutexLock mutex_;
  Condition cond_;
  ThreadInitCallback callback_;		// 回调函数在EventLoop::loop事件循环之前被调用
};

EventLoopThread.cc

EventLoopThread::EventLoopThread(const ThreadInitCallback& cb)
  : loop_(NULL),
    exiting_(false),
    thread_(boost::bind(&EventLoopThread::threadFunc, this)),
    mutex_(),
    cond_(mutex_),
    callback_(cb)
{
}

EventLoopThread::~EventLoopThread()
{
  exiting_ = true;
  loop_->quit();		// 退出IO线程,让IO线程的loop循环退出,从而退出了IO线程
  thread_.join(); //等待线程退出
}

EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  thread_.start();//线程启动,调用threadFunc()

  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait();//需要等待EventLoop对象的创建
    }
  }

  return loop_;
}

void EventLoopThread::threadFunc()
{
  EventLoop loop;

  if (callback_)
  {
    callback_(&loop);
  }

  {
    MutexLockGuard lock(mutex_);
    // loop_指针指向了一个栈上的对象,threadFunc函数退出之后,这个指针就失效了
    // threadFunc函数退出,就意味着线程退出了,EventLoopThread对象也就没有存在的价值了。
    // 因而不会有什么大的问题
    loop_ = &loop;
    cond_.notify(); //创建好,发送通知
  }

  loop.loop();// 会在这里循环,直到EventLoopThread析构。此后不再使用loop_访问EventLoop了
  //assert(exiting_);
}

测试程序:

#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void runInThread()
{
  printf("runInThread(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());
}

int main()
{
  printf("main(): pid = %d, tid = %d\n",
         getpid(), CurrentThread::tid());

  EventLoopThread loopThread;
  EventLoop* loop = loopThread.startLoop();
  // 异步调用runInThread,即将runInThread添加到loop对象所在IO线程,让该IO线程执行
  loop->runInLoop(runInThread);
  sleep(1);
  // runAfter内部也调用了runInLoop,所以这里也是异步调用
  loop->runAfter(2, runInThread);
  sleep(3);
  loop->quit();

  printf("exit main().\n");
}

对调用过程进行分析:(查看日志)

主线程调用 loop->runInLoop(runInThread);
由于主线程(不是IO线程)调用runInLoop, 故调用queueInLoop()
将runInThead
添加到队列,然后wakeup() IO线程,IO线程在doPendingFunctors()
中取loop->runAfter()
要唤醒一下,此时只是执行runAfter()
添加了一个2s的定时器, 2s超时,timerfd_
可读,先handleRead()一下然后执行回调函数runInThread()。

那为什么exit main()
之后wakeupFd_
还会有可读事件呢?那是因为EventLoopThead
栈上对象析构,在析构函数内 loop_ ->quit(),
由于不是在IO线程调用quit(),故也需要唤醒一下,IO线程才能从poll
返回,这样再次循环判断 while (!quit_)
就能退出IO线程。

EventLoopThreadPool

muduo的线程模型:

muduo的思想时eventLoop+thread pool,为了更方便使用,将EventLoopThread做了封装。main reactor可以创建sub reactor,并发一些任务分发到sub reactor中去。EventLoopThreadPool的思想比较简单,用一个main reactor创建EventLoopThreadPool。在EventLoopThreadPool中将EventLoop和Thread绑定,可以返回EventLoop对象来使用EventLoopThreadPool中的Thread。

EventLoopThreadPool.h

class EventLoopThreadPool : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThreadPool(EventLoop* baseLoop);
  ~EventLoopThreadPool();
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  void start(const ThreadInitCallback& cb = ThreadInitCallback());
  EventLoop* getNextLoop();

 private:

  EventLoop* baseLoop_;	// 与Acceptor所属EventLoop相同
  bool started_;
  int numThreads_;		// 线程数
  int next_;			// 新连接到来,所选择的EventLoop对象下标
  boost::ptr_vector<EventLoopThread> threads_;		// IO线程列表
  std::vector<EventLoop*> loops_;					// EventLoop列表
};

EventLoopThreadPool.cc

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
  : baseLoop_(baseLoop),
    started_(false),
    numThreads_(0),
    next_(0)
{
}

EventLoopThreadPool::~EventLoopThreadPool()
{
  // Don't delete loop, it's stack variable
}

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();

  started_ = true;

  for (int i = 0; i < numThreads_; ++i)
  {
    EventLoopThread* t = new EventLoopThread(cb);
    threads_.push_back(t);
    loops_.push_back(t->startLoop());	// 启动EventLoopThread线程,在进入事件循环之前,会调用cb
  }
  if (numThreads_ == 0 && cb)
  {
    // 只有一个EventLoop,在这个EventLoop进入事件循环之前,调用cb
    cb(baseLoop_);
  }
}

EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  EventLoop* loop = baseLoop_;

  // 如果loops_为空,则loop指向baseLoop_
  // 如果不为空,按照round-robin(RR,轮叫)的调度方式选择一个EventLoop
  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}

mainReactor关注监听事件,已连接套接字事件轮询给线程池中的subReactors 处理,一个新的连接对应一个subReactor

我们采用round-robin(RR,轮叫)的调度方式选择一个EventLoop,也就是getNextLoop函数。极端情况下,线程池中个数为0时,那么新的连接交给mainReactor,这样就退化成单线程的模式。

时间: 2024-11-07 04:24:36

Muduo网络库源码分析(四)EventLoopThread和EventLoopThreadPool的封装的相关文章

Muduo网络库源码分析(一) EventLoop事件循环(Poller和Channel)

从这一篇博文起,我们开始剖析Muduo网络库的源码,主要结合<Linux多线程服务端编程>和网上的一些学习资料! (一)TCP网络编程的本质:三个半事件 1. 连接的建立,包括服务端接受(accept) 新连接和客户端成功发起(connect) 连接.TCP 连接一旦建立,客户端和服务端是平等的,可以各自收发数据. 2. 连接的断开,包括主动断开(close 或shutdown) 和被动断开(read(2) 返回0). 3. 消息到达,文件描述符可读.这是最为重要的一个事件,对它的处理方式决定

Muduo网络库源码分析(六)TcpConnection 的生存期管理

TcpConnection是使用shared_ptr来管理的类,因为它的生命周期模糊.TcpConnection表示已经建立或正在建立的连接,建立连接后,用户只需要在上层类如TcpServer中设置连接到来和消息到来的处理函数,继而回调TcpConnection中的 setConnectionCallback和setMessageCallback函数,实现对事件的处理.用户需要关心的事件是有限的,其他都由网络库负责. TcpConnection中封装了InputBuffer和OutputBuff

Muduo网络库源码分析(五)Acceptor和TcpServer类

首先,我们先提一下对Socket的封装(不复杂,所以简单说一下). Endian.h : 封装了字节序转换函数(全局函数,位于muduo::net::sockets名称空间中). SocketsOps.h/ SocketsOps.cc :封装了socket相关系统调用. Socket.h/Socket.cc(Socket类): 用RAII方法封装socket file descriptor. InetAddress.h/InetAddress.cc(InetAddress类):网际地址socka

android 网络框架 源码分析

android 网络框架 源码分析 导语: 最近想开发一个协议分析工具,来监控android app 所有的网络操作行为, 由于android 开发分为Java层,和Native层, 对于Native层我们只要对linux下所有网络I/O接口进行拦截即可,对于java 层,笔者对android 网络框架不是很了解,所以这个工具开发之前,笔者需要对android 的网络框架进行一个简单的分析. 分析结论: 1. android 的网络框架都是基于Socket类实现的 2. java 层Socket

baksmali和smali源码分析(四)

baksmali 首先执行的第一个main 函数     public static void main(String[] args) throws IOException {         Locale locale = new Locale("en", "US");         Locale.setDefault(locale);         CommandLineParser parser = new PosixParser();         C

Nouveau源码分析(四):NVIDIA设备初始化之nouveau_drm_load (1)

Nouveau源码分析(四) probe函数成功返回之后,DRM模块就会调用struct drm_driver的load函数,对应nouveau的nouveau_drm_load. 这个函数虽然看起来不是特别长,但每一个调用的函数展开后就会变得非常长了! // /drivers/gpu/drm/nouveau/nouveau_drm.c 364 static int 365 nouveau_drm_load(struct drm_device *dev, unsigned long flags)

mybatis源码分析(四) mybatis与spring事务管理分析

mybatis源码分析(四) mybatis与spring事务管理分析 一丶从jdbc的角度理解什么是事务 从mysql获取一个连接之后, 默认是自动提交, 即执行完sql之后, 就会提交事务. 这种事务的范围是一条sql语句. 将该连接设置非自动提交, 可以执行多条sql语句, 然后由程序决定是提交事务, 还是回滚事务. 这也是我们常说的事务. Connection connection = dataSource.getConnection(); // connection.setTransa

libevent高性能网络库源码分析——事件处理框架(四)

event_base结构 event_base的初始化 接口函数 libevent中基于Reactor模式的事件处理框架对应event_base,在event在完成创建后,需要向event_base注册事件,监控事件的当前状态,当事件状态为激活状(EV_ACTIVE)时,调用回调函数执行.本文主要从以下几方面进行分析:event_base的结构,event_base的创建,事件的注册.事件分发.事件注销 event_base结构 struct event_base { //指定某个eventop

Android网络框架源码分析一---Volley

转载自 http://www.jianshu.com/p/9e17727f31a1?utm_campaign=maleskine&utm_content=note&utm_medium=mobile_author_hots&utm_source=recommendation 公司最近新起了一个项目,对喜欢尝鲜的我们来说,好处就是我们可以在真实的项目中想尝试一些新技术,验证想法.新项目对网络框架的选取,我们存在三种方案: 1.和我们之前的项目一样,使用Loader + HttpCli