C++11下的线程池以及灵活的functional + bind + lamda

利用boost的thread实现一个线程类,维护一个任务队列,以便可以承载非常灵活的调用。这个线程类可以方便的为后面的线程池打好基础。线程池还是动态均衡,没有什么别的。由于minGW 4.7 对 C++11 thread 不支持,所以采用 boost 代替,linux 下是支持的,只是名字空间不同而已,套路都一样。先上代码: [cpp] view plaincopy #include #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include #include #include #include #include #include #include #include //This class defines a class contains a thread, a task queue class cpp11_thread { public: cpp11_thread() :m_b_is_finish(false) ,m_pthread(nullptr) { } ~cpp11_thread() { if (m_pthread != nullptr) delete m_pthread; m_list_tasks.clear(); } public: //wait until this thread is terminated; void join() { terminate(); if (m_pthread!=nullptr) m_pthread->join(); } //wait until this thread has no tasks pending. void wait_for_idle() { while(load()) boost::this_thread::sleep(boost::posix_time::milliseconds(200)); } //set the mask to termminiate void terminate() {m_b_is_finish = true; m_cond_incoming_task.notify_one();} //return the current load of this thread size_t load() { size_t sz = 0; m_list_tasks_mutex.lock(); sz = m_list_tasks.size(); m_list_tasks_mutex.unlock(); return sz; } //Append a task to do size_t append(std::function< void (void) > func) { if (m_pthread==nullptr) m_pthread = new boost::thread(std::bind(&cpp11_thread::run,this)); size_t sz = 0; m_list_tasks_mutex.lock(); m_list_tasks.push_back(func); sz = m_list_tasks.size(); //if there were no tasks before, we should notidy the thread to do next job. if (sz==1) m_cond_incoming_task.notify_one(); m_list_tasks_mutex.unlock(); return sz; } protected: std::atomic< bool> m_b_is_finish; //atomic bool var to mark the thread the next loop will be terminated. std::list<std::function< void (void)> > m_list_tasks; //The Task List contains function objects boost::mutex m_list_tasks_mutex; //The mutex with which we protect task list boost::thread *m_pthread; //inside the thread, a task queue will be maintained. boost::mutex m_cond_mutex; //condition mutex used by m_cond_locker boost::condition_variable m_cond_incoming_task; //condition var with which we notify the thread for incoming tasks protected: void run() { // loop wait while (!m_b_is_finish) { std::function< void (void)> curr_task ; bool bHasTasks = false; m_list_tasks_mutex.lock(); if (m_list_tasks.empty()==false) { bHasTasks = true; curr_task = *m_list_tasks.begin(); } m_list_tasks_mutex.unlock(); //doing task if (bHasTasks) { curr_task(); m_list_tasks_mutex.lock(); m_list_tasks.pop_front(); m_list_tasks_mutex.unlock(); } if (!load()) { boost::unique_lock< boost::mutex> m_cond_locker(m_cond_mutex); boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(5000); if (m_cond_locker.mutex()) m_cond_incoming_task.timed_wait(m_cond_locker,timeout);//m_cond_incoming_task.wait(m_cond_locker); } } } }; //the thread pool class class cpp11_thread_pool { public: cpp11_thread_pool(int nThreads) :m_n_threads(nThreads) { assert(nThreads>0 && nThreads<=512); for (int i = 0; i< nThreads ;i++) m_vec_threads.push_back(std::shared_ptr<cpp11_thread>(new cpp11_thread())); } ~cpp11_thread_pool() { } public: //total threads; size_t count(){return m_vec_threads.size();} //wait until all threads is terminated; void join() { for_each(m_vec_threads.begin(),m_vec_threads.end(),[this](std::shared_ptr<cpp11_thread> & item) { item->terminate(); item->join(); }); } //wait until this thread has no tasks pending. void wait_for_idle() { int n_tasks = 0; do { if (n_tasks) boost::this_thread::sleep(boost::posix_time::milliseconds(200)); n_tasks = 0; for_each(m_vec_threads.begin(),m_vec_threads.end(),[this,&n_tasks](std::shared_ptr<cpp11_thread> & item) { n_tasks += item->load(); }); }while (n_tasks); } //set the mask to termminiate void terminate() { for_each(m_vec_threads.begin(),m_vec_threads.end(),[this](std::shared_ptr<cpp11_thread> & item) { item->terminate(); }); } //return the current load of this thread size_t load(int n) { return (n>=m_vec_threads.size())?0:m_vec_threads[n]->load(); } //Append a task to do void append(std::function< void (void) > func) { int nIdx = -1; unsigned int nMinLoad = -1; for (unsigned int i=0;i<m_n_threads;i++) { if (nMinLoad> m_vec_threads[i]->load()) { nMinLoad = m_vec_threads[i]->load(); nIdx = i; } } assert(nIdx>=0 && nIdx<m_n_threads); m_vec_threads[nIdx]->append(func); } protected: //NO. threads int m_n_threads; //vector contains all the threads std::vector<std::shared_ptr<cpp11_thread> > m_vec_threads; }; //a function which will be executed in sub thread. void hello() { //sleep for a while boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%900+100)); std::cout << "Hello world, I‘m a function runing in a thread!" << std::endl; } //a class has a method, which will be called in a thread different from the main thread. class A { private: int m_n; public: A(int n) :m_n(n) {} ~A(){} public: void foo (int k) { //sleep for a while boost::this_thread::sleep(boost::posix_time::milliseconds(rand()%900+100)); std::cout <<"n*k = "<<k*m_n<<std::endl; m_n++; } }; //let‘s test the thread. int main() { cpp11_thread_pool thread(2); srand((unsigned int)time(0)); A a(1),b(2),c(3); int nsleep = rand()%900+100; //append a simple function task thread.append(&hello); //append lamda thread.append ( [&nsleep]() { boost::this_thread::sleep(boost::posix_time::milliseconds(nsleep)); std::cout<<"I‘m a lamda runing in a thread"<<std::endl; } ); //append object method with copy-constructor(value-assignment) thread.append(std::bind(&A::foo,a,10)); thread.append(std::bind(&A::foo,b,11)); thread.append(std::bind(&A::foo,c,12)); thread.append(std::bind(&A::foo,a,100)); //append object method with address assignment, will cause the objects‘ member increase. thread.append(std::bind(&A::foo,&a,10)); thread.append(std::bind(&A::foo,&b,11)); thread.append(std::bind(&A::foo,&c,12)); thread.append(std::bind(&A::foo,&a,100)); //wait for all tasks done. thread.wait_for_idle(); //kill thread.terminate(); //wait for killed thread.join(); //test function std::function < void (void) > func1 = &hello; std::function < void (void) > func2 = &hello; if (func1.target()!=func2.target()) return 0; else return 1; } 程序输出: [plain] view plaincopy Hello world, I‘m a function runing in a thread! I‘m a lamda runing in a thread n*k = 22 n*k = 10 n*k = 36 n*k = 100 n*k = 22 n*k = 10 n*k = 36 n*k = 200 Process returned 0 (0x0) execution time : 2.891 s Press any key to continue. 下面来看看代码。首先是线程类。 第13-99行是线程类。该类实现了一个带任务队列的线程模型。关键部件是62行的std::list<std::function< void (void)> > m_list_tasks; ,这个fifo 用来承载顺序在子线程运行的任务。任务通过48行的append方法进行追加,64行m_list_tasks_mutex是一个mutex,来保护队列的进出。65行定义的线程对象boost::thread在构造函数中初始化并运行,绑定了本对象的run方法。线程得以运行的关键是run方法,在71-99行定义。该方法首先判断是否有pending的任务,有的话就弹出来执行。如果任务做完了,则使用67行定义的条件变量m_cond_incoming_task 进行wait, 直到新的任务到来,在第56行触发条件,激活队列。 线程类还提供了一些方法,比如load()返回队列的大小,以及terminate终止线程。而后,转到线程池。线程池采用最简单的策略,即直接分配给最空闲的线程。 有了上述封装,main函数就简单多了。可以append几乎所有的东西到线程池,这是以前简单的利用 virtual function 很难做到的。

