基于C++11的线程池

  • 1.封装的线程对象
class task : public std::tr1::enable_shared_from_this<task>
{
public:
	task():exit_(false){}
	task( const task & ) = delete;
	~task(){}
	task & operator =( const task &) = delete;

	void start();
	void stop()
	{
		exit_ = true;
		sync_.notify_one();
	}
	void set_job( const std::function<void()> & job, const std::string & file, int line)
	{//提交任务
		{
			std::unique_lock<std::mutex> lock(mutex_);
			job_ = job;
			file_ = file;
			line_ = line;
		}
		sync_.notify_one();//通知主线程有任务要执行....
	}
	void print_job(){
		LOG(INFO)<<"sumbit from:"<<file_<<":"<<line_;
	}
private:

	bool exit_;
	std::mutex mutex_;
	std::condition_variable sync_;
	std::function< void()> job_;          //线程执行的任务,线程任意时刻,最多只能执行一个任务。
	std::thread::id       id_;
	std::string		     file_;
	int                   line_;

};
void task::start()
{
	auto job_proxy = [this] (){

		id_ = std::this_thread::get_id();

		while( !exit_ )
		{

			std::unique_lock<std::mutex> lock(mutex_);

			if( job_ )
			{//有任务了,需要执行任务了
				try
				{
					job_(); //执行任务的代码
				}catch( std::exception & e)
				{

				}catch(...)
				{

				}
				job_ = std::function<void()>(); //释放任务绑定的资源,主要为闭包捕获的资源,特别是shared_ptr对象.
				tasks->job_completed( shared_from_this() ); //任务执行完成,通知线程池
			}else{
			       //没有任务的时候,等待其他线程提交任务。。
				sync_.wait(lock);

			}
		}
	};

	std::thread t(job_proxy); //创建并启动与task管理的线程
	t.detach();               //分离模式,thread对象销毁了,但是其创建的线程还活着。。。
}



  • 2.线程池管理对象
class task_pool
{
public:
	task_pool(unsigned int pool_size = 128):max_size_(pool_size),stop_all_(true)
	{

	}
	~task_pool()
	{
	}
	void job_completed( const std::tr1::shared_ptr<task> & t)//回收task对象
	{

		std::lock_guard<std::mutex> lock(mutex_);
		bool need_to_notify = idle_tasks_.empty() && (!wait_for_running_jobs_.empty());
		busying_tasks_.erase(t);
		idle_tasks_.push_back(t);
		LOG(INFO)<<"after job_completed, current idle tasks size:"<< idle_tasks_.size()
			<<" busying tasks size:"<<busying_tasks_.size()
			<<" wait for running jobs size:"<<wait_for_running_jobs_.size();
		if( !busying_tasks_.empty() ){
			(*busying_tasks_.begin())->print_job();
		}
		if( need_to_notify )//任务太多了,之前空闲线程使用完了,有任务在等待执行,需要通知
		{
			sync_.notify_one();
		}
	};
	//提交任务
	void submit_job( const std::function<void()> & job, const std::string file, int line)
	{
		if( stop_all_ )
		{
			return;
		}
		std::lock_guard<std::mutex> lock(mutex_);
		bool need_notify = wait_for_running_jobs_.empty();
		wait_for_running_jobs_.push(std::make_tuple(job,file,line));

		if( need_notify )//等待执行的任务为空时,需要通知,其他情况不需要通知.
		{
			sync_.notify_one();
		}

	}
	void execute_job()
	{

		while(true)
		{
			std::unique_lock<std::mutex> lock(mutex_);
			while(!stop_all_ && wait_for_running_jobs_.empty() )
			{
				//等待其他线程提交任务
				sync_.wait(lock);
			}

			if( stop_all_ )
			{
				return;
			}
			while(!stop_all_ && idle_tasks_.empty())
			{
				//有任务要执行,但是没有空闲线程,等待其他任务执行完成。
				sync_.wait(lock);
			}
			if( stop_all_ )
			{
				return;
			}

			//有任务,也有空闲线程了
			auto t = get_task();
			auto job =wait_for_running_jobs_.front();
			wait_for_running_jobs_.pop();
			//分发任务到task 线程.
			t->set_job(std::get<0>(job), std::get<1>(job), std::get<2>(job));
		}
	}
	void stop_all()
	{

		std::lock_guard<std::mutex> lock(mutex_);
		stop_all_ = true;
		for( auto t : idle_tasks_ )
		{
			t->stop();
		}
		idle_tasks_.clear();
		for( auto t : busying_tasks_ )
		{
			t->stop();
		}
		while(!wait_for_running_jobs_.empty()){
			wait_for_running_jobs_.pop();
		}

		sync_.notify_one();
	}

	void start()
	{// 初始化启动线程池主线程
		try
		{
			std::thread t( [this]{ execute_job();});
			t.detach();

			stop_all_ = false;
			allocate_tasks();

		}catch( std::exception & e )
		{
			LOG(FATAL) << "start tasks pool ... error"<<e.what();
		}
	}
protected:
	std::tr1::shared_ptr<task> get_task()
	{
		//获取task对象
		if( ! idle_tasks_.empty() )
		{
			auto t = *idle_tasks_.begin();
			idle_tasks_.pop_front();  //从空闲队列移除
			busying_tasks_.insert(t); //加入忙队列

			return t;
		}

		return std::tr1::shared_ptr<task>();

	}

