win32线程池代码(WinApi/C++)

健壮, 高效,易用,易于扩, 可用于任何C++编译器 
//说明, 这段代码我用了很久, 我删除了自动调整规模的代码(因为他还不成熟)
/******************************************************************
*  Thread Pool For Win32 
*  VC++ 6, BC++ 5.5(Free), GCC(Free)
*  Update : 2004.6.9 llBird  [email protected]

Use:
1):
void threadfunc(void *p)
{
 //...
}
 ThreadPool tp;
 for(i=0; i<100; i++)
  tp.Call(threadfunc);

ThreadPool tp(20);//20为初始线程池规模
 tp.Call(threadfunc, lpPara);
 tp.AdjustSize(50);//增加50
 tp.AdjustSize(-30);//减少30

2):
class MyThreadJob : public ThreadJob //线程对象从ThreadJob扩展
{
public:
 virtual void DoJob(void *p)//自定义的虚函数
 {
  //....
 }
};
 MyThreadJob mt[10];
 ThreadPool tp;
 for(i=0; i<100 i++)
  tp.Call(mt + i);//tp.Call(mt + i, para);

*******************************************************************/
#ifndef _ThreadPool_H_
#define _ThreadPool_H_

#pragma warning(disable: 4530)
#pragma warning(disable: 4786)

#include <cassert>
#include <vector>
#include <queue>
#include <windows.h>

class ThreadJob  //工作基类
{
public:
 //供线程池调用的虚函数
 virtual void DoJob(void *pPara) = 0;
};

class ThreadPool
{

public:
 //dwNum 线程池规模
 ThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0) 
 {
  InitializeCriticalSection(&_csThreadVector);
  InitializeCriticalSection(&_csWorkQueue);

_EventComplete = CreateEvent(0, false, false, NULL);
  _EventEnd = CreateEvent(0, true, false, NULL);
  _SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);
  _SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL);

assert(_SemaphoreCall != INVALID_HANDLE_VALUE);
  assert(_EventComplete != INVALID_HANDLE_VALUE);
  assert(_EventEnd != INVALID_HANDLE_VALUE);
  assert(_SemaphoreDel != INVALID_HANDLE_VALUE);

AdjustSize(dwNum <= 0 ? 4 : dwNum);
 }

~ThreadPool()
 {
  DeleteCriticalSection(&_csWorkQueue);

CloseHandle(_EventEnd);
  CloseHandle(_EventComplete);
  CloseHandle(_SemaphoreCall);
  CloseHandle(_SemaphoreDel);
  
  vector<ThreadItem*>::iterator iter;
  for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++)
  {
   if(*iter)
    delete *iter;
  }

DeleteCriticalSection(&_csThreadVector);
 }
 //调整线程池规模
 int AdjustSize(int iNum)
 {
  if(iNum > 0)
  {
   ThreadItem *pNew;
   EnterCriticalSection(&_csThreadVector);
   for(int _i=0; _i<iNum; _i++)
   {
    _ThreadVector.push_back(pNew = new ThreadItem(this)); 
    assert(pNew);
    pNew->_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL);
    assert(pNew->_Handle);
   }
   LeaveCriticalSection(&_csThreadVector);
  }
  else
  {
   iNum *= -1;
   ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL);
  }
  return (int)_lThreadNum;
 }
 //调用线程池
 void Call(void (*pFunc)(void  *), void *pPara = NULL)
 {
  assert(pFunc);

EnterCriticalSection(&_csWorkQueue);
  _JobQueue.push(new JobItem(pFunc, pPara));
  LeaveCriticalSection(&_csWorkQueue);

ReleaseSemaphore(_SemaphoreCall, 1, NULL);
 }
 //调用线程池
 inline void Call(ThreadJob * p, void *pPara = NULL)
 {
  Call(CallProc, new CallProcPara(p, pPara));
 }
 //结束线程池, 并同步等待
 bool EndAndWait(DWORD dwWaitTime = INFINITE)
 {
  SetEvent(_EventEnd);
  return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0;
 }
 //结束线程池
 inline void End()
 {
  SetEvent(_EventEnd);
 }
 inline DWORD Size()
 {
  return (DWORD)_lThreadNum;
 }
 inline DWORD GetRunningSize()
 {
  return (DWORD)_lRunningNum;
 }
 bool IsRunning()
 {
  return _lRunningNum > 0;
 }

