转载:C++线程池的一个实现

原文转自:http://www.cnblogs.com/lidabo/p/3328646.html

略有修改

Cthread类参见:http://www.cnblogs.com/tangxin-blog/p/4835211.html

CThreadPool.h

 1 #ifndef __MY_THREAD_POOL_H_
 2 #define __MY_THREAD_POOL_H_
 3
 4 #include "CThread.h"
 5 #include <set>
 6 #include <list>
 7 #include <windows.h>
 8 using namespace std;
 9
10 class CThreadPool
11 {
12 public:
13     CThreadPool(void);
14     virtual ~CThreadPool(void);
15     // 初始化线程池,创建minThreads个线程
16     bool Initialize(unsigned int minThreadCnt,unsigned int maxThreadCnt,unsigned int maxTaskQueueLength);
17     bool AddTask( CRunnable *pRunnable, bool bRun = true);
18     void Terminate();
19     // 获取线程数量
20     unsigned int GetThreadCnt();
21 private:
22     // 从任务队列头中取出一个任务
23     CRunnable *GetTask();
24     // 执行任务线程
25     static unsigned int WINAPI StaticThreadFunc(void * arg);
26 private:
27     // 工作者类
28     class CWorker : public CThread
29     {
30     public:
31         CWorker(CThreadPool *pThreadPool,CRunnable *pFirstTask = NULL);
32         ~CWorker();
33         void Run();
34     private:
35         CThreadPool * const m_pThreadPool;
36         CRunnable * const m_pFirstTask;
37         volatile bool m_bRun;
38     };
39
40     typedef std::set<CWorker *> ThreadPool;
41     typedef std::list<CRunnable *> Tasks;
42     typedef Tasks::iterator TasksItr;
43     typedef ThreadPool::iterator ThreadPoolItr;
44
45     CRITICAL_SECTION m_csTasksLock;
46     CRITICAL_SECTION m_csThreadPoolLock;
47
48     // 线程池
49     ThreadPool m_ThreadPool;
50     // 垃圾线程
51     ThreadPool m_TrashThread;
52     // 任务队列
53     Tasks m_Tasks;
54     // 是否在运行
55     volatile bool m_bRun;
56     // 能否插入任务
57     volatile bool m_bEnableInsertTask;
58     // 最小线程数
59     volatile unsigned int m_minThreads;
60     // 最大线程数
61     volatile unsigned int m_maxThreads;
62     // 最大挂起任务数量
63     volatile unsigned int m_maxPendingTasks;
64 };
65
66 #endif

CthreadPool.cpp

#include "CThreadPool.h"

CThreadPool::CWorker::CWorker(CThreadPool *pThreadPool,CRunnable *pFirstTask)
    :m_pThreadPool(pThreadPool),m_pFirstTask(pFirstTask),m_bRun(true)
{

}

CThreadPool::CWorker::~CWorker()
{
}

void CThreadPool::CWorker::Run()
{
    CRunnable * pTask = NULL;
    while(m_bRun)
    {
        // 从线程池的任务队列中取出一个任务
        pTask = m_pThreadPool->GetTask();
        // 如果没有取到任务
        if(NULL == pTask)
        {
            EnterCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
            // 如果运转的线程数大于最小线程数,需要清除多余的线程
            if(m_pThreadPool->GetThreadCnt() > m_pThreadPool->m_minThreads)
            {
                ThreadPoolItr itr = m_pThreadPool->m_ThreadPool.find(this);
                if(itr != m_pThreadPool->m_ThreadPool.end())
                {
                    m_pThreadPool->m_ThreadPool.erase(itr);
                    m_pThreadPool->m_TrashThread.insert(this);
                }
                m_bRun = false;
            }
            else
            {
                // 等待已经开始运行的线程结束
                ThreadPoolItr itr = m_pThreadPool->m_TrashThread.begin();
                while(itr != m_pThreadPool->m_TrashThread.end())
                {
                    (*itr)->Join();
                    delete (*itr);
                    m_pThreadPool->m_TrashThread.erase(itr);
                    itr = m_pThreadPool->m_TrashThread.begin();
                }
            }
            LeaveCriticalSection(&(m_pThreadPool->m_csThreadPoolLock));
            continue;
        }
        else
        {
            pTask->Run();
            pTask = NULL;
        }
    }
}