	void allocate_tasks() //初始化线程池
	{
		for( int i = 0 ; i < max_size_; i ++ )
		{
			std::tr1::shared_ptr<task> t( new task());
			try{
				t->start();
				idle_tasks_.push_back(t);
			}catch( std::exception & e)
			{	//超过进程最大线程数限制时,会跑出异常。。。
				break;
			}
		}
	}
private :
	unsigned int                              max_size_;
	std::list  < std::tr1::shared_ptr<task> > idle_tasks_;   //空闲任务队列
	std::set  <  std::tr1::shared_ptr<task> > busying_tasks_;//正在执行任务的队列
	std::queue< std::tuple< std::function<void()> , std::string, int   >  > wait_for_running_jobs_; //等待执行的任务
	std::mutex             				    mutex_;
	std::condition_variable                 sync_;
	bool stop_all_;
};
  • usage
static task_pool *  tasks = nullptr;
static std::once_flag init_flag;
static std::once_flag finit_flag;

void run_job(const std::function<void()> & job , const std::string &  file, int line )
{
	if( tasks != nullptr)
		tasks->submit_job(job, file,line);

}
void task_pool_init( unsigned max_task_size)
{
	std::call_once(init_flag,[max_task_size]{
		tasks = new task_pool(max_task_size);
		tasks->start();
	});
}
void task_pool_finit()
{
   std::call_once(finit_flag,[]{ tasks->stop_all();});
}

基于C++11的线程池,布布扣,bubuko.com

时间: 2024-10-18 06:53:42

基于C++11的线程池的相关文章

基于C++11的线程池,简洁且可以带任意多的参数

咳咳.C++11 加入了线程库,从此告别了标准库不支持并发的历史.然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池.信号量等.线程池(thread pool)这个东西,在面试上多次被问到,一般的回答都是:"管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复." 貌似没有问题吧.但是写起程序来的时候就出问题了. 废话不多说,先上实现,然后再啰嗦.(dont talk, show me ur code !) 代码实现 1

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

11 java 线程池 实现原理

一 关键类的实现 1 ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类. 下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: 1 public class ThreadPoolExecutor extends AbstractExecutorService {

使用C++11封装线程池ThreadPool

读本文之前,请务必阅读: 使用C++11的function/bind组件封装Thread以及回调函数的使用 Linux组件封装(五)一个生产者消费者问题示例   线程池本质上是一个生产者消费者模型,所以请熟悉这篇文章:Linux组件封装(五)一个生产者消费者问题示例. 在ThreadPool中,物品为计算任务,消费者为pool内的线程,而生产者则是调用线程池的每个函数. 搞清了这一点,我们很容易就需要得出,ThreadPool需要一把互斥锁和两个同步变量,实现同步与互斥. 存储任务,当然需要一个

11 java 线程池 使用实例

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果. 1 线程池做什么 网络请求通常有两种形式: 第一种,请求不是很频繁,而且每次连接后会保持相当一段时间来读数据或

基于SmartThreadPool线程池技术实现多任务批量处理

一.多线程技术应用场景介绍 本期同样带给大家分享的是阿笨在实际工作中遇到的真实业务场景,请跟随阿笨的视角去如何采用基于开源组件SmartThreadPool线程池技术实现多任务批量处理.在工作中您是否遇到过如何快速高效的处理Job任务列表.如何通过多线程批量处理订单.如何多线程群发短信.如何批量上传图片到远程图片服务器或者云存储图片服务器.如何通过多线程让应用程序提高对CPU的利用率从而增加应用程序的处理效率,等等.如果您有遇到类似的业务场景的而感到烦恼的话,那么今天您看完阿笨的分享课后下次碰到

Java线程池ThreadPoolExecutor

线程池的好处 1. 降低资源的消耗 通过重复利用已创建的线程降低线程创建和销毁所造成的消耗 2. 提高响应速度 当任务到达时,任务可以不需要等到线程创建就能立即执行 3. 提高线程的可管理型 线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配.调优和监控. 实现原理 当提交一个新任务到线程池时,线程池的处理流程为: 1). 线程池判断核心线程池里的线程是否都在执行任务. 如果不是,则创建一个新的工作线程来执行任务.如果核心线程池里的线程都在执行

java多线程系类:JUC线程池:03之线程池原理(二)(转)

概要 在前面一章"Java多线程系列--"JUC线程池"02之 线程池原理(一)"中介绍了线程池的数据结构,本章会通过分析线程池的源码,对线程池进行说明.内容包括:线程池示例参考代码(基于JDK1.7.0_40)线程池源码分析(一) 创建"线程池"(二) 添加任务到"线程池"(三) 关闭"线程池" 转载请注明出处:http://www.cnblogs.com/skywang12345/p/3509954.h

.NET线程池技术实现多任务批量处理

一.多线程技术应用场景介绍 本期同样带给大家分享的是阿笨在实际工作中遇到的真实业务场景,请跟随阿笨的视角去如何采用基于开源组件SmartThreadPool线程池技术实现多任务批量处理.在工作中您是否遇到过如何快速高效的处理Job任务列表.如何通过多线程批量处理订单.如何多线程群发短信.如何批量上传图片到远程图片服务器或者云存储图片服务器.如何通过多线程让应用程序提高对CPU的利用率从而增加应用程序的处理效率,等等.如果您有遇到类似的业务场景的而感到烦恼的话,那么今天您看完阿笨的分享课后下次碰到