一个Windows下线程池的实现(C++)

前言

  本文配套代码:https://github.com/TTGuoying/ThreadPool

  先看看几个概念:

  1. 线程:进程中负责执行的执行单元。一个进程中至少有一个线程。
  2. 多线程:一个进程中有多个线程同时运行,根据cpu切换轮流工作,在多核cpu上可以几个线程同时在不同的核心上同时运行。
  3. 线程池:基本思想还是一种对象池思想,开辟一块内存空间,里面存放一些休眠(挂起Suspend)的线程。当有任务要执行时,从池中取一个空闲的线程执行任务,执行完成后线程休眠放回池中。这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。

  我们为什么要使用线程池呢?

  简单来说就是线程本身存在开销,我们利用多线程来进行任务处理,单线程也不能滥用,无止禁的开新线程会给系统产生大量消耗,而线程本来就是可重用的资源,不需要每次使用时都进行初始化,因此可以采用有限的线程个数处理无限的任务。

代码实现

  本文的线程池是在Windows上实现的。主要思路是维护一个空闲线程队列、一个忙碌线程队列和一个任务队列,一开始建立一定数量的空闲线程放进空闲线程队列,当有任务进入任务队列时,从空闲线程队列中去一个线程执行任务,线程变为忙碌线程移入忙碌线程队列,任务执行完成后,线程到任务队列中取任务继续执行,如果任务队列中没有任务线程休眠后从忙碌线程队列回到空闲线程队列。下面是线程池的工作原理图:

  本线程池类实现了自动调节池中线程数。

  废话少说,直接上代码:

  1 /*
  2 ==========================================================================
  3 * 类ThreadPool是本代码的核心类,类中自动维护线程池的创建和任务队列的派送
  4
  5 * 其中的TaskFun是任务函数
  6 * 其中的TaskCallbackFun是回调函数
  7
  8 *用法:定义一个ThreadPool变量,TaskFun函数和TaskCallbackFun回调函数,然后调用ThreadPool的QueueTaskItem()函数即可
  9
 10 Author: TTGuoying
 11
 12 Date: 2018/02/19 23:15
 13
 14 ==========================================================================
 15 */
 16 #pragma once
 17 #include <Windows.h>
 18 #include <list>
 19 #include <queue>
 20 #include <memory>
 21
 22 using std::list;
 23 using std::queue;
 24 using std::shared_ptr;
 25
 26 #define THRESHOLE_OF_WAIT_TASK  20
 27
 28 typedef int(*TaskFun)(PVOID param);                // 任务函数
 29 typedef void(*TaskCallbackFun)(int result);        // 回调函数
 30
 31 class ThreadPool
 32 {
 33 private:
 34     // 线程类(内部类)
 35     class Thread
 36     {
 37     public:
 38         Thread(ThreadPool *threadPool);
 39         ~Thread();
 40
 41         BOOL isBusy();                                                    // 是否有任务在执行
 42         void ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback);    // 执行任务
 43
 44     private:
 45         ThreadPool *threadPool;                                            // 所属线程池
 46         BOOL    busy;                                                    // 是否有任务在执行
 47         BOOL    exit;                                                    // 是否退出
 48         HANDLE  thread;                                                    // 线程句柄
 49         TaskFun    task;                                                    // 要执行的任务
 50         PVOID    param;                                                    // 任务参数
 51         TaskCallbackFun taskCb;                                            // 回调的任务
 52         static unsigned int __stdcall ThreadProc(PVOID pM);                // 线程函数
 53     };
 54
 55     // IOCP的通知种类
 56     enum WAIT_OPERATION_TYPE
 57     {
 58         GET_TASK,
 59         EXIT
 60     };
 61
 62     // 待执行的任务类
 63     class WaitTask
 64     {
 65     public:
 66         WaitTask(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL bLong)
 67         {
 68             this->task = task;
 69             this->param = param;
 70             this->taskCb = taskCb;
 71             this->bLong = bLong;
 72         }
 73         ~WaitTask() { task = NULL; taskCb = NULL; bLong = FALSE; param = NULL; }
 74
 75         TaskFun    task;                    // 要执行的任务
 76         PVOID param;                    // 任务参数
 77         TaskCallbackFun taskCb;            // 回调的任务
 78         BOOL bLong;                        // 是否时长任务
 79     };
 80
 81     // 从任务列表取任务的线程函数
 82     static unsigned int __stdcall GetTaskThreadProc(PVOID pM)
 83     {
 84         ThreadPool *threadPool = (ThreadPool *)pM;
 85         BOOL bRet = FALSE;
 86         DWORD dwBytes = 0;
 87         WAIT_OPERATION_TYPE opType;
 88         OVERLAPPED *ol;
 89         while (WAIT_OBJECT_0 != WaitForSingleObject(threadPool->stopEvent, 0))
 90         {
 91             BOOL bRet = GetQueuedCompletionStatus(threadPool->completionPort, &dwBytes, (PULONG_PTR)&opType, &ol, INFINITE);
 92             // 收到退出标志
 93             if (EXIT == (DWORD)opType)
 94             {
 95                 break;
 96             }
 97             else if (GET_TASK == (DWORD)opType)
 98             {
 99                 threadPool->GetTaskExcute();
100             }
101         }
102         return 0;
103     }
104
105     //线程临界区锁
106     class CriticalSectionLock
107     {
108     private:
109         CRITICAL_SECTION cs;//临界区
110     public:
111         CriticalSectionLock() { InitializeCriticalSection(&cs); }
112         ~CriticalSectionLock() { DeleteCriticalSection(&cs); }
113         void Lock() { EnterCriticalSection(&cs); }
114         void UnLock() { LeaveCriticalSection(&cs); }
115     };
116
117
118 public:
119     ThreadPool(size_t minNumOfThread = 2, size_t maxNumOfThread = 10);
120     ~ThreadPool();
121
122     BOOL QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb = NULL, BOOL longFun = FALSE);       // 任务入队
123
124 private:
125     size_t getCurNumOfThread() { return getIdleThreadNum() + getBusyThreadNum(); }    // 获取线程池中的当前线程数
126     size_t GetMaxNumOfThread() { return maxNumOfThread - numOfLongFun; }            // 获取线程池中的最大线程数
127     void SetMaxNumOfThread(size_t size)            // 设置线程池中的最大线程数
128     {
129         if (size < numOfLongFun)
130         {
131             maxNumOfThread = size + numOfLongFun;
132         }
133         else
134             maxNumOfThread = size;
135     }
136     size_t GetMinNumOfThread() { return minNumOfThread; }                            // 获取线程池中的最小线程数
137     void SetMinNumOfThread(size_t size) { minNumOfThread = size; }                    // 设置线程池中的最小线程数
138
139     size_t getIdleThreadNum() { return idleThreadList.size(); }    // 获取线程池中的线程数
140     size_t getBusyThreadNum() { return busyThreadList.size(); }    // 获取线程池中的线程数
141     void CreateIdleThread(size_t size);                            // 创建空闲线程
142     void DeleteIdleThread(size_t size);                            // 删除空闲线程
143     Thread *GetIdleThread();                                    // 获取空闲线程
144     void MoveBusyThreadToIdleList(Thread *busyThread);            // 忙碌线程加入空闲列表
145     void MoveThreadToBusyList(Thread *thread);                    // 线程加入忙碌列表
146     void GetTaskExcute();                                        // 从任务队列中取任务执行
147     WaitTask *GetTask();                                        // 从任务队列中取任务
148
149     CriticalSectionLock idleThreadLock;                            // 空闲线程列表锁
150     list<Thread *> idleThreadList;                                // 空闲线程列表
151     CriticalSectionLock busyThreadLock;                            // 忙碌线程列表锁
152     list<Thread *> busyThreadList;                                // 忙碌线程列表
153
154     CriticalSectionLock waitTaskLock;
155     list<WaitTask *> waitTaskList;                                // 任务列表
156
157     HANDLE                    dispatchThrad;                        // 分发任务线程
158     HANDLE                    stopEvent;                            // 通知线程退出的时间
159     HANDLE                    completionPort;                        // 完成端口
160     size_t                    maxNumOfThread;                        // 线程池中最大的线程数
161     size_t                    minNumOfThread;                        // 线程池中最小的线程数
162     size_t                    numOfLongFun;                        // 线程池中最小的线程数
163 };
  1 #include "stdafx.h"
  2 #include "ThreadPool.h"
  3 #include <process.h>
  4
  5
  6 ThreadPool::ThreadPool(size_t minNumOfThread, size_t maxNumOfThread)
  7 {
  8     if (minNumOfThread < 2)
  9         this->minNumOfThread = 2;
 10     else
 11         this->minNumOfThread = minNumOfThread;
 12     if (maxNumOfThread < this->minNumOfThread * 2)
 13         this->maxNumOfThread = this->minNumOfThread * 2;
 14     else
 15         this->maxNumOfThread = maxNumOfThread;
 16     stopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
 17     completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1);
 18
 19     idleThreadList.clear();
 20     CreateIdleThread(this->minNumOfThread);
 21     busyThreadList.clear();
 22
 23     dispatchThrad = (HANDLE)_beginthreadex(0, 0, GetTaskThreadProc, this, 0, 0);
 24     numOfLongFun = 0;
 25 }
 26
 27 ThreadPool::~ThreadPool()
 28 {
 29     SetEvent(stopEvent);
 30     PostQueuedCompletionStatus(completionPort, 0, (DWORD)EXIT, NULL);
 31
 32     CloseHandle(stopEvent);
 33 }
 34
 35 BOOL ThreadPool::QueueTaskItem(TaskFun task, PVOID param, TaskCallbackFun taskCb, BOOL longFun)
 36 {
 37     waitTaskLock.Lock();
 38     WaitTask *waitTask = new WaitTask(task, param, taskCb, longFun);
 39     waitTaskList.push_back(waitTask);
 40     waitTaskLock.UnLock();
 41     PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL);
 42     return TRUE;
 43 }
 44
 45 void ThreadPool::CreateIdleThread(size_t size)
 46 {
 47     idleThreadLock.Lock();
 48     for (size_t i = 0; i < size; i++)
 49     {
 50         idleThreadList.push_back(new Thread(this));
 51     }
 52     idleThreadLock.UnLock();
 53 }
 54
 55 void ThreadPool::DeleteIdleThread(size_t size)
 56 {
 57     idleThreadLock.Lock();
 58     size_t t = idleThreadList.size();
 59     if (t >= size)
 60     {
 61         for (size_t i = 0; i < size; i++)
 62         {
 63             auto thread = idleThreadList.back();
 64             delete thread;
 65             idleThreadList.pop_back();
 66         }
 67     }
 68     else
 69     {
 70         for (size_t i = 0; i < t; i++)
 71         {
 72             auto thread = idleThreadList.back();
 73             delete thread;
 74             idleThreadList.pop_back();
 75         }
 76     }
 77     idleThreadLock.UnLock();
 78 }
 79
 80 ThreadPool::Thread *ThreadPool::GetIdleThread()
 81 {
 82     Thread *thread = NULL;
 83     idleThreadLock.Lock();
 84     if (idleThreadList.size() > 0)
 85     {
 86         thread = idleThreadList.front();
 87         idleThreadList.pop_front();
 88     }
 89     idleThreadLock.UnLock();
 90
 91     if (thread == NULL && getCurNumOfThread() < maxNumOfThread)
 92     {
 93         thread = new Thread(this);
 94     }
 95
 96     if (thread == NULL && waitTaskList.size() > THRESHOLE_OF_WAIT_TASK)
 97     {
 98         thread = new Thread(this);
 99         InterlockedIncrement(&maxNumOfThread);
100     }
101     return thread;
102 }
103
104 void ThreadPool::MoveBusyThreadToIdleList(Thread * busyThread)
105 {
106     idleThreadLock.Lock();
107     idleThreadList.push_back(busyThread);
108     idleThreadLock.UnLock();
109
110     busyThreadLock.Lock();
111     for (auto it = busyThreadList.begin(); it != busyThreadList.end(); it++)
112     {
113         if (*it == busyThread)
114         {
115             busyThreadList.erase(it);
116             break;
117         }
118     }
119     busyThreadLock.UnLock();
120
121     if (maxNumOfThread != 0 && idleThreadList.size() > maxNumOfThread * 0.8)
122     {
123         DeleteIdleThread(idleThreadList.size() / 2);
124     }
125
126     PostQueuedCompletionStatus(completionPort, 0, (DWORD)GET_TASK, NULL);
127 }
128
129 void ThreadPool::MoveThreadToBusyList(Thread * thread)
130 {
131     busyThreadLock.Lock();
132     busyThreadList.push_back(thread);
133     busyThreadLock.UnLock();
134 }
135
136 void ThreadPool::GetTaskExcute()
137 {
138     Thread *thread = NULL;
139     WaitTask *waitTask = NULL;
140
141     waitTask = GetTask();
142     if (waitTask == NULL)
143     {
144         return;
145     }
146
147     if (waitTask->bLong)
148     {
149         if (idleThreadList.size() > minNumOfThread)
150         {
151             thread = GetIdleThread();
152         }
153         else
154         {
155             thread = new Thread(this);
156             InterlockedIncrement(&numOfLongFun);
157             InterlockedIncrement(&maxNumOfThread);
158         }
159     }
160     else
161     {
162         thread = GetIdleThread();
163     }
164
165     if (thread != NULL)
166     {
167         thread->ExecuteTask(waitTask->task, waitTask->param, waitTask->taskCb);
168         delete waitTask;
169         MoveThreadToBusyList(thread);
170     }
171     else
172     {
173         waitTaskLock.Lock();
174         waitTaskList.push_front(waitTask);
175         waitTaskLock.UnLock();
176     }
177
178 }
179
180 ThreadPool::WaitTask *ThreadPool::GetTask()
181 {
182     WaitTask *waitTask = NULL;
183     waitTaskLock.Lock();
184     if (waitTaskList.size() > 0)
185     {
186         waitTask = waitTaskList.front();
187         waitTaskList.pop_front();
188     }
189     waitTaskLock.UnLock();
190     return waitTask;
191 }
192
193
194 ThreadPool::Thread::Thread(ThreadPool *threadPool) :
195     busy(FALSE),
196     thread(INVALID_HANDLE_VALUE),
197     task(NULL),
198     taskCb(NULL),
199     exit(FALSE),
200     threadPool(threadPool)
201 {
202     thread = (HANDLE)_beginthreadex(0, 0, ThreadProc, this, CREATE_SUSPENDED, 0);
203 }
204
205 ThreadPool::Thread::~Thread()
206 {
207     exit = TRUE;
208     task = NULL;
209     taskCb = NULL;
210     ResumeThread(thread);
211     WaitForSingleObject(thread, INFINITE);
212     CloseHandle(thread);
213 }
214
215 BOOL ThreadPool::Thread::isBusy()
216 {
217     return busy;
218 }
219
220 void ThreadPool::Thread::ExecuteTask(TaskFun task, PVOID param, TaskCallbackFun taskCallback)
221 {
222     busy = TRUE;
223     this->task = task;
224     this->param = param;
225     this->taskCb = taskCallback;
226     ResumeThread(thread);
227 }
228
229 unsigned int ThreadPool::Thread::ThreadProc(PVOID pM)
230 {
231     Thread *pThread = (Thread*)pM;
232
233     while (true)
234     {
235         if (pThread->exit)
236             break; //线程退出
237
238         if (pThread->task == NULL && pThread->taskCb == NULL)
239         {
240             pThread->busy = FALSE;
241             pThread->threadPool->MoveBusyThreadToIdleList(pThread);
242             SuspendThread(pThread->thread);
243             continue;
244         }
245
246         int resulst = pThread->task(pThread->param);
247         if(pThread->taskCb)
248             pThread->taskCb(resulst);
249         WaitTask *waitTask = pThread->threadPool->GetTask();
250         if (waitTask != NULL)
251         {
252             pThread->task = waitTask->task;
253             pThread->taskCb = waitTask->taskCb;
254             delete waitTask;
255             continue;
256         }
257         else
258         {
259             pThread->task = NULL;
260             pThread->param = NULL;
261             pThread->taskCb = NULL;
262             pThread->busy = FALSE;
263             pThread->threadPool->MoveBusyThreadToIdleList(pThread);
264             SuspendThread(pThread->thread);
265         }
266     }
267
268     return 0;
269 }
 1 // ThreadPool.cpp: 定义控制台应用程序的入口点。
 2 //
 3
 4 #include "stdafx.h"
 5 #include "ThreadPool.h"
 6 #include <stdio.h>
 7
 8 class Task
 9 {
10 public:
11     static int Task1(PVOID p)
12     {
13         int i = 10;
14         while (i >= 0)
15         {
16             printf("%d\n", i);
17             Sleep(100);
18             i--;
19         }
20         return i;
21     }
22 };
23
24 class TaskCallback
25 {
26 public:
27     static void TaskCallback1(int result)
28     {
29         printf("   %d\n", result);
30     }
31 };
32
33 int main()
34 {
35     ThreadPool threadPool(2, 10);
36     for (size_t i = 0; i < 30; i++)
37     {
38         threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1);
39     }
40     threadPool.QueueTaskItem(Task::Task1, NULL, TaskCallback::TaskCallback1, TRUE);
41
42     getchar();
43
44     return 0;
45 }