CThreadPool::CThreadPool(void):m_bRun(false),m_bEnableInsertTask(false)
{
    InitializeCriticalSection(&m_csTasksLock);
    InitializeCriticalSection(&m_csThreadPoolLock);
}

CThreadPool::~CThreadPool(void)
{
    Terminate();
    DeleteCriticalSection(&m_csTasksLock);
    DeleteCriticalSection(&m_csThreadPoolLock);
}

bool CThreadPool::Initialize(unsigned int minThreadCnt,unsigned int maxThreadCnt,unsigned int maxTaskQueueLength)
{
    if(minThreadCnt == 0)
    {
        return false;
    }
    if(minThreadCnt > maxThreadCnt)
    {
        return false;
    }
    m_minThreads = minThreadCnt;
    m_maxThreads = maxThreadCnt;
    m_maxPendingTasks = maxTaskQueueLength;
    unsigned int i = m_ThreadPool.size();
    for(; i<minThreadCnt; i++)
    {
        //创建线程 minThreadCnt 个线程
        CWorker * pWorker = new CWorker(this);
        if(NULL == pWorker)
        {
            return false;
        }
        EnterCriticalSection(&m_csThreadPoolLock);
        m_ThreadPool.insert(pWorker);
        LeaveCriticalSection(&m_csThreadPoolLock);
        pWorker->Start();
    }
    // 可以开始插入任务队列
    m_bRun = true;
    m_bEnableInsertTask = true;
    return true;
}

unsigned int CThreadPool::GetThreadCnt()
{
    return m_ThreadPool.size();
}

CRunnable * CThreadPool::GetTask()
{
    CRunnable *Task = NULL;
    EnterCriticalSection(&m_csTasksLock);
    if(!m_Tasks.empty())
    {
        Task = m_Tasks.front();
        m_Tasks.pop_front();
    }
    LeaveCriticalSection(&m_csTasksLock);
    return Task;
}

bool CThreadPool::AddTask( CRunnable *pRunnable, bool bRun /*= true*/ )
{
    if(m_bEnableInsertTask == false)
    {
        return false;
    }
    if(NULL == pRunnable)
    {
        return false;
    }
    // 如果达到最大挂起任务数量,不再插入
    if(m_Tasks.size() >= m_maxPendingTasks)
    {
        // 如果小于最大线程数
        if(m_ThreadPool.size() < m_maxThreads)
        {
            CWorker * pWorker = new CWorker(this, pRunnable);
            if(NULL == pWorker)
            {
                return false;
            }
            EnterCriticalSection(&m_csThreadPoolLock);
            m_ThreadPool.insert(pWorker);
            LeaveCriticalSection(&m_csThreadPoolLock);
            pWorker->Start();
        }
        else
        {
            return false;
        }
    }
    else
    {
        EnterCriticalSection(&m_csTasksLock);
        m_Tasks.push_back(pRunnable);
        LeaveCriticalSection(&m_csTasksLock);
    }
    return true;
}

void CThreadPool::Terminate()
{
    m_bEnableInsertTask = false;
    while(m_Tasks.size() > 0)
    {
        Sleep(1);
    }
    m_bRun = false;
    m_minThreads = 0;
    m_maxThreads = 0;
    m_maxPendingTasks = 0;
    while(m_ThreadPool.size() > 0)
    {
        Sleep(1);
    }
    EnterCriticalSection(&m_csThreadPoolLock);
    ThreadPoolItr itr = m_TrashThread.begin();
    while(itr != m_TrashThread.end())
    {
        (*itr)->Join();
        delete (*itr);
        m_TrashThread.erase(itr);
        itr = m_TrashThread.begin();
    }
    LeaveCriticalSection(&m_csThreadPoolLock);
}