时间: 2024-12-15 22:41:12

C++11下的线程池以及灵活的functional + bind + lamda的相关文章

第11章 Windows线程池(1)_传统的Windows线程池

第11章 Windows线程池 11.1 传统的Windows线程池及API 11.1.1 传统的线程池对象及对应的API 线程池对象 API 普通任务线程池 QueueUserWorkItem 计时器线程池 CreateTimerQueue(创建线程池) CreateTimerQueueTimer(创建计时器) ChangeTimerQueueTimer DeleteTimerQueueTimer DeteTimerQueueEx 同步对象等待线程池 RegisterWaitForSingle

linux下的线程池

什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了.如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了. 下面是Linux系统下用C语言创建的一个线程池.线程池会维护一个任务链表(每个CThread_worker结构就是一个任务).   pool_init()函数预先创建好max_thread_num个线程,每个线程执thread_routine ()函

第11章 Windows线程池(3)_私有的线程池

11.3 私有的线程池 11.3.1 创建和销毁私有的线程池 (1)进程默认线程池 当调用CreateThreadpoolwork.CreateThreadpoolTimer.CreateThreadpoolWait或CreateThreadpoolIo,并使传入参数PTP_CALLBACK_ENVIRON设为NULL时,那么所有的工作项将被添加到进程默认的线程池.一般这个默认的线程池能满足大多数应用程序的要求.其生命期与进程相同,在进程终止的时候,Windows负责线程池的清理和销毁工作. (

基于无锁队列和c++11的高性能线程池

基于无锁队列和c++11的高性能线程池线程使用c++11库和线程池之间的消息通讯使用一个简单的无锁消息队列适用于linux平台,gcc 4.6以上 标签: <无> 代码片段(6)[全屏查看所有代码] 1. [代码]lckfree.h ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 // lckfree.h //

一个Linux下C线程池的实现

在传统服务器结构中, 常是 有一个总的 监听线程监听有没有新的用户连接服务器, 每当有一个新的 用户进入, 服务器就开启一个新的线程用户处理这 个用户的数据包.这个线程只服务于这个用户 , 当 用户与服务器端关闭连接以后, 服务器端销毁这个线程.然而频繁地开辟与销毁线程极大地占用了系统的资源.而且在大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源.线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一个线程的处理方法不同, 它的基本思想就是在程序

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

Android下基于线程池的网络访问基础框架

引言 现在的Android开发很多都使用Volley.OkHttp.Retrofit等框架,这些框架固然有优秀的地方(以后会写代码学习分享),但是我们今天介绍一种基于Java线程池的网络访问框架. 实现思路及实现 APP界面上面的数据都是通过网络请求获取的,我们能不能将网络请求依次入队,然后配合着Java线程池,让线程依次处理我们的请求,最后返回结果给我们.下面我们先来看一下线程池工具类的实现: 1 public class ThreadPoolUtils { 2 3 private Threa

windows下利用线程池完成多任务的分配和运行

在做项目的过程中有时候为了提升效率,用了多线程的方法来对任务进行分割和应用,后来发现,采用线程池的方法能更好的利用线程资源来计算任务,网上有很多关于如何运行线程池的例子,msdn上也给出了对应的例子:https://msdn.microsoft.com/en-us/library/windows/desktop/ms686980(v=vs.85).aspx 感兴趣的话大家可以去看看,这里我给出一个简单的demo,利用线程池单次调用多次调用,例子如下: [cpp] view plain copy

Linux下简易线程池

线程池简介 简易线程池实现 线程池头文件threadpool.h如下: 1 #ifndef THREADPOOL_H 2 #define THREADPOOL_H 3 4 #include <stdio.h> 5 #include <stdlib.h> 6 #include <unistd.h> 7 #include <pthread.h> 8 9 /** 10 * 线程体数据结构 11 */ 12 typedef struct runner 13 { 14