Muduo 多线程模型对比

  本文主要对比Muduo多线程模型方案8 和方案9 。

  方案8:reactor + thread pool ,有一个线程来充当reactor 接受连接分发事件,将要处理的事件分配给thread pool中的线程,由thread pool 来完成事件处理。实例代码见:examples/sudoku/server_threadpool.cc

  这里截取关键部分代码进行说明。

class SudokuServer

{

public :

SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)

: loop_(loop),

server_(loop, listenAddr, "SudokuServer"),

numThreads_(numThreads),

startTime_(Timestamp::now())

{

server_.setConnectionCallback(

boost::bind(&SudokuServer::onConnection, this, _1));

server_.setMessageCallback(

boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));

}

void start()

{

LOG_INFO << "starting " << numThreads_ << " threads.";

threadPool_.start(numThreads_); // 注意这里,threadPool 的类型是: ThreadPool,且位置在start 里面

server_.start();

}

private :

void onConnection(const TcpConnectionPtr& conn)

{

LOG_TRACE << conn->peerAddress().toIpPort() << " -> "

<< conn->localAddress().toIpPort() << " is "

<< (conn->connected() ? "UP" : "DOWN");

}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)

{

...

if (!processRequest(conn, request)) // 封装计算任务执行方法

{

conn->send( "Bad Request!\r\n");

conn->shutdown();

break;

}

}

...

}

}

bool processRequest(const TcpConnectionPtr& conn, const string& request)

{

...

if (puzzle.size() == implicit_cast<size_t>(kCells))

{

threadPool_.run(boost::bind(&solve, conn, puzzle, id));// 将计算任务转移到 threadPool 线程

}

else

{

goodRequest = false;

}

return goodRequest;

}

static void solve(const TcpConnectionPtr& conn,

const string& puzzle,

const string& id)

{

LOG_DEBUG << conn->name();

string result = solveSudoku(puzzle); // solveSudou 是一个pure function, 是可重入的

if (id.empty())

{

conn->send(result+ "\r\n");

}

else

{

conn->send(id+ ":"+result+ "\r\n");

}

}

EventLoop* loop_;

TcpServer server_;

ThreadPool threadPool_; // 注意类型,方案8, reactor + threadpool

int numThreads_;

Timestamp startTime_;

};

void ThreadPool::start( int numThreads)  // 创建 thread pool,具体thread 调度这里暂时不分析

{

assert(threads_.empty());

running_ = true;

threads_.reserve(numThreads);

for (int i = 0; i < numThreads; ++i)

{

char id[32];

snprintf(id, sizeof id, "%d", i);

threads_.push_back( new muduo::Thread(

boost::bind(&ThreadPool::runInThread, this), name_+id));

threads_[i].start();

}

}

方案9:main-reactor + subreactors, one loop per thread, 有一个主线程来扮演main-reactor 专门语句 accept 连接,其它线程负责读写文件描述符(socket)

class SudokuServer

{

public :

SudokuServer(EventLoop* loop, const InetAddress& listenAddr, int numThreads)

: loop_(loop),

server_(loop, listenAddr, "SudokuServer"),

numThreads_(numThreads),

startTime_(Timestamp::now())

{

server_.setConnectionCallback(

boost::bind(&SudokuServer::onConnection, this, _1));

server_.setMessageCallback(

boost::bind(&SudokuServer::onMessage, this, _1, _2, _3));

server_.setThreadNum(numThreads); // 设置 EventLoopThreadPool里面的thread数量

}

void start()

{

LOG_INFO << "starting " << numThreads_ << " threads.";

server_.start();

}

private :

void onConnection(const TcpConnectionPtr& conn)

{

LOG_TRACE << conn->peerAddress().toIpPort() << " -> "

<< conn->localAddress().toIpPort() << " is "

<< (conn->connected() ? "UP" : "DOWN");

}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp)

{

...

if (!processRequest(conn, request)) //准备计算

{

conn->send( "Bad Request!\r\n");

conn->shutdown();

break;

}

...

}

}

bool processRequest(const TcpConnectionPtr& conn, const string& request)