测试代码

#include <iostream>
#include <windows.h>
#include <time.h>
#include "CThread.h"
#include "CThreadPool.h"
using namespace std;

class R : public CRunnable
{
public:
    R(int t):m_nt(t)
    {
    }
    ~R()
    {
        cout<<"~R:"<<m_nt<<endl;
    }
    void Run()
    {
        Sleep(m_nt);
    }
    int m_nt;
};

int main()
{
    int i,n = 100000,m;
    time_t start = 0,end = 0;
    R r(1);
    /*
    // 单线程
    start = clock();
    for(i=0;i < n;i++)
    {
        r.Run();
    }
    end = clock();
    cout<<"单线程用时:"<<end - start<<endl;
    */
    // 多线程
    start = clock();
    CThread *ths = NULL;
    ths = new CThread[n];
    for(i=0;i < n;i++)
    {
        ths[i].SetRunnable(&r);
        ths[i].Start();
    }
    for(i=0;i < n;i++)
    {
        ths[i].Join();
    }
    delete[] ths;
    end = clock();
    cout<<"多线程用时:"<<end - start<<endl;
    // 线程池
    start = clock();
    CThreadPool *threadPool = new CThreadPool();
    if(threadPool->Initialize(200,500,100) == false)
    {
        cout<<"Initialize failed"<<endl;
        return -1;
    }
    m = 0;
    for(int i=0;i<n;i++)
    {
        if( !threadPool->AddTask(&r) )
        {
            m++;
        }
    }
    threadPool->Terminate();
    delete threadPool;
    end = clock();
    if(m!=0)
    {
        cout<<m<<endl;
    }
    cout<<"线程池用时:"<<end - start<<endl;
    system("pause");
    return 0;
}

注意:要合理设置最小线程数,最大线程数,最大挂起任务数量,以便达到最优性能。

时间: 2024-11-08 20:39:07

转载:C++线程池的一个实现的相关文章

[转载] Java线程池框架源码分析

转载自http://www.linuxidc.com/Linux/2014-11/108791.htm 相关类Executor,Executors,AbstractExecutorService,ExecutorService Executor:整个线程池执行者框架的顶层接口.定义了一个execute方法,整个线程执行者框架的核心方法. public interface Executor { void execute(Runnable command);} ExecutorService:这是一

线程池-实现一个取消选项

代码Demo: using System;using System.Threading; 在Main方法下面加入以下代码片段: static void AsyncOperation1(CancellationToken token) { Console.WriteLine("Starting the first task"); for (int i = 0; i < 5; i++) { if (token.IsCancellationRequested) { Console.Wr

spring 线程池 的一个坑。

问题简述: 配置的队列初始化的消费者线程占满了线程池.导致其他的再使用此线程池中线程不运行.不报错,不抛异常.线程的数量仅为为线程池的配置中的最小值. <task:executor pool-size="100-150" queue-capacity="250" > 同时schema描述中写道: The size of the executor's thread pool as either a single value or a range    (e

深入了解java线程池(转载)

出处:http://www.cnblogs.com/dolphin0520/ 本文归作者海子和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利. 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执

转载一份C++线程池的代码,非常实用

#ifndef _ThreadPool_H_ #define _ThreadPool_H_ #pragma warning(disable: 4530) #pragma warning(disable: 4786) #include <cassert> #include <vector> #include <queue> #include <windows.h> using namespace std; class ThreadJob  //工作基类 { p

Java并发编程(十二):线程池的使用(转载)

本文转载自:http://www.cnblogs.com/dolphin0520/p/3932921.html 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.

线程池【转载】

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java线程池—转载梅子 http://www.cnblogs.com/dolphin0520/p/3932921.html

Java并发编程:线程池的使用 Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我

四大线程池详解(转载)

new Thread 的弊端 首先看一段代码: public class ThreadTest { public static void main(String[] args) { while (true) { new Thread(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread()); } }).start(); } } } Thread[Thread-0,5,main