muduo::EventLoopThread、EventLoopThreadPool分析

  • EventLoopThread
  • EventLoopThreadPool

muduo的并发模型为one loop per thread+ threadpool。为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool。

EventLoopThread

任何一个线程,只要创建并运行了EventLoop,就是一个IO线程。

EventLoopThread类就是一个封装了的IO线程。

EventLoopThread的工作流程为:

1、在主线程(暂且这么称呼)创建EventLoopThread对象。

2、主线程调用EventLoopThread.start(),启动EventLoopThread中的线程(称为IO线程),这是主线程要等待IO线程创建完成EventLoop对象。

3、IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完成。

4、主线程返回创建的EventLoop对象。

EventLoopThread.h

class EventLoopThread : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                  const string& name = string());
  ~EventLoopThread();
  EventLoop* startLoop();//启动本线程,返回本线程中的EventLoop

 private:
  void threadFunc();

  EventLoop* loop_;//本线程持有的EventLoop对象指针
  bool exiting_;//是否已经退出
  Thread thread_;//本线程
  MutexLock mutex_;
  Condition cond_;
  ThreadInitCallback callback_;//回调函数
};

EventLoopThread.cc

EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
                                 const string& name)
  : loop_(NULL),
    exiting_(false),
    thread_(boost::bind(&EventLoopThread::threadFunc, this), name),//创建线程,在回调函数创建EventLoop
    mutex_(),
    cond_(mutex_),
    callback_(cb)
{
}

EventLoopThread::~EventLoopThread()
{
  exiting_ = true;
  if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.
  {
    // still a tiny chance to call destructed object, if threadFunc exits just now.
    // but when EventLoopThread destructs, usually programming is exiting anyway.
    loop_->quit();//退出loop循环
    thread_.join();//等待线程退出
  }
}

EventLoop* EventLoopThread::startLoop()//另一个线程在调用这个函数
{
  assert(!thread_.started());
  thread_.start();//当前线程启动,调用threadFunc()

  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait();//等待创建好当前IO线程
    }
  }

  return loop_;
}

void EventLoopThread::threadFunc()
{
  EventLoop loop;//创建EventLoop对象。注意,在栈上

  if (callback_)
  {
    callback_(&loop);
  }

  {
    MutexLockGuard lock(mutex_);
    loop_ = &loop;
    cond_.notify();//通知startLoop
  }

  loop.loop();//会在这里循环,直到EventLoopThread析构。此后不再使用loop_访问EventLoop了
  //assert(exiting_);
  loop_ = NULL;
}

EventLoopThreadPool

muduo的思想时eventLoop+thread pool,为了更方便使用,将EventLoopThread做了封装。main reactor可以创建sub reactor,并发一些任务分发到sub reactor中去。

EventLoopThreadPool的思想比较简单,用一个main reactor创建EventLoopThreadPool。在EventLoopThreadPool中将EventLoop和Thread绑定,可以返回EventLoop对象来使用EventLoopThreadPool中的Thread。

EventLoopThreadPool.h

class EventLoop;

class EventLoopThread : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                  const string& name = string());
  ~EventLoopThread();
  EventLoop* startLoop();//启动本线程,返回本线程中的EventLoop

 private:
  void threadFunc();

  EventLoop* loop_;//本线程持有的EventLoop对象指针
  bool exiting_;//是否已经退出
  Thread thread_;//本线程
  MutexLock mutex_;
  Condition cond_;
  ThreadInitCallback callback_;//回调函数
};

EventLoopThreadPool.cc

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
  : baseLoop_(baseLoop),
    name_(nameArg),
    started_(false),
    numThreads_(0),
    next_(0)
{
}

EventLoopThreadPool::~EventLoopThreadPool()
{
  // Don‘t delete loop, it‘s stack variable
}

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();

  started_ = true;

  for (int i = 0; i < numThreads_; ++i)
  {
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    EventLoopThread* t = new EventLoopThread(cb, buf);
    threads_.push_back(t);
    loops_.push_back(t->startLoop());
  }
  if (numThreads_ == 0 && cb)
  {
    cb(baseLoop_);
  }
}

EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  assert(started_);
  EventLoop* loop = baseLoop_;//loops_为空,则返回baseloop

  if (!loops_.empty())//循环分配
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}

EventLoop* EventLoopThreadPool::getLoopForHash(size_t hashCode)
{
  baseLoop_->assertInLoopThread();
  EventLoop* loop = baseLoop_;

  if (!loops_.empty())
  {
    loop = loops_[hashCode % loops_.size()];//根据hashCode分配
  }
  return loop;
}

std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
  baseLoop_->assertInLoopThread();
  assert(started_);
  if (loops_.empty())
  {
    return std::vector<EventLoop*>(1, baseLoop_);
  }
  else
  {
    return loops_;
  }
}

可以写个简单的测试程序,创建一个EventLoopThreadPool,打印其中线程的ID和name。

#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/EventLoopThreadPool.h>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