protected:

//工作线程
 static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL)
 {
  ThreadItem *pThread = static_cast<ThreadItem*>(lpParameter);
  assert(pThread);

ThreadPool *pThreadPoolObj = pThread->_pThis;
  assert(pThreadPoolObj);

InterlockedIncrement(&pThreadPoolObj->_lThreadNum);

HANDLE hWaitHandle[3];
  hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall;
  hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel;
  hWaitHandle[2] = pThreadPoolObj->_EventEnd;

JobItem *pJob;
  bool fHasJob;
  
  for(;;)
  {
   DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE);

//响应删除线程信号
   if(wr == WAIT_OBJECT_0 + 1)  
    break;
   
   //从队列里取得用户作业
   EnterCriticalSection(&pThreadPoolObj->_csWorkQueue);
   if(fHasJob = !pThreadPoolObj->_JobQueue.empty())
   {
    pJob = pThreadPoolObj->_JobQueue.front();
    pThreadPoolObj->_JobQueue.pop();
    assert(pJob);
   }
   LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue);

//受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作)
   if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)  
    break;

if(fHasJob && pJob)
   {
    InterlockedIncrement(&pThreadPoolObj->_lRunningNum);
    pThread->_dwLastBeginTime = GetTickCount();
    pThread->_dwCount++;
    pThread->_fIsRunning = true;
    pJob->_pFunc(pJob->_pPara); //运行用户作业
    delete pJob; 
    pThread->_fIsRunning = false;
    InterlockedDecrement(&pThreadPoolObj->_lRunningNum);
   }
  }

//删除自身结构
  EnterCriticalSection(&pThreadPoolObj->_csThreadVector);
  pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread));
  LeaveCriticalSection(&pThreadPoolObj->_csThreadVector);

delete pThread;

InterlockedDecrement(&pThreadPoolObj->_lThreadNum);

if(!pThreadPoolObj->_lThreadNum)  //所有线程结束
   SetEvent(pThreadPoolObj->_EventComplete);