原文地址:https://www.cnblogs.com/tanguoying/p/8454637.html

时间: 2024-12-12 10:27:16

一个Windows下线程池的实现(C++)的相关文章

一个windows下的ddos样本

一个windows下的ddos样本. 加载器 程序运行之后会在临时目录释放出一个256_res.tmp的文件 之后将该文件移动至system32目录下,以rasmedia.dll命名. 删除原文件. 加载开始释放的dll文件,并调用该dll导出的install函数. Rasmedia.dll 函数install会将将该dll注册成一个服务WinHelp32. 具体如下 服务运行之后会有两个线程开启. 一个线程用于和远端服务器进行通信. 一个线程会将自身拷贝到内存中,当发现映像文件被删除,会将自己

Linux下线程池的理解与简单实现

首先,线程池是什么?顾名思义,就是把一堆开辟好的线程放在一个池子里统一管理,就是一个线程池. 其次,为什么要用线程池,难道来一个请求给它申请一个线程,请求处理完了释放线程不行么?也行,但是如果创建线程和销毁线程的时间比线程处理请求的时间长,而且请求很多的情况下,我们的CPU资源都浪费在了创建和销毁线程上了,所以这种方法的效率比较低,于是,我们可以将若干已经创建完成的线程放在一起统一管理,如果来了一个请求,我们从线程池中取出一个线程来处理,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐的