void runInThread()
{
    printf("runInThread(): name = %s, tid = %d\n",
           CurrentThread::name(), CurrentThread::tid());
}

int main()
{
    printf("main(): pid = %d, tid = %d\n",
           getpid(), CurrentThread::tid());

    runInThread();
    EventLoop loop;
    EventLoopThreadPool loopThreadPool(&loop, "sub Reactor");
    loopThreadPool.setThreadNum(5);
    loopThreadPool.start();
    for(int i=0; i<10; ++i)
    {
        EventLoop* loopFromPool=loopThreadPool.getNextLoop();
        loopFromPool->runInLoop(runInThread);
    }
    sleep(3);
    printf("exit main().\n");
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-05 21:51:22

muduo::EventLoopThread、EventLoopThreadPool分析的相关文章

muduo源码分析--我对muduo的理解

分为几个模块 EventLoop.TcpServer.Acceptor.TcpConnection.Channel等 对于EventLoop来说: 他只关注里面的主驱动力,EventLoop中只关注poll,这类系统调用使得其成为Reactor模式,EventLoop中有属于这个loop的所有Channel,这个loop属于哪一个Server. 几个类存在的意义: 从应用层使用的角度来看,用户需要初始化一个EventLoop,然后初始化一个TcpServer(当然也可以自定义个TcpServer

muduo源码分析--Reactor模式在muduo中的使用

一. Reactor模式简介 Reactor释义"反应堆",是一种事件驱动机制.和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为"回调函数". 二. moduo库Reactor模式的实现 muduo主要通过3个类来实现Reactor模式:EventLoop,Chann

muduo::thread类分析

在看源代码前,先学习一个关键字:__thread. 线程共享进程的数据,如果想要每个线程都有一份独立的数据,那么可以使用__thread关键字修饰数据. __thread只能用于修饰POD类型的数据,不能修饰class,因为它无法调用构造函数和析构函数.__thread可以修饰全局变量.函数内的静态变量,不能修饰函数内的局部变量或class的普通成员变量. 在muduo/base/thread.{h, cc}中实现了线程的封装.thread的封装和一个命名空间muduo::CurrentThre

muduo源码分析--Reactor模式的在muduo中的使用(二)

一. TcpServer类: 管理所有的TCP客户连接,TcpServer供用户直接使用,生命期由用户直接控制.用户只需设置好相应的回调函数(如消息处理messageCallback)然后TcpServer::start()即可. 主要数据成员: boost::scoped_ptr<Accepter> acceptor_; 用来接受连接 std::map<string,TcpConnectionPtr> connections_; 用来存储所有连接 connectonCallbac

Muduo网络库源码分析(四)EventLoopThread和EventLoopThreadPool的封装

muduo的并发模型为one loop per thread+ threadpool.为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool.所以这篇博文并没有涉及到新鲜的技术,但是也有一些封装和逻辑方面的注意点需要我们去分析和理解. EventLoopThread 任何一个线程,只要创建并运行了EventLoop,就是一个IO线程. EventLoopTh

Muduo网络库源码分析(五)Acceptor和TcpServer类

首先,我们先提一下对Socket的封装(不复杂,所以简单说一下). Endian.h : 封装了字节序转换函数(全局函数,位于muduo::net::sockets名称空间中). SocketsOps.h/ SocketsOps.cc :封装了socket相关系统调用. Socket.h/Socket.cc(Socket类): 用RAII方法封装socket file descriptor. InetAddress.h/InetAddress.cc(InetAddress类):网际地址socka

Muduo网络库源码分析(一) EventLoop事件循环(Poller和Channel)

从这一篇博文起,我们开始剖析Muduo网络库的源码,主要结合<Linux多线程服务端编程>和网上的一些学习资料! (一)TCP网络编程的本质:三个半事件 1. 连接的建立,包括服务端接受(accept) 新连接和客户端成功发起(connect) 连接.TCP 连接一旦建立,客户端和服务端是平等的,可以各自收发数据. 2. 连接的断开,包括主动断开(close 或shutdown) 和被动断开(read(2) 返回0). 3. 消息到达,文件描述符可读.这是最为重要的一个事件,对它的处理方式决定

muduo::Logging、LogStream分析

Logging LogStream Logging Logging类是用来记录分析日志用的. 下面是Logging.h的源码 namespace muduo { class TimeZone;//forward declaration class Logger { public: enum LogLevel//用来设置不同的日志级别 { TRACE, DEBUG, INFO, WARN, ERROR, FATAL, NUM_LOG_LEVELS,//级别个数 }; // compile time

muduo源代码分析--Reactor模式在muduo中的使用

一. Reactor模式简单介绍 Reactor释义"反应堆",是一种事件驱动机制.和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完毕处理.而是恰恰相反.Reactor逆置了事件处理流程,应用程序须要提供对应的接口并注冊到Reactor上,假设对应的时间发生,Reactor将主动调用应用程序注冊的接口,这些接口又称为"回调函数". 二. moduo库Reactor模式的实现 muduo主要通过3个类来实现Reactor模式:EventLoop,Cha