8.6 条件变量(Condition Variables)——可利用临界区或SRWLock锁来实现
8.6.1 条件变量的使用
(1)条件变量机制就是为了简化 “生产者-消费者”问题而设计的一种线程同步机制。其目的让线程以原子方式释放锁并将自己阻塞,直到某一个条件成立为止。如读者线程当没有数据可读取时,则应释放锁并等待,直到写者线程产生了新的数据。同理,当写者把数据结构写满时,那么写者应该释放SRWLock并等待,直到读者把数据结构清空。
(2)等待函数:SleepConditionVariableCS/SleepConditionVariableSRW。其功能类似于等于事件对象:WaitForSingleObject(hEvent,INFINITE);
参数 |
描述 |
pConditionVariable |
己初始化的条件变量,调用线程正在等待该条件变量 |
pCriticalSection/pSRWLock |
指向临界区或SRWLock的指针,用来同步对共享资源的访问 |
dwMilliSeconds |
等待条件变量被触发(不是锁)的时间 0:只是用来测试,并立即返回。 INFINITE:无限等待条件变量(不是锁),直到它被触发 其它值:如果超时时,则不再等待,直接返回。返回值为FALSE |
Flags |
使用SRWLock才有这个参数,表示一旦条件变量被触发,将于何种方式得到锁,如共享或排它。对于写者线程传入0,即获得一个排它锁。对于读者线程传入CONDITION_VARIABLE_LOCKMODE_SHARDED,表示共享锁。 |
(3)唤醒线程:WakeConditionVariable或WakeAllConditionVariable(阻塞在Sleep*函数中的线程),其参数只有一个,就是条件变量。其功能类似于SetEvent将事件设为有信号状态。
①WakeConditionVariable唤醒一个在Sleep*函数中的线程以便让其获得锁。如消费者消费完数据,可以唤醒一个生产者来生产数据。因为生产者获得的是排他锁,所以一次只需唤醒一个。
②当调用WakeAllConditionVariable时,会唤醒Sleep*函数中的多个线程。如当生产者产生好数据后,可以唤醒所有的消费者,因为这些消费者可共享锁,所以可以同时唤醒。
③注意Wake*函数唤醒的是那些被Sleep*函数阻塞的线程,而当调用RleaseSRWLock*后,唤醒的是那些阻塞在AcquireSRWLock*函数中的线程,这两者是不同的。
(4)条件变量使用的一般模式
//消费者线程函数
DWORD WINAPI Consumer(PVOID pvParam) { AcquireSRWLockShared(&g_srwLock); //请求共享锁(读锁) ……//消费之前的一些操作。然后等待条件变量,如果不满足,会释放锁,并开始等待。 SleepConditionVariableSRW(&g_cvConsume,&g_srwLock,INFINITE, //等待消费者条件变量 CONDITION_VARIABLE_LOCKMODE_SHARED); //等待条件变量,会被生产者唤醒 //当条件满足时会重新获得锁(共享) …… //消费 ReleaseSRWLockShared(&g_srwLock); //释放锁 WakeConditionVariable(&g_cvProduce);//唤醒一个生产者线程,触发生产者条件变量。因为生产者使用排他锁,所以每次只需唤醒一个线程。 }
//生产者线程函数
DWORD WINAPI Producer(PVOID pvParam) { AcquireSRWLockExclusive(&g_srwLock); //请求排他锁(读锁) …… //生产数据之前的一些操作,然后等待条件变量,如果不满足,则释放锁,并开始等待。 //等待条件变量,会被消费者线程唤醒 SleepConditionVariableSRW(&g_cvProduce,&g_srwLock,INFINITE,0); //等待重新获得排他锁 …… //生产 ReleaseSRWLockExclusive(&g_srwLock); //释放排他锁 WakeAllConditionVariable(&g_cvProduce);//触发消费者条件变量,唤醒所有Sleep*函数中的消费者,因为消费者会通过共享锁共享数据,所以可同时唤醒。 }
【Queue程序】使用SRWLock和条件变量演示生产者和消费者问题
效果图
//队列的实现——队列头文件
#pragma once #include <windows.h> ////////////////////////////////////////////////////////////////////////// //此队列并不具有线程安全性,由客户线程和服务线程自行访问的同步问题 class CQueue{ public: //队列中元素的类型 //当某个客户线程请求每一次会将其线程ID和请求号记录在队列中 //本例假设中,请求号是这样给的:每个线程每次发送请求时,请求号是 //递增的。如线程1,第1次会发送1号请求,第2次发送2号请求,依此类推 struct ELEMENT{ int m_nThreadNum; //客户线程号 int m_nRequestNum;//请求号 //其他元素放这以后 }; typedef ELEMENT* PELEMENT; private: struct INNER_ELEMENT{ int m_nStamp; //0表示该元素是空元素。 //记录该元素加入队列时的顺序(即属于第几个加入队列中的元素了) ELEMENT m_element; }; typedef INNER_ELEMENT* PINNER_ELEMENT; private: PINNER_ELEMENT m_pElements; //队列元素数组,数组长度设计为固定,这就是需要保护的数据 int m_nMaxElements; //数组长度。CQueue构造时数组初始长度 int m_nCurrentStamp; //跟踪当前插入的元素,添加元素时递增,且只增不减 private: int GetFreeSlot(); //返回队列中第1个m_nStamp为0的元素 int GetNextSlot(int nThreadNum); //返回需要服务线程nThreadNum处理的最早的请求 //即属于服务线程要处理的请求中m_nStamp值最小的那项 public: CQueue(int nMaxElements); ~CQueue(); BOOL IsFull(); //队列是否己满 BOOL IsEmpty(int nThreadNum); //队列是否存在指定线程nTreadNum要处理的请求 void AddElement(ELEMENT e); BOOL GetNewElement(int nThreadNum, ELEMENT &e); //获取队列中需要线程nThreadNum处理的 //最早插入的元素 };
//队列的实现——队列实现的文件Queue.cpp
#include "Queue.h" // 构造函数 CQueue::CQueue(int nMaxElements) { //分配固定大小的(nMaxElements个元素)队列,并初始化各元素 m_pElements = (PINNER_ELEMENT)HeapAlloc(GetProcessHeap(), 0, sizeof(INNER_ELEMENT)* nMaxElements); ZeroMemory(m_pElements, sizeof(INNER_ELEMENT)*nMaxElements); //各元素初始化为0 //初始化当前元素个数的计数值 m_nCurrentStamp = 0; //保存队列元素的最大个数 m_nMaxElements = nMaxElements; } //析构函数 CQueue::~CQueue() { HeapFree(GetProcessHeap(), 0, m_pElements); } //判断队列是否己满 BOOL CQueue::IsFull() { return (-1 == GetFreeSlot()); } //队列是否存在指定线程nTreadNum要处理的请求 BOOL CQueue::IsEmpty(int nThreadNum) { return (GetNextSlot(nThreadNum) == -1); } //获得队列中第1个空的元素(即m_nStamp=0,说明该元素为空或该元素己被读取过了 //注意,己经读取过的也直接废弃,可以重新被新的请求覆盖使用) int CQueue::GetFreeSlot() { //查找第1个m_nStamp的元素 for (int idx = 0; idx < m_nMaxElements; idx++) { if (m_pElements[idx].m_nStamp == 0) return idx; } //如果找不到元素(即队列己满),返回-1 return (-1); } //返回需要服务线程nThreadNum处理的最早的请求 //即属于服务线程要处理的请求中m_nStamp最小的那项 int CQueue::GetNextSlot(int nThreadNum) { //默认没有需要服务线程nThreadNum处理的元素 int firstSlot = -1; //m_nCurrentStamp为最后添加元素的stamp,为最大的stamp //因要在队列中查找属于线程nThreadNum处理的,且其m_nStamp值最小 //因此根据查找最小值的方法, int minStamp = m_nCurrentStamp + 1; //线程nThreadNum为奇数时处理客户请求号为奇数的元素,为偶数时,处理偶数号请求 for (int idx = 0; idx < m_nMaxElements; idx++){ if ((m_pElements[idx].m_nStamp != 0) && //确保元素非空 ((m_pElements[idx].m_element.m_nRequestNum % 2) == nThreadNum) && //需要该线程处理的请求 (m_pElements[idx].m_nStamp < minStamp)){ //查找m_nStamp的最小值 minStamp = m_pElements[idx].m_nStamp; firstSlot = idx; } } return (firstSlot); } //获取队列中需要线程nThreadNum处理的最早插入的元素 BOOL CQueue::GetNewElement(int nThreadNum, ELEMENT &e) { int nNewSlot = GetNextSlot(nThreadNum); if (-1 == nNewSlot) return FALSE; //拷贝元素的内容 e = m_pElements[nNewSlot].m_element; //将该元素标访为己读 m_pElements[nNewSlot].m_nStamp = 0; return TRUE; } //增加请求(元素) void CQueue::AddElement(ELEMENT e) { //如果队列己满时,不能再增加,直接返回 int nFreeSlot = GetFreeSlot(); if (nFreeSlot == -1) return; //拷贝内容 m_pElements[nFreeSlot].m_element = e; //记录一下该元素加入队列的顺序(即,属于第几个加入进来的元素了) m_pElements[nFreeSlot].m_nStamp = ++m_nCurrentStamp; }
//主程序main.cpp
#include "../../CommonFiles/CmnHdr.h" #include <tchar.h> #include <strsafe.h> #include "resource.h" #include "Queue.h" ////////////////////////////////////////////////////////////////////////// CQueue g_q(10); //共享队列,共有10个元素 volatile LONG g_bShutDown; //是否结束所有读、写者线程 HWND g_hWnd; //供客户/服务线程使用 SRWLOCK g_srwLock; //读\写锁,用于保护共享队列 CONDITION_VARIABLE g_cvReadyToConsume; //写者发出信号,表示可以读取 CONDITION_VARIABLE g_cvReadyToProduce; //读者发出信号,表示可以写入 //所有读、写者线程的句柄(本例共6个线程:4个写者、2个写者) HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS]; //为了可扩展性,最多可以创建64个线程 //读、写者线程的总数量 int g_nNumThreads = 0; //每创建一个线程,该值加1 ////////////////////////////////////////////////////////////////////////// void AddText(HWND hWndLB, PCTSTR pszFormat, ...) { va_list argList; va_start(argList, pszFormat); TCHAR sz[20 * 1024]; _vstprintf_s(sz, _countof(sz), pszFormat, argList); ListBox_SetCurSel(hWndLB, ListBox_AddString(hWndLB, sz)); va_end(argList); } ////////////////////////////////////////////////////////////////////////// //读取队列中需要服务线程[nThreadNum]处理的最早的元素(每次读完后,最早的这个元素会被废弃),并显示在列表框中 //该函数的第2个参数在本例中没用,被我注释掉了 BOOL ConsumeElement(int nThreadNum, /*int nRequestNum,*/ HWND hWndLB) { // 以共享模式获得SRWLock,如果锁已经被写者线程以独占模式占用,函数会阻塞 // 如果锁已经被另一个读者线程以共享模式获得,运行对请求进行处理 AcquireSRWLockShared(&g_srwLock); // 如果队列不存在该读者线程(nThreadNum)处理的元素,则阻塞,等待一个写者产生新的元素 // 直到写者线程触发g_cvReadyToConsume条件变量为止 while (g_q.IsEmpty(nThreadNum) && !g_bShutDown){ //没有可读的元素 AddText(hWndLB, TEXT("服务线程[%d]没有可处理的数据!"), nThreadNum); //等待,直到写者线程将其唤醒(注意,写者线程只要写入数据,就会唤醒 //这些等待读取数据的线程,让他们自行判断这些数据是不是属于自己的那部分。 SleepConditionVariableSRW(&g_cvReadyToConsume, &g_srwLock, INFINITE, CONDITION_VARIABLE_LOCKMODE_SHARED); } //如果用户按了“结束”按钮 if (g_bShutDown){ //显示当前服务线程退出的信息 AddText(hWndLB, TEXT("服务线程[%d]退出!"), nThreadNum); //唤醒另一个可能被阻塞的写者线程(因为共享锁,读者线程不会阻塞) ReleaseSRWLockShared(&g_srwLock); //通知那些正在等待数据的其他读线程结束(即有部分读线程正在 //阻塞在上面的Sleep*函数中等待数据的到达 WakeConditionVariable(&g_cvReadyToConsume);//用来唤醒其他读线程! return FALSE; } CQueue::ELEMENT e; //获取队列中需要线程[nThreadNum]处理的最早插入的元素 g_q.GetNewElement(nThreadNum, e); //释放共享锁 ReleaseSRWLockShared(&g_srwLock); AddText(hWndLB, TEXT("服务线程[%d]处理了客户线程[%d]的元素%d"), nThreadNum,e.m_nThreadNum,e.m_nRequestNum); //读完一个元素后队列中会产生一个空项,唤醒写者线程 WakeConditionVariable(&g_cvReadyToProduce); return TRUE; } //读者线程函数(服务线程) DWORD WINAPI ReadThread(PVOID pParam) { int nThreadNum = PtrToUlong(pParam); HWND hWndLB = GetDlgItem(g_hWnd, IDC_SERVERS); //课本中nRequestNum这个变量不需要用到,该句改为如下的while语句 //for (int nRequestNum = 1; !g_bShutDown;nRequestNum++){ while (!g_bShutDown){ if (!ConsumeElement(nThreadNum, /*nRequestNum,*/ hWndLB)) return 0; Sleep(2500); //读取下一个元素时先休眠2.5秒 } //结束线程(用户按下了“结束”按钮) AddText(hWndLB, TEXT("服务线程[%d]退出!"), nThreadNum); return 0; } ////////////////////////////////////////////////////////////////////////// //写者线程函数(客户线程) DWORD WINAPI WriteThread(PVOID pParam) { int nThreadNum = PtrToUlong(pParam); //线程的索引号 HWND hWndLB = GetDlgItem(g_hWnd, IDC_CLIENTS); for (int nRequestNum = 1; !g_bShutDown;nRequestNum++){ //模拟写入数据,每次将自己的线程号及请求号保存到队列中去 CQueue::ELEMENT e = { nThreadNum, nRequestNum }; //以独占的方式获得SRWLock锁,如果调用线程获得锁后,其他任何申请锁的线程会被阻塞在 //AcquireSRWLock*函数中,直到锁被释放。当调用线程申请排他锁时,若锁已被其他线程获得 //则他们都释放锁以后才可申请到。 AcquireSRWLockExclusive(&g_srwLock); //获得排它锁后,判断队列是否己满,如果己满,则挂起自己,直到条件变量满足 // 注意在写入的过程中,可以会收到“结束”线程的命令 if (g_q.IsFull() & !g_bShutDown){ //队列己满 AddText(hWndLB, TEXT("线程[%d]检测到队列己满:不能增加元素%d"), nThreadNum,nRequestNum); //睡眠,等待读取者线程读取一个元素,以便腾出空间来容纳新元素 //这里调用线程会交给锁,并挂起自己,直到读者线程读出元素,最后一个参数 //为0,表示当被唤醒时,以排他方式再次获得共享锁 SleepConditionVariableSRW(&g_cvReadyToProduce, &g_srwLock, INFINITE, 0); } //如果用户按了结束按钮,则结束该线程 if (g_bShutDown){ //显示客户线程结束提示 AddText(hWndLB, TEXT("客户线程[%d]退出"), nThreadNum); //释放锁,唤醒那些阻塞在Acuqire*函数中的线程 ReleaseSRWLockExclusive(&g_srwLock); //唤醒那些被阻塞在Sleep*函数中的线程(这些线程已经获得锁了,当释放排他锁后被系统唤醒 //而不是被Wake*函数唤醒),以便让其自然结束。 WakeAllConditionVariable(&g_cvReadyToConsume); return 0; //结束自己 } else{ //将新元素加入到队列中 g_q.AddElement(e); //显示新加入的元素 AddText(hWndLB, TEXT("客户线程[%d]添加元素%d"), nThreadNum, nRequestNum); //释放锁 ReleaseSRWLockExclusive(&g_srwLock); //唤醒所有被阻塞的读者线程,让其自然结束 WakeAllConditionVariable(&g_cvReadyToConsume); //在新增下一个元素前,休眠1.5秒 Sleep(1500); } } //当前线程退出 AddText(hWndLB, TEXT("客户线程[%d]退出"), nThreadNum); return 0; } ////////////////////////////////////////////////////////////////////////// // void StopProcessing() { if (!g_bShutDown){ //请求所有的线程结束 InterlockedExchange(&g_bShutDown, TRUE); //通知所有正在等待条件变量的线程结束 WakeAllConditionVariable(&g_cvReadyToConsume); WakeAllConditionVariable(&g_cvReadyToProduce); //等待所有的线程 WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE); //清除线程内核对象 while (g_nNumThreads--) CloseHandle(g_hThreads[g_nNumThreads]); //关闭列表框 AddText(GetDlgItem(g_hWnd, IDC_SERVERS), TEXT("-----------------------------------")); AddText(GetDlgItem(g_hWnd, IDC_CLIENTS), TEXT("-----------------------------------")); } } DWORD WINAPI StoppingThread(PVOID pParam) { StopProcessing(); return 0; } ////////////////////////////////////////////////////////////////////////// BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam) { chSETDLGICONS(hwnd, IDI_QUEUE); g_hWnd = hwnd; //用于让读、写者线程显示状态 //初始化SRWLock锁 InitializeSRWLock(&g_srwLock); //初始化条件变量 InitializeConditionVariable(&g_cvReadyToConsume); InitializeConditionVariable(&g_cvReadyToProduce); //将被设为TRUE来结束读、写者线程 g_bShutDown = FALSE; //创建4个写者线程(客户线程) DWORD dwThreadID; for (int x = 0; x < 4;x++){ g_hThreads[g_nNumThreads++] = chBEGINTHREADEX(NULL, 0, WriteThread, (PVOID)(INT_PTR)x,//将x作为线程索引号 0, //立即运行 &dwThreadID); } //创建2个读者线程(服务线程) for (int x = 0; x < 2; x++){ g_hThreads[g_nNumThreads++] = chBEGINTHREADEX(NULL, 0, ReadThread, (PVOID)(INT_PTR)x,//将x作为线程索引号 0, //立即运行 &dwThreadID); } return TRUE; } ////////////////////////////////////////////////////////////////////////// void Dlg_OnCommand(HWND hWnd, int id, HWND hWndCtrl, UINT codeNotify) { switch (id) { case IDCANCEL: EndDialog(hWnd, id); break; case IDC_BTN_STOP: //StopProcessing不能在UI线程中调用,因为这个函数 //里会的WaitForMultipleObjects会挂起UI线程,使得当子线程SendMessage //来清空列表框时会发生死锁,这里另开一个线程 DWORD dwThreadID; CloseHandle(chBEGINTHREADEX(NULL, 0, StoppingThread, NULL, 0, &dwThreadID)); //禁用“结束”按钮,防止被多次按下 Button_Enable(hWndCtrl, FALSE); break; } } ////////////////////////////////////////////////////////////////////////// INT_PTR WINAPI DlgProc(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam) { switch (uMsg) { chHANDLE_DLGMSG(hWnd, WM_INITDIALOG, Dlg_OnInitDialog); chHANDLE_DLGMSG(hWnd, WM_COMMAND, Dlg_OnCommand); } return FALSE; } ////////////////////////////////////////////////////////////////////////// int WINAPI _tWinMain(HINSTANCE hInstExe, HINSTANCE, PTSTR, int) { DialogBox(hInstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, DlgProc); StopProcessing(); return 0; }
//resource.h
//{{NO_DEPENDENCIES}} // Microsoft Visual C++ 生成的包含文件。 // 供 8_Queue.rc 使用 // #define IDD_QUEUE 101 #define IDI_QUEUE 102 #define IDC_SERVERS 1001 #define IDC_CLIENTS 1002 #define IDC_BTN_STOP 1003 // Next default values for new objects // #ifdef APSTUDIO_INVOKED #ifndef APSTUDIO_READONLY_SYMBOLS #define _APS_NEXT_RESOURCE_VALUE 103 #define _APS_NEXT_COMMAND_VALUE 40001 #define _APS_NEXT_CONTROL_VALUE 1004 #define _APS_NEXT_SYMED_VALUE 101 #endif #endif
//资源文件Queue.rc
// Microsoft Visual C++ generated resource script. // #include "resource.h" #define APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // // Generated from the TEXTINCLUDE 2 resource. // #include "winres.h" ///////////////////////////////////////////////////////////////////////////// #undef APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // 中文(简体,中国) resources #if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS) LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED #ifdef APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // TEXTINCLUDE // 1 TEXTINCLUDE BEGIN "resource.h\0" END 2 TEXTINCLUDE BEGIN "#include ""winres.h""\r\n" "\0" END 3 TEXTINCLUDE BEGIN "\r\n" "\0" END #endif // APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // Dialog // IDD_QUEUE DIALOGEX 0, 0, 393, 203 STYLE DS_SETFONT | DS_MODALFRAME | DS_FIXEDSYS | WS_POPUP | WS_CAPTION | WS_SYSMENU CAPTION "队列" FONT 8, "MS Shell Dlg", 400, 0, 0x1 BEGIN DEFPUSHBUTTON "停止",IDC_BTN_STOP,172,182,50,14 GROUPBOX "客户线程",IDC_STATIC,14,15,178,163 GROUPBOX "服务线程",IDC_STATIC,201,15,178,163 LISTBOX IDC_SERVERS,207,25,166,151,NOT LBS_NOTIFY | WS_VSCROLL | WS_TABSTOP LISTBOX IDC_CLIENTS, 19, 25, 166, 151, NOT LBS_NOTIFY | WS_VSCROLL | WS_TABSTOP END ///////////////////////////////////////////////////////////////////////////// // // DESIGNINFO // #ifdef APSTUDIO_INVOKED GUIDELINES DESIGNINFO BEGIN IDD_QUEUE, DIALOG BEGIN LEFTMARGIN, 7 RIGHTMARGIN, 386 TOPMARGIN, 7 BOTTOMMARGIN, 196 END END #endif // APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // Icon // // Icon with lowest ID value placed first to ensure application icon // remains consistent on all systems. IDI_QUEUE ICON "Queue.ico" #endif // 中文(简体,中国) resources ///////////////////////////////////////////////////////////////////////////// #ifndef APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // Generated from the TEXTINCLUDE 3 resource. // ///////////////////////////////////////////////////////////////////////////// #endif // not APSTUDIO_INVOKED