线程池(VC_Win32)

线程池

(本章节中例子都是用 VS2010 编译调试的)

线程池编写必须在 Windows Vista 操作系统(以及以上版本的操作系统)下,且
C++ 编译器版本至少是 VS2008

线程池的功能

  • 以异步的方式来调用一个函数
  • 每隔一段时间调用一个函数
  • 当内核对象触发的时候调用一个函数
  • 当异步 I/O 请求完成的时候调用一个函数

注意

当一个进程初始化的时候,它并没有任何与线程池的开销.但是,一旦调用了新的线程池函数,系统就会为进程相应的内核资源,其中的一些资源在进程终止之前都将一直存在.正如我们可以看到,使用线程池的开销取决于用法:系统会以进程的名义来分配线程,其他内核以及内部数据结构.因此我们不应该盲目地使用这些线程池函数,而是必须谨慎地考虑,这些函数能做什么,以及它们不能做什么.

在线程池编程中,我们从来不需要自己调用 CreateThread.系统会自动为我们的进程创建线程,并在规定的条件下让线程池中的线程调用我们的回调函数.此外,这个线程在处理完成一个客户请求后,它不会立刻被销毁,而是回到线程池,准备好处理队列中的任何其他工作项,线程池会不断地重复使用其中的线程,而不会频繁地创建销毁线程,对应用程序来说,这样可以显著地提升性能,因为创建和销毁线程会消耗大量的时间.当然,如果线程池检测到创建的另一个线程将能更好地为应用程序服务,那么它会这样做.如果线程池检测到它的线程数量已经供过于求,那么它会销毁其中一些线程.除非我们非常清楚自己在做什么,否则的话最好还是相信线程内部的算法,让它自动地对应用程序的工作量进行微调.

默认线程池,在进程存在期间它不会被销毁.生命周期与进程相同.在进程终止的时候,Windows 会将其销毁并负责所有的清理工作.

对线程池的制定

可以用 CreateThreadpool 来创建一个新的线程池,该函数返回一个
PTP_POOL 值,表示新创建的线程池.接着我们可以调用后面两个函数来设置线程池中线程的最大数量和最小数量 SetThreadpoolThreadMinimum / SetThreadpoolThreadMaximum 线程池始终保持池中的线程数量至少是指定的最小数量,并允许线程数量增长到指定的最大数量,顺便一提,默认线程池的最小数量为1,最大数量为500.然后在引用程序不在需要自定义线程池时,应调用 CloseThreadpool 将其销毁.在调用这个函数后,我们将无法在将任何新的项添加到线程池的队列中.线程池中当前正在处理的队列中的线程会完成它们的处理并终止.此外,线程池的队列中所有尚未开始处理的项将被取消.

一旦我们创建了自己的线程池,并制定了线程池的最小数量和最大数量,我们就可以初始化一个回调环境,它包含了一些课应用于工作项的额外的设置或配置.(线程池回调环境结构 _TP_CALLBACK_ENVIRON ,其定义在 WinNT.h )

/***************************************
显示此结果的编译环境为 VS2010
***************************************/
#if (_WIN32_WINNT >= _WIN32_WINNT_WIN7)

typedef struct _TP_CALLBACK_ENVIRON_V3 {
    TP_VERSION                         Version;
    PTP_POOL                           Pool;
    PTP_CLEANUP_GROUP                  CleanupGroup;
    PTP_CLEANUP_GROUP_CANCEL_CALLBACK  CleanupGroupCancelCallback;
    PVOID                              RaceDll;
    struct _ACTIVATION_CONTEXT        *ActivationContext;
    PTP_SIMPLE_CALLBACK                FinalizationCallback;
    union {
        DWORD                          Flags;
        struct {
            DWORD                      LongFunction :  1;
            DWORD                      Persistent   :  1;
            DWORD                      Private      : 30;
        } s;
    } u;
    TP_CALLBACK_PRIORITY               CallbackPriority;
    DWORD                              Size;
} TP_CALLBACK_ENVIRON_V3;