{

...

if (puzzle.size() == implicit_cast<size_t>(kCells))

{

LOG_DEBUG << conn->name();

string result = solveSudoku(puzzle); // 计算在当前线程完成

if (id.empty())

{

conn->send(result+ "\r\n");

}

...

}

// 注意这里没有类型为ThreadPool的 threadPool_成员,整个类使用Muduo默认线程模型的EventLoopThreadPool,TcpServer 聚合了EventLoopThreadPool

EventLoop* loop_;

TcpServer server_;

int numThreads_;

Timestamp startTime_;

};

void TcpServer::setThreadNum( int numThreads)

{

assert(0 <= numThreads);

threadPool_->setThreadNum(numThreads); // 设置了 EventLoopThreadPool 里面的线程个数,为后面的threadPool_->start()服务

}

void TcpServer::start()

{

if (!started_)

{

started_ = true;

threadPool_->start(threadInitCallback_); // TcpServer 中的 threadPool 类型是 EventLoopThreadPool

}

if (!acceptor_->listenning())

{

loop_->runInLoop(

boost::bind(&Acceptor::listen, get_pointer(acceptor_)));

}

}

void EventLoopThreadPool::start( const ThreadInitCallback& cb) // 开启线程的方式是使用EventLoopThread,这个类将EventLoop 和 Thread 封装在一起实现 one loop per thread

{

assert(!started_);

baseLoop_->assertInLoopThread();

started_ = true;

for (int i = 0; i < numThreads_; ++i)

{

EventLoopThread* t = new EventLoopThread(cb); // 设置线程的 callback

threads_.push_back(t);

loops_.push_back(t->startLoop()); // 保存loop方便管理和分配任务,任务分配其实是通过EventLoop::runInLoop() 来进行的

}

if (numThreads_ == 0 && cb)

{

cb(baseLoop_);

}

}

  总结一下,这里所谓的Reactor就是持有Poller的结构(稍微有点狭隘,这里先就这样理解),Poller负责事件监听和分发。持有EventLoop的结构就持有Poller。

  对于方案8只有一个类持有EventLoop,也就是只创建了一个EventLoop,这个Loop就是reactor,其它的Thread 是通过ThreadPool来实现的,因此只有reactor所在的线程来完成I/O,其它线程用于完成计算任务,所以说这个模型适合于计算密集型而不是I/O密集型。

  对于方案9,存在多个Reactor,其中main reactor 持有Acceptor,专门用于监听三个半事件中的连接建立,消息到达和连接断开以及消息发送事件都让sub reactor来完成。由于main reactor 只关心连接建立事件,能够适应高并发的IO请求,多个subreactor的存在也能兼顾I/O与计算,因此被认为是一个比较好的方案。

  后面还会深入学习Muduo网络库相关的内容,包括Reactor结构的简化,线程池的实现,现代C++的编写方式,使用C++11进行重写等。现在看来C++11 thread library 提供的接口基本可以替换 posix thread library,虽然底层也许是通过posix thread实现的,毕竟Linux内核针对NPTL进行过修改。C++11 提供了 thread_local 来描述 线程局部存储,但是没有pthread_key_create() 提供 destructor那样的功能,或者遇到需要使用TLS的地方转过来使用posix 提供的接口。

Muduo 多线程 线程池 reactor

时间: 2024-10-06 10:23:25

Muduo 多线程模型对比的相关文章

memcached源码阅读----使用libevent和多线程模型

本篇文章主要是我今天阅读memcached源码关于进程启动,在网络这块做了哪些事情. 一.iblievent的使用 首先我们知道,memcached是使用了iblievet作为网络框架的,而iblievet又是单线程模型的基于linux下epoll事件的异步模型.因此,其基本的思想就是 对可读,可写,超时,出错等事件进行绑定函数,等有其事件发生,对其绑定函数回调. 可以减掉了解一下 libevent基本api调用 struct event_base *base; base = event_bas

TT和chrome执行模型对比分析

老大让写一篇高大上的博文,那么如何才能高大上呢?从某种角度讲只要迎合老大的口味给他一篇重口味的岛国动作片剖析就能轻松过关: 从程序员角度讲,能写出高大上的范围有很多,如程序架构,算法分析.编程语言理解.操作系统理解.知名开源程序的原创分析.优秀博文的翻译等都能吸引许多同学的兴趣.今天我再教一招让博文高大上有营养的捷径就是攀高枝,用你现有的程序框架和知名的开源架构做比较剖析.今天我选择走捷径,为同学们来分析下我最近在负责的一款im客户端产品--TeamTalk(简称TT)和chorme执行模型的区