谈一谈linux下线程池

什么是线程池: 首先,顾名思义,就是把一堆开辟好的线程放在一个池子里统一管理,就是一个线程池. 其次,为什么要用线程池,难道来一个请求给它申请一个线程,请求处理完了释放线程不行么?也行,但是如果创建线程和销毁线程的时间比线程处理请求的时间长,而且请求很多的情况下,我们的CPU资源都浪费在了创建和销毁线程上了,所以这种方法的效率比较低,于是,我们可以将若干已经创建完成的线程放在一起统一管理,如果来了一个请求,我们从线程池中取出一个线程来处理,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐

windows下线程创建

windows下线程创建: CreateThread()函数是Windows提供的API接口 1.HANDLE WINAPI CreateThread( LPSECURITY_ATTRIBUTESlpThreadAttributes, SIZE_TdwStackSize, LPTHREAD_START_ROUTINElpStartAddress, LPVOIDlpParameter, DWORDdwCreationFlags, LPDWORDlpThreadId ); 函数说明: 第一个参数表示

做一个WINDOWS下破解WIFI。不需要Linux抓包!

搬家了,没网了.没有WIFI了! 想破解,不过没有Linux环境,不能抓包!破解! 于是自己动手开工! 在windows 下直接破解.貌似国内 还没看到.如果有了,那么请各位童鞋 提醒一下.赶急 要使用啊!! 最终: 不过有点问题,如果路由器 启用混淆模式,那么如何拿到真实的握手协议呢? 做一个WINDOWS下破解WIFI.不需要Linux抓包!