typedef TP_CALLBACK_ENVIRON_V3 TP_CALLBACK_ENVIRON, *PTP_CALLBACK_ENVIRON;

#else

typedef struct _TP_CALLBACK_ENVIRON_V1 {
    TP_VERSION                         Version;
    PTP_POOL                           Pool;
    PTP_CLEANUP_GROUP                  CleanupGroup;
    PTP_CLEANUP_GROUP_CANCEL_CALLBACK  CleanupGroupCancelCallback;
    PVOID                              RaceDll;
    struct _ACTIVATION_CONTEXT        *ActivationContext;
    PTP_SIMPLE_CALLBACK                FinalizationCallback;
    union {
        DWORD                          Flags;
        struct {
            DWORD                      LongFunction :  1;
            DWORD                      Persistent   :  1;
            DWORD                      Private      : 30;
        } s;
    } u;
} TP_CALLBACK_ENVIRON_V1;

typedef TP_CALLBACK_ENVIRON_V1 TP_CALLBACK_ENVIRON, *PTP_CALLBACK_ENVIRON;

#endif

然后我们可以调用 InitializeThreadpoolEnvironment 初始化这个结构体,接着当然回调环境必须调用 SetThreadpoolCallbackPool 标明给工作项应该由哪个线程池来处理.最后当我们不在需要这个使用线程池回调环境的时候,应该调用DestroyThreadpoolEnvironment 来对它进行清理工作.

    • 然而可以调用 SetThreadpoolCallbackRunsLong 函数来告诉回调环境,工作项通常需要较长的时间来处理.这使得线程池会更快地创建线程,其目的是为了尝试在对工作项进行处理的时候,以一种更为公平的方式来替代最有效的方式.
    • 也可以调用 SetThreadpoolCallbackLibrary 来确保只要线程池中还有待处理的工作项,就将一个特定的
      DLL 一直保持在进程空间中.基本上 SetThreadpoolCallbackLibrary 函数的存在目的是为了消除潜在的竞态条件(race
      condition),从而避免可能导致死锁.这个相当高级的特性,更详细信息参阅 Platform SDK 文档.

线程池的销毁(清理组)

为了得体地销毁私有线程池,我们首先可以需要通过调用 CreateThreadpoolCleanupGroup 来创建一个清理组,然后再将这个清理组与一个以绑定到线程池的回调函数结构体
TP_CALLBACK_ENVIRON 调用 SetThreadpoolCallbackCleanupGroup 函数把两者关联起来.其中 SetThreadpoolCallbackCleanupGroup 的
pfng 参数标识一个回调函数的地址(函数原型 即CleanupGroupCancelCallback),如果传给
pfng 参数值不为 NULL ,且当清理组被取消时那么这个回调函数会被调用.

当我们调用 CreateThreadpoolWorkCreateThreadpoolTimerCreateThreadpoolWait 或 CreateThreadpoolIo 的时候,如果最后那个参数,即指向
PTP_CALLBACK_ENVIRON 结构体指针,不等于 NULL,那么所创建的项会被添加到对应的回调环境的清理组中,其目的是为了表示有线程池中添加了一项,需要潜在清理.在这些对了项完成后,如果我们调用 CloseThreadpoolWork,CloseThreadpoolTimerCloseThreadpoolWait 和 CloseThreadpoolIo,
那就等于是隐式将对应的项从组中移除.