return 0;
 }
 //调用用户对象虚函数
 static void CallProc(void *pPara) 
 {
  CallProcPara *cp = static_cast<CallProcPara *>(pPara);
  assert(cp);
  if(cp)
  {
   cp->_pObj->DoJob(cp->_pPara);
   delete cp;
  }
 }
 //用户对象结构
 struct CallProcPara  
 {
  ThreadJob* _pObj;//用户对象 
  void *_pPara;//用户参数
  CallProcPara(ThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { };
 };
 //用户函数结构
 struct JobItem 
 {
  void (*_pFunc)(void  *);//函数
  void *_pPara; //参数
  JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { };
 };
 //线程池中的线程结构
 struct ThreadItem
 {
  HANDLE _Handle; //线程句柄
  ThreadPool *_pThis;  //线程池的指针
  DWORD _dwLastBeginTime; //最后一次运行开始时间
  DWORD _dwCount; //运行次数
  bool _fIsRunning;
  ThreadItem(ThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { };
  ~ThreadItem()
  {
   if(_Handle)
   {
    CloseHandle(_Handle);
    _Handle = NULL;
   }
  }
 };
 
 std::queue<JobItem *> _JobQueue;  //工作队列
 std::vector<ThreadItem *>  _ThreadVector; //线程数据

CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界

HANDLE _EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号
 long _lThreadNum, _lRunningNum; //线程数, 运行的线程数

};

#endif //_ThreadPool_H_

时间: 2024-10-10 22:35:49

win32线程池代码(WinApi/C++)的相关文章

线程池代码

头文件head.h #ifndef __THREADPOOL_H_ #define __THREADPOOL_H_ typedef struct threadpool_t threadpool_t; /** * @function threadpool_create * @descCreates a threadpool_t object. * @param thr_num thread num * @param max_thr_num max thread size * @param queu

线程池代码(通用版)

一.适用场景 首先,必须明确一点,线程池不是万能的,它有其特定的使用场景.使用线程池是为了减小线程本身的开销对应用性能所产生的影响,但是其 前提是线程本身创建.销毁的开销和线程执行任务的开销相比是不可忽略的 .如果线程本身创建.销毁的开销对应用程序的性能可以忽略不计,那么使用/不使用线程池对程序的性能并不会有太大的影响. 线程池通常适合以下几种场景: ①.单位时间内处理的任务频繁,且任务时间较短 ②.对实时性要求较高.如果接收到任务之后再创建线程,可能无法满足实时性的要求,此时必须使用线程池.

Java线程池代码实现

package com.fgy.demo07; public class RunnableImpl implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + "创建了一个新的线程"); } } package com.fgy.demo07; public class ExtendsThread extends Thread { @Ov

简单线程池原理和代码

线程池就是,预先创建一定数量的线程,然后当需要异步任务时,只要把任务放入队列中,线程池自动在队列中取任务,每执行完一个任务就自动取下一个任务 本文提供的是一个简单的线程池,所以并不提供线程的自动增减的功能,以比较简单的代码来理解其原理 代码只有一个文件,算上注释才勉强200行,由于代码较长就不全部贴在这里了. 线程池代码见Github[点击] 由于代码使用了一些c++11的东西,所以先需要复习一下以下几个东西:(不要被吓怕,就算不会其实也能懂下面的讲解,具体语法所表达的意思我会说明) std::

C实现线程池

第一部分为头文件 1 #ifndef __THREADPOOL_H_ 2 #define __THREADPOOL_H_ 3 4 typedef struct threadpool_t threadpool_t; 5 6 /** 7 * @function threadpool_create 8 * @descCreates a threadpool_t object. 9 * @param thr_num thread num 10 * @param max_thr_num max threa

linux下c的线程池

参考网上实现下c的线程池 代码更新:https://github.com/ljfly/pool #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h> typedef struct worker { void *(*process) (void *

线程池简单实现

实现了一个简化版的线程池. 实现线程池的关键有两个:一是阻塞队列,用于任务的存取,二是内部的线程对象如何持续性的执行任务,并在空闲时被回收. 线程池代码: 1 package learnConcurrent; 2 3 import java.util.ArrayList; 4 import java.util.Collection; 5 import java.util.LinkedList; 6 import java.util.List; 7 import java.util.concurr

线程池的总结

线程池是经常用的,所以今天特地将其原理.好处.jdk的实现方式整理出来,以供以后复习之用. 问题:服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的. 如果每个请求对应一个线程(thread-per-request)方法的不足之一是:为每个请求创建一个新线程的开销很大:为每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源要比花在处理实际的用户请求的时间和资源更多. 其二是:除了创建和销毁线程的开销之外,活动的线程也消耗系统资源.在一个 JVM 里创

简单易用的线程池实现

0 前言 最近在写MySQL冷备server的一个模块,稍微接触到一点线程池的东西,自己也就想尝试写一个简单的线程池练练手. 这个线程池在创建时,即按照最大的线程数生成线程. 然后作业任务通过add_task接口往线程池中加入需要运行的任务,再调用线程池的run函数开始运行所有任务,每个线程从任务队列中读取任务,处理完一个任务后再读取新的任务,直到最终任务队列为空. 1 线程池设计 简单描述如下(假设任务类名为CTasklet): 1.CThreadPool<CTasklet> thread_