Apache Spark探秘:多进程模型还是多线程模型?(转)

Apache Spark的高性能一定程度上取决于它采用的异步并发模型(这里指server/driver端采用的模型),这与Hadoop 2.0(包括YARN和MapReduce)是一致的.Hadoop 2.0自己实现了类似Actor的异步并发模型,实现方式是epoll+状态机,而Apache Spark则直接采用了开源软件Akka,该软件实现了Actor模型,性能非常高.尽管二者在server端采用了一致的并发模型,但在任务级别(特指Spark任务和MapReduce任务)上却采用了不同的并行机

第13章 TCP编程(4)_基于自定义协议的多线程模型

7. 基于自定义协议的多线程模型 (1)服务端编程 ①主线程负责调用accept与客户端连接 ②当接受客户端连接后,创建子线程来服务客户端,以处理多客户端的并发访问. ③服务端接到的客户端信息后,回显给客户端 (2)客户端编程 ①从键盘输入信息,并发送给服务端 ②接收来自服务端的信息 //msg.h与前一节相同 #ifndef __MSG_H__ #define __MSG_H__ #include <sys/types.h> //求结构体中成员变量的偏移地址 #define OFFSET(T

boost中asio网络库多线程并发处理实现,以及asio在多线程模型中线程的调度情况和线程安全。

1.实现多线程方法: 其实就是多个线程同时调用io_service::run for (int i = 0; i != m_nThreads; ++i)        {            boost::shared_ptr<boost::thread> pTh(new boost::thread(                boost::bind(&boost::asio::io_service::run,&m_ioService)));            m_l

[线程同步互斥]多线程模型

线程的实现方式 线程的实现可以分为两类:用户级线程(User-LevelThread, ULT)和内核级线程(Kemel-LevelThread,  KLT).内核级线程又称为内核支持的线程. 在用户级线程中,有关线程管理的所有工作都由应用程序完成,内核意识不到线程的存在.应用程序可以通过使用线程库设计成多线程程序.通常,应用程序从单线程起始,在该线程中开始运行,在其运行的任何时刻,可以通过调用线程库中的派生例程创建一个在相同进程中运行的新线程.图2-2(a)说明了用户级线程的实现方式. 在内核

多线程的对比与案例(计算目录下文件的大小)

本人使用的是mac 所以有usr目录.把以下的几种情况分别贴出来给大家分析下各自有什么优缺点! 1.顺序计算目录大小code: package jvm; import java.io.File; /** * 第一版 * 顺序计算目录大小 * @author zeuskingzb * */ public class TotalFileSizeSequential { private long getTotalSizeOfFilesInDir(File file){ if (file.isFile(

【转载】COM的多线程模型

原文:COM的多线程模型 COM的多线程模型是COM技术里头最难以理解的部分之一,很多书都有涉及但是都没有很好的讲清楚.很多新人都会在这里觉得很迷惑,google大神能搜到一篇vckbase上的文章,但是个人建议还是不要看的好几乎是胡说八道在乱搞. COM自己其实并没有任何多线程模型,所以他用的多线程模型还是WIN32里头的那一套线程和同步对象.作为准备,这里先简单讲一下WIN32的线程和同步.作为惯例一讲WIN32的线程和同步对象就要把进程.线程这两个东西讲一遍,但是这里不讲,因为会看COM的

关于Python和Java的多进程多线程计算方法对比

原文请见 关于Python和Java的多进程多线程计算方法对比 搞大数据必须要正视的一个问题就是并行计算.就像执行一件任务一样,大伙一起同时干,才有效率,才会很快出成果.正所谓"众人拾柴火焰高"~ 对于并行计算,有很多高大上的概念,我也不全懂.这里就单单罗列一下我对于多进程和多线程计算的理解和总结. 在计算机中,处理一个任务,可以在一个进程中,也可以在一个线程中,确切的说,执行的话都得靠一个个线程来.在我们做某件事的时候,往往需要同时干多个任务才能达到我们所要的效果,比如说看电影,就要