最后,在程序想要销毁线程池的时候,调用 CloseThreadpoolCleanupGroupMembers.这个函数与下面的
WaitForThreadpool*(例: WaitForThreadpoolWork)函数相似.当线程调用 CloseThreadpoolCleanupGroupMembers 时候,函数会一直等待,知道线程池的工作组中所有剩余的项(即已经创建当尚未关闭的项)都已经处理完毕为止.调用者还可以传
TRUE 给 fCancelPendingCallbacks  参数.这样会将所有已提交但尚未处理的工作项直接取消,函数会在所有当前正在运行的工作项王城之后返回.如果传给 fCancelPendingCallbacks 参数为 TRUE,而且传给 SetThreadpoolCallbackCleanupGroup 的
pfng 参数值是一个 CleanupGroupCancelCallback 函数地址,那么对每一个被取消的工作项,我们的回调函数会被调用,在 CleanupGroupCancelCallback函数中.参数
ObjectContext 会包含每个被取消的上下文.(该上下文信息是通过 CreateThreadpool* 函数的 pv 参数设置的)在 CleanupGroupCancelCallback 函数中,
参数 CleanupContext 包含的上下文是通过 CloseThreadpoolCleanupGroupMembers 函数的 pvCleanupContext
参数传入的.如果在调用 CloseThreadpoolCleanupGroupMembers 时传入
FALSE 给 fCancelPendingCallbacks 参数.那么在返回之前,线程池会发时间来处理队列中所有剩余的项.注意这种情况下我们的 CleanupGroupCancelCallback 函数绝对不会被调用.因此可以传
NULL 给 pvCleanupContext 参数.

线程池制定与销毁代码样例

编写步骤

程序源码

#include<Windows.h>
#include<iostream>
#include<cstdlib>

using namespace std;

VOID CALLBACK SimpleCallback(PTP_CALLBACK_INSTANCE Instance,PVOID Context);

void main()
{
    PTP_POOL tPool;
    TP_CALLBACK_ENVIRON pcbe;

    //创建线程池
    tPool = CreateThreadpool(NULL);

    //设置线程池最大最小的线程数量
    SetThreadpoolThreadMinimum(tPool,1);
    SetThreadpoolThreadMaximum(tPool,2);

    //初始化线程池环境变量
    InitializeThreadpoolEnvironment(&pcbe);

    //为线程池设置线程池环境变量
    SetThreadpoolCallbackPool(&pcbe,tPool);

    //单次工作提交
    TrySubmitThreadpoolCallback(SimpleCallback,NULL,&pcbe);

    system("pause");
    //清理线程池的环境变量
    DestroyThreadpoolEnvironment(&pcbe);
    //关闭线程池
    CloseThreadpool(tPool);
}

VOID CALLBACK SimpleCallback(PTP_CALLBACK_INSTANCE Instance,PVOID Context)
{
    cout<<"this is SimpleCallback function!"<<endl;
}

运行结果

以异步的方式调用函数

相关函数

  • TrySubmitThreadpoolCallback (向线程池提交工作请求函数<一次性>,回调函数原型)
  • CreateThreadpoolWork (为线程池创建一个提交工作的工作对象,回调函数原型)

    绝对不要让回调函数调用 WaitForThreadpoolWork 并将自己的工作项作为参数传入,因为这样会导致死锁.

  • SubmitThreadpoolWork (想线程池提交工作请求函数<非一次性>)
  • WaitForThreadpoolWorkCallbacks (取消已提交但未执行的工作项
    / 等待工作项处理完成把自己挂起)

    其中的 fCancelPendingCallbacks 参数

      若为 true ,函数会试图取消先前提交的那个工作项.如果线程池中的线程正在处理那个工作项,那么该过程不会被打断,函数会一直等到该工作项已经被处理完成后在返回.

      若为 false ,那么函数会将调用线程挂起,知道指定工作项的处理已经完成而且线程池中处理该工作项的线程也已经被回收并准备处理下一个工作项.

    如果用一个 PTP_WORK 对象已经提交了很多工作项,而且传给 fCancelPendingCallbacks 参数为 false,那么 WaitForThreadpoolWorkCallbacks 会等待线程池处理完所有提交的工作项.如果传给 fCancelPendingCallbacks 为 true,那么 WaitForThreadpoolWorkCallbacks 只会等到当前正在运行的工作项完成为止.

  • CloseThreadpoolWork (取消可以多次提交工作的工作对象)

