- EventLoopThread
- EventLoopThreadPool
muduo的并发模型为one loop per thread+ threadpool。为了方便使用,muduo封装了EventLoop和Thread为EventLoopThread,为了方便使用线程池,又把EventLoopThread封装为EventLoopThreadPool。
EventLoopThread
任何一个线程,只要创建并运行了EventLoop,就是一个IO线程。
EventLoopThread类就是一个封装了的IO线程。
EventLoopThread的工作流程为:
1、在主线程(暂且这么称呼)创建EventLoopThread对象。
2、主线程调用EventLoopThread.start(),启动EventLoopThread中的线程(称为IO线程),这是主线程要等待IO线程创建完成EventLoop对象。
3、IO线程调用threadFunc创建EventLoop对象。通知主线程已经创建完成。
4、主线程返回创建的EventLoop对象。
EventLoopThread.h
class EventLoopThread : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
const string& name = string());
~EventLoopThread();
EventLoop* startLoop();//启动本线程,返回本线程中的EventLoop
private:
void threadFunc();
EventLoop* loop_;//本线程持有的EventLoop对象指针
bool exiting_;//是否已经退出
Thread thread_;//本线程
MutexLock mutex_;
Condition cond_;
ThreadInitCallback callback_;//回调函数
};
EventLoopThread.cc
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
const string& name)
: loop_(NULL),
exiting_(false),
thread_(boost::bind(&EventLoopThread::threadFunc, this), name),//创建线程,在回调函数创建EventLoop
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.
{
// still a tiny chance to call destructed object, if threadFunc exits just now.
// but when EventLoopThread destructs, usually programming is exiting anyway.
loop_->quit();//退出loop循环
thread_.join();//等待线程退出
}
}
EventLoop* EventLoopThread::startLoop()//另一个线程在调用这个函数
{
assert(!thread_.started());
thread_.start();//当前线程启动,调用threadFunc()
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait();//等待创建好当前IO线程
}
}
return loop_;
}
void EventLoopThread::threadFunc()
{
EventLoop loop;//创建EventLoop对象。注意,在栈上
if (callback_)
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify();//通知startLoop
}
loop.loop();//会在这里循环,直到EventLoopThread析构。此后不再使用loop_访问EventLoop了
//assert(exiting_);
loop_ = NULL;
}
EventLoopThreadPool
muduo的思想时eventLoop+thread pool,为了更方便使用,将EventLoopThread做了封装。main reactor可以创建sub reactor,并发一些任务分发到sub reactor中去。
EventLoopThreadPool的思想比较简单,用一个main reactor创建EventLoopThreadPool。在EventLoopThreadPool中将EventLoop和Thread绑定,可以返回EventLoop对象来使用EventLoopThreadPool中的Thread。
EventLoopThreadPool.h
class EventLoop;
class EventLoopThread : boost::noncopyable
{
public:
typedef boost::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
const string& name = string());
~EventLoopThread();
EventLoop* startLoop();//启动本线程,返回本线程中的EventLoop
private:
void threadFunc();
EventLoop* loop_;//本线程持有的EventLoop对象指针
bool exiting_;//是否已经退出
Thread thread_;//本线程
MutexLock mutex_;
Condition cond_;
ThreadInitCallback callback_;//回调函数
};
EventLoopThreadPool.cc
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
: baseLoop_(baseLoop),
name_(nameArg),
started_(false),
numThreads_(0),
next_(0)
{
}
EventLoopThreadPool::~EventLoopThreadPool()
{
// Don‘t delete loop, it‘s stack variable
}
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(t);
loops_.push_back(t->startLoop());
}
if (numThreads_ == 0 && cb)
{
cb(baseLoop_);
}
}
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
assert(started_);
EventLoop* loop = baseLoop_;//loops_为空,则返回baseloop
if (!loops_.empty())//循环分配
{
// round-robin
loop = loops_[next_];
++next_;
if (implicit_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
EventLoop* EventLoopThreadPool::getLoopForHash(size_t hashCode)
{
baseLoop_->assertInLoopThread();
EventLoop* loop = baseLoop_;
if (!loops_.empty())
{
loop = loops_[hashCode % loops_.size()];//根据hashCode分配
}
return loop;
}
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
baseLoop_->assertInLoopThread();
assert(started_);
if (loops_.empty())
{
return std::vector<EventLoop*>(1, baseLoop_);
}
else
{
return loops_;
}
}
可以写个简单的测试程序,创建一个EventLoopThreadPool,打印其中线程的ID和name。
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <stdio.h>
using namespace muduo;
using namespace muduo::net;
void runInThread()
{
printf("runInThread(): name = %s, tid = %d\n",
CurrentThread::name(), CurrentThread::tid());
}
int main()
{
printf("main(): pid = %d, tid = %d\n",
getpid(), CurrentThread::tid());
runInThread();
EventLoop loop;
EventLoopThreadPool loopThreadPool(&loop, "sub Reactor");
loopThreadPool.setThreadNum(5);
loopThreadPool.start();
for(int i=0; i<10; ++i)
{
EventLoop* loopFromPool=loopThreadPool.getNextLoop();
loopFromPool->runInLoop(runInThread);
}
sleep(3);
printf("exit main().\n");
}
版权声明:本文为博主原创文章,未经博主允许不得转载。