- 1.封装的线程对象
class task : public std::tr1::enable_shared_from_this<task> { public: task():exit_(false){} task( const task & ) = delete; ~task(){} task & operator =( const task &) = delete; void start(); void stop() { exit_ = true; sync_.notify_one(); } void set_job( const std::function<void()> & job, const std::string & file, int line) {//提交任务 { std::unique_lock<std::mutex> lock(mutex_); job_ = job; file_ = file; line_ = line; } sync_.notify_one();//通知主线程有任务要执行.... } void print_job(){ LOG(INFO)<<"sumbit from:"<<file_<<":"<<line_; } private: bool exit_; std::mutex mutex_; std::condition_variable sync_; std::function< void()> job_; //线程执行的任务,线程任意时刻,最多只能执行一个任务。 std::thread::id id_; std::string file_; int line_; };void task::start() { auto job_proxy = [this] (){ id_ = std::this_thread::get_id(); while( !exit_ ) { std::unique_lock<std::mutex> lock(mutex_); if( job_ ) {//有任务了,需要执行任务了 try { job_(); //执行任务的代码 }catch( std::exception & e) { }catch(...) { } job_ = std::function<void()>(); //释放任务绑定的资源,主要为闭包捕获的资源,特别是shared_ptr对象. tasks->job_completed( shared_from_this() ); //任务执行完成,通知线程池 }else{ //没有任务的时候,等待其他线程提交任务。。 sync_.wait(lock); } } }; std::thread t(job_proxy); //创建并启动与task管理的线程 t.detach(); //分离模式,thread对象销毁了,但是其创建的线程还活着。。。 }
- 2.线程池管理对象
class task_pool { public: task_pool(unsigned int pool_size = 128):max_size_(pool_size),stop_all_(true) { } ~task_pool() { } void job_completed( const std::tr1::shared_ptr<task> & t)//回收task对象 { std::lock_guard<std::mutex> lock(mutex_); bool need_to_notify = idle_tasks_.empty() && (!wait_for_running_jobs_.empty()); busying_tasks_.erase(t); idle_tasks_.push_back(t); LOG(INFO)<<"after job_completed, current idle tasks size:"<< idle_tasks_.size() <<" busying tasks size:"<<busying_tasks_.size() <<" wait for running jobs size:"<<wait_for_running_jobs_.size(); if( !busying_tasks_.empty() ){ (*busying_tasks_.begin())->print_job(); } if( need_to_notify )//任务太多了,之前空闲线程使用完了,有任务在等待执行,需要通知 { sync_.notify_one(); } }; //提交任务 void submit_job( const std::function<void()> & job, const std::string file, int line) { if( stop_all_ ) { return; } std::lock_guard<std::mutex> lock(mutex_); bool need_notify = wait_for_running_jobs_.empty(); wait_for_running_jobs_.push(std::make_tuple(job,file,line)); if( need_notify )//等待执行的任务为空时,需要通知,其他情况不需要通知. { sync_.notify_one(); } } void execute_job() { while(true) { std::unique_lock<std::mutex> lock(mutex_); while(!stop_all_ && wait_for_running_jobs_.empty() ) { //等待其他线程提交任务 sync_.wait(lock); } if( stop_all_ ) { return; } while(!stop_all_ && idle_tasks_.empty()) { //有任务要执行,但是没有空闲线程,等待其他任务执行完成。 sync_.wait(lock); } if( stop_all_ ) { return; } //有任务,也有空闲线程了 auto t = get_task(); auto job =wait_for_running_jobs_.front(); wait_for_running_jobs_.pop(); //分发任务到task 线程. t->set_job(std::get<0>(job), std::get<1>(job), std::get<2>(job)); } } void stop_all() { std::lock_guard<std::mutex> lock(mutex_); stop_all_ = true; for( auto t : idle_tasks_ ) { t->stop(); } idle_tasks_.clear(); for( auto t : busying_tasks_ ) { t->stop(); } while(!wait_for_running_jobs_.empty()){ wait_for_running_jobs_.pop(); } sync_.notify_one(); } void start() {// 初始化启动线程池主线程 try { std::thread t( [this]{ execute_job();}); t.detach(); stop_all_ = false; allocate_tasks(); }catch( std::exception & e ) { LOG(FATAL) << "start tasks pool ... error"<<e.what(); } } protected: std::tr1::shared_ptr<task> get_task() { //获取task对象 if( ! idle_tasks_.empty() ) { auto t = *idle_tasks_.begin(); idle_tasks_.pop_front(); //从空闲队列移除 busying_tasks_.insert(t); //加入忙队列 return t; } return std::tr1::shared_ptr<task>(); } void allocate_tasks() //初始化线程池 { for( int i = 0 ; i < max_size_; i ++ ) { std::tr1::shared_ptr<task> t( new task()); try{ t->start(); idle_tasks_.push_back(t); }catch( std::exception & e) { //超过进程最大线程数限制时,会跑出异常。。。 break; } } } private : unsigned int max_size_; std::list < std::tr1::shared_ptr<task> > idle_tasks_; //空闲任务队列 std::set < std::tr1::shared_ptr<task> > busying_tasks_;//正在执行任务的队列 std::queue< std::tuple< std::function<void()> , std::string, int > > wait_for_running_jobs_; //等待执行的任务 std::mutex mutex_; std::condition_variable sync_; bool stop_all_; };
- usage
static task_pool * tasks = nullptr; static std::once_flag init_flag; static std::once_flag finit_flag; void run_job(const std::function<void()> & job , const std::string & file, int line ) { if( tasks != nullptr) tasks->submit_job(job, file,line); } void task_pool_init( unsigned max_task_size) { std::call_once(init_flag,[max_task_size]{ tasks = new task_pool(max_task_size); tasks->start(); }); } void task_pool_finit() { std::call_once(finit_flag,[]{ tasks->stop_all();}); }
时间: 2024-10-18 06:53:42