编写步骤

代码样例

程序源码

按 Ctrl+C 复制代码

<textarea style="margin:0px; padding-top:0px; padding-left:0px; width:746.3999999761581px; height:600px; font-family:'Courier New'; font-size:12px; line-height:1.5"></textarea>

按 Ctrl+C 复制代码

运行结果

以时间段来调用函数

相关函数

编写步骤

代码样例

程序源码

#include<Windows.h>
#include<iostream>
#include<cstdlib>

using namespace std;

VOID CALLBACK TimerCallback(PTP_CALLBACK_INSTANCE Instance,PVOID Context,PTP_TIMER Timer);

void main()
{
    PTP_TIMER tpTimer;
    FILETIME liDueTime;
    LARGE_INTEGER liUTC;

    liUTC.QuadPart = -30000000;

    liDueTime.dwLowDateTime = liUTC.LowPart;
    liDueTime.dwHighDateTime = liUTC.HighPart;

    //创建定时调用的工作对象
    tpTimer = CreateThreadpoolTimer(TimerCallback,NULL,NULL);

    //判断定时调用的工作对象是否注册过计时器
    if(!IsThreadpoolTimerSet(tpTimer))
    {
        //为定时调用工作对象注册计时器
        SetThreadpoolTimer(tpTimer,&liDueTime,0,0);
    }

    //睡眠主进程,等待计时器添加工作
    Sleep(4000);

    //等待定时调用的工作对象
    WaitForThreadpoolTimerCallbacks(tpTimer,false);

    //关闭定时调用的工作对象
    CloseThreadpoolTimer(tpTimer);

    system("pause");
}

VOID CALLBACK TimerCallback(PTP_CALLBACK_INSTANCE Instance,PVOID Context,PTP_TIMER Timer)
{
    cout<<"this is TimerCallback function!"<<endl;
}

运行结果

以内核对象的触发状态来调用函数

运行原理

其实,在此功能中线程池在内部会让一个线程调用 WaitForMultipleObjects 函数,传入通过 SetThreadpoolWait 函数注册一组句柄,并传入
false 给 bWaitAll 参数,这样当任何一个句柄被触发的时候,线程池就会被唤醒.由于 WaitForMultipleObjects 有一个限制,一次嘴甜只能等待
64(MAXMUM_WAIT_OBJECTS)个句柄,因此线程池事实上正是为每 64 个内核对象分配一个线程来进行等待,其效率还是相当高的.另外,由于 WaitForMultipleObjects 不允许我们将同一个句柄传入多次,因此我们必须确保不会用 SetThreadpoolWait 来多次注册同一个句柄.但是,我们可以调用 DuplicateHandle,
这样就可以分别注册原始句柄和复制句柄.

注意

绝对不要让回调函数调用 WaitForThreadpoolWait 并将自己的工作项作为参数传入,因为这样会导致死锁.另外,当线程在等待传给 SetThreadpoolWait 的句柄时,我们应该确保该句柄不会被关闭.最后我们可能并不想通过
PulseEvent 来触发一个已注册的事件,因为当 PulseEvent 被调用的时候,我们无法保证线程池正好在等待该事件.

相关函数

  • CreateThreadpoolWait (为线程池创建一个等待内核对象触发的工作对象,回调函数原型)
  • SetThreadpoolWait (将内核绑定到等待内核对象触发的工作对象上)

    如果想让回调函数在同一个内核对象被触发的时候再次被调用,那么需要调用 SetThreadpoolWait 来再次注册.也可以通过 SetThreadpoolWait 来重用该等待项,即可以传入一个不同的内核对象句柄,也可以传入 NULL 来将该等待项从线程池中移除

  • WaitForThreadpoolWaitCallbacks (取消已提交但未执行的等待内核对象触发的工作项
    / 等待现有等待内核对象触发的工作项处理完成把自己挂起)
  • CloseThreadpoolWait (取消等待内核对象触发的工作对象)

