首先,先简单介绍,线程池的工作原理。
1.他自身拥有一定数量的线程数组 threads,处于等待状态,等待唤醒(通过条件变量)
2.拥有一个任务队列 m_tasks,存储用户的任务,有新任务以后,唤醒线程,取出任务,通过回调函数的方式调用任务,执行完以后继续等待。
使用情况:线程池,适用于会话简短的情况下,http访问可以使用线程池,如需要长时间保持通讯的,如会话,就不要用线程池了。
本例子,采用单例模式,线程安全。
公开接口两个:
static CMyThreadPool * getInstance();
bool start(Task fun);
用户的函数 fun 的参数,可通过,bind来传递,不过要注意,如果传的是指针,需要注意他的生存周期,如果传的是 new,处理完以后,要自己 delete.
void showTicket(mutex* m){ lock_guard<std::mutex> l(*m); cout <<" show ticket: " << ticket++ << endl; } pool->start(bind(showTicket, m));
头文件:
//定义一个函数对象类型typedef std::function<void()> Task; class CMyThreadPool { private: int max_thread; // max thread; int max_task; // max task; // thread array: vector<thread> threads; // task queue: queue<Task> m_tasks; // lock: mutex m_lock; // condition: condition_variable has_task; bool running_flag; public: ~CMyThreadPool(void); //获取线程池对象指针 static CMyThreadPool * getInstance(); //添加任务,成功返回true,失败返回false bool start(Task fun); private: CMyThreadPool(void); bool InitThread(); void DestroyPool(); //工作线程 void WorkFun(); static CMyThreadPool * m_pool; static std::mutex *singal_mutex; };
实现:
#include "MyThreadPool.h" CMyThreadPool * CMyThreadPool::m_pool = NULL; mutex* CMyThreadPool::singal_mutex = new mutex(); CMyThreadPool::CMyThreadPool(void):max_thread(default_max_thread), max_task(default_max_task),running_flag(true) { } CMyThreadPool::~CMyThreadPool(void) { DestroyPool(); } CMyThreadPool * CMyThreadPool::getInstance() { if( NULL == m_pool){ //lock(); std::lock_guard<std::mutex> l(*singal_mutex); if( NULL == m_pool){ m_pool = new CMyThreadPool(); } //unlock(); } return m_pool; } bool CMyThreadPool::start( Task fun ) { //判断是否第一次,延缓线程初始化 { if( threads.size() == 0){ unique_lock<mutex> l(m_lock); if( threads.size() == 0){ //初始化线程 if(!InitThread()){ return false; } } } } //判断工作队列是否已满,没满则加入工作队列 { unique_lock<mutex> l(m_lock); if( (unsigned int)max_task > m_tasks.size()){ m_tasks.push(fun); }else{ return false; } } //唤醒一个线程 has_task.notify_one(); return true; } //已经上着锁了 bool CMyThreadPool::InitThread() { for (int i = 0; i != max_thread; i++){ threads.push_back(thread(&CMyThreadPool::WorkFun, this)); } return true; } void CMyThreadPool::WorkFun() { while(running_flag || !m_tasks.empty()){ Task t; //获取task { unique_lock<mutex> l(m_lock); while( m_tasks.empty()) has_task.wait(l); t = m_tasks.front(); m_tasks.pop(); } //执行task t(); } } void CMyThreadPool::DestroyPool() { { unique_lock<mutex> u_lock(m_lock); running_flag = false; } has_task.notify_all(); for( auto &t : threads){ t.join(); } threads.clear(); }
测试用例:
#include <iostream> #include "MyThreadPool.h" #include <memory> #define _CRTDBG_MAP_ALLOC #include <crtdbg.h> #include <Windows.h> using namespace std; int ticket = 0; void showTicket(mutex* m){ lock_guard<std::mutex> l(*m); #ifdef WIN32 //打印当前线程号 cout << "Thread id: " << GetCurrentThreadId(); #endif cout <<" show ticket: " << ticket++ << endl; } int main(){ mutex *m = new mutex; int sum = 0; { std::shared_ptr<CMyThreadPool> pool(CMyThreadPool::getInstance()); for(int i = 0; i < 100;i++){ if(!pool->start(bind(showTicket, m))){ sum++; } } } cout << "not use task : "<< sum << endl; delete m; _CrtDumpMemoryLeaks(); system("pause"); return 0; }
时间: 2024-11-01 10:27:25