一个简单的线程池程序设计(消费者和生产者)

最近在学习linux下的编程,刚开始接触感觉有点复杂,今天把线程里比较重要的线程池程序重新理解梳理一下. 实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务.如果线程池中的线程都在忙,那么任务队列中的任务则等待.本程序较为简单,把任务定义为了两个数相加,输出它们的和. 采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全. 1.在l

Linux平台下线程池的原理及实现

转自:http://blog.csdn.net/lmh12506/article/details/7753952 前段时间在github上开了个库,准备实现自己的线程池的,因为换工作的事,一直也没有实现,参考这篇文章准备着手实现一下. 什么时候需要创建线程池呢?简单的说,如果一个应用需要频繁的创建和销毁线程,而任务执行的时间又非常短,这样线程创建和销毁的带来的开销就不容忽视,这时也是线程池该出场的机会了.如果线程创建和销毁时间相比任务执行时间可以忽略不计,则没有必要使用线程池了. 下面是Linu

Linux C 一个简单的线程池程序设计

实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务.如果线程池中的线程都在忙,那么任务队列中的任务则等待.本程序较为简单,把任务定义为了两个数相加,输出它们的和. 采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全. 1.在linux环境下构建三个文件夹(include,src,bin) include:包含该程序所需要的头文件.

一个简单的线程池实现

前段时间学习了线程方面的知识,看了关于线程池的教程,自己也试着实现一个.跟大家分享,同时也整理整理思路.   对线程池的要求: 1.用于处理大量短暂的任务. 2.动态增加线程,直到达到最大允许的线程数量. 3.动态销毁线程.   线程池的实现类似于"消费者--生产者"模型: 用一个队列存放任务(仓库,缓存) 主线程添加任务(生产者生产任务) 新建线程函数执行任务(消费者执行任务) 由于任务队列是全部线程共享的,就涉及到同步问题.这里采用条件变量和互斥锁来实现. ------------