编写步骤

代码样例

程序源码

#include<Windows.h>
#include<iostream>
#include<cstdlib>

using namespace std;

VOID CALLBACK WaitCallback(PTP_CALLBACK_INSTANCE Instance,PVOID Context,PTP_WAIT Wait,TP_WAIT_RESULT WaitResult);

void main()
{
    PTP_WAIT tpWait;
    HANDLE hMutex;

    //创建等待内核对象触发的工作对象
    tpWait = CreateThreadpoolWait(WaitCallback,NULL,NULL);
    //创建互斥对象
    hMutex=CreateMutex(NULL,false,0);

    //等待互斥对象被释放后获得互斥对象拥有权
    WaitForSingleObject(hMutex,INFINITE);

    //设置等待内核事件触发调用回调函数
    SetThreadpoolWait(tpWait,hMutex,0);

    //释放互斥对象拥有权
    ReleaseMutex(hMutex);

    //等待等待内核对象触发的工作对象结束
    WaitForThreadpoolWaitCallbacks(tpWait,false);
    //关闭等待内核对象触发的工作对象
    CloseThreadpoolWait(tpWait);

    system("pause");
}

VOID CALLBACK WaitCallback(PTP_CALLBACK_INSTANCE Instance,PVOID Context,PTP_WAIT Wait,TP_WAIT_RESULT WaitResult)
{
    cout<<"this is WaitCallback function!"<<endl;
}

运行结果

以异步 I/O 请求完成时调用函数

相关函数

  • CreateThreadpoolIo (为线程池创建一个异步
    I/O 请求完成的工作对象,回调函数原型)
  • StartThreadpoolIo (将嵌入在
    I/O 项中的文件/设备与线程池内部的 I/O 完成端口关联起来)

    在每次调用 ReadFile 和 WriteFile 之前,我们必须调用 StartThreadpoolIo.如果每次在发出 I/O 请求之前没有调用 StartThreadpoolIo,那么 IoCompletionCallback 回调函数将不会被回调

  • CancelThreadpoolIo (在发出
    I/O 请求之后让线程池停止调用回调函数)
  • WaitForThreadpoolIoCallbacks (取消已提交但未执行的异步
    I/O 请求完成的工作项 / 等待异步 I/O 请求完成的工作项处理完成把自己挂起)
  • CloseThreadpoolIo (取消异步
    I/O 请求完成的工作对象)

代码样例

程序源码

运行结果

对回调函数的操作

回调函数的终止操作

线程池提供了一种便利的方法,用来描述在我们的回调函数返回之后,应该执行的一些操作,回调函数用传给它的不透明的 Instance 参数(其类型为 PTP_CALLBACK_INSTANCE)来调用下面的函数

另外两个函数

  • CallbackMayRunLong

    此函数用来通知线程池回调函数的运行时间会比较长.如果一个回调函数认为自己需要较长的时间来处理当前的项,那么它应该调用 CallbackMayRunLong.由于线程池会坚持不创建新线程,因为长时间运行的项可能会使线程池队列中的其他项挨饿.如果 CallbackMayRunLong 返回 TRUE,那么说明线程池中还有其他线程可供使用,来对队列中的项进行处理.如果
    CallbackMayRunLong 返回 false,那么说明线程池中没有其他线程可以用来处理队列中的项.为了维持线程池的运行效率,最好是让该项将它的任务划分成更小的步伐来处理(将每一个部分单独地添加到线程池的队列中).任务的第一部分可以当前线程中执行.

  • DisassociateCurrentThreadFromCallback

    回调函数调用它来告诉线程池,逻辑上自己已经完成了工作,这使得任何由于调用 WaitForThreadpoolWorkCallbacks, WaitForThreadpoolTimerCallbacks, WaitForThreadpoolWaitCallbacks 或 WaitForThreadpoolIoCallbacks
    而被阻塞的线程能够早一些返回,而不必等到线程池的线程从回调函数中返回

时间: 2024-10-07 21:33:56

线程池(VC_Win32)的相关文章

Java四种线程池newCachedThreadPool,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor

介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? Java new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 1 2 3 4 5 6 7 new Thread(new

线程的控制和线程池

一.WaitHandle: ”.Net 中提供了一些线程间更自由通讯的工具,他们提供了通过"信号"进行通讯的机制 可以通过ManualResetEvent,AutoResetEvent(他是在开门并且一个 WaitOne 通过后自动关门)来进行线程间的通讯 waitOne:    等待开门 Set:           开门 Reset:       关门 static void Main(string[] args) { ManualResetEvent mre = new Manu

内存池、进程池、线程池

首先介绍一个概念"池化技术 ".池化技术 一言以蔽之就是:提前保存大量的资源,以备不时之需以及重复使用. 池化技术应用广泛,如内存池,线程池,连接池等等.内存池相关的内容,建议看看Apache.Nginx等开源web服务器的内存池实现. 起因:由于在实际应用当中,分配内存.创建进程.线程都会设计到一些系统调用,系统调用需要导致程序从用户态切换到内核态,是非常耗时的操作.           因此,当程序中需要频繁的进行内存申请释放,进程.线程创建销毁等操作时,通常会使用内存池.进程池.

缓冲池,线程池,连接池

SSH:[email protected]:unbelievableme/object-pool.git   HTTPS:https://github.com/unbelievableme/object-pool.git 缓冲池 设计要点:包含三个队列:空缓冲队列(emq),装满输入数据的输入的队列(inq),装满输出数据的输出队列(outq),输入程序包括收容输入(hin),提取输入(sin),输出程序包括收容输出(hout)和提取输出(sout). 注意点:输入程序和输出程序会对缓冲区并发访

记5.28大促压测的性能优化&mdash;线程池相关问题

目录: 1.环境介绍 2.症状 3.诊断 4.结论 5.解决 6.对比java实现 废话就不多说了,本文分享下博主在5.28大促压测期间解决的一个性能问题,觉得这个还是比较有意思的,值得总结拿出来分享下. 博主所服务的部门是作为公共业务平台,公共业务平台支持上层所有业务系统(2C.UGC.直播等).平台中核心之一的就是订单域相关服务,下单服务.查单服务.支付回调服务,当然结算页暂时还是我们负责,结算页负责承上启下进行下单.结算.跳支付中心.每次业务方进行大促期间平台都要进行一次常规压测,做到心里

线程池的创建

package com.newer.cn; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test1 { public static void main(String[] args) { // 创建线程池的方式 // 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程. E

Java底层技术系列文章-线程池框架

一.线程池结构图    二.示例 定义线程接口 public class MyThread extends Thread { @Override publicvoid run() { System.out.println(Thread.currentThread().getName() + "正在执行"); }}   1:newSingleThreadExecutor ExecutorService pool = Executors. newSingleThreadExecutor()

线程池中的线程的排序问题

1 package org.zln.thread.poolqueue; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 6 import java.util.Comparator; 7 import java.util.UUID; 8 import java.util.concurrent.*; 9 10 /** 11 * 线程池中的线程的排序问题 12 * Created by sherry on 16/11/4

多线程篇七:通过Callable和Future获取线程池中单个务完成后的结果

使用场景:如果需要拿到线程的结果,或者在线程完成后做其他操作,可以使用Callable 和 Futrue 1.定义一个线程池,向线程池中提交单个callable任务 ExecutorService threadPools=Executors.newSingleThreadExecutor(); Future<String> future=threadPools.submit(new Callable<String>() { @Override public String call(