Condition Variables

Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs.

Condition variables are user-mode objects that cannot be shared across processes.

Condition variables enable threads to atomically release a lock and enter the sleeping state.

They can be used with critical sections or slim reader/writer (SRW) locks.

Condition variables support operations that "wake one" or "wake all" waiting threads.

After a thread is woken, it re-acquires the lock it released when the thread entered the sleeping state.

条件变量是能够在特殊条件满足前使线程处于等待状态的同步原语.

条件变量是不能被跨进程共享的用户模式下的同步对象.

条件变量能够使线程原子性释放一个锁同一时候进入sleep 状态. 条件变量和Critical Section Object一起使用.

条件变量支持wake one 或者wake all 等待的线程.

Windows Server 2003 and Windows XP:  Condition variables are not supported.

It is often convenient to use more than one condition variable with the same lock.

For example, an implementation of a reader/writer lock might use a single critical section but separate condition variables for readers and writers.

在同一个锁上面使用条件变量是很实用的.

比方:对于同一个临界区通过把读操作和写操作通过条件变量来分离能够实现读锁,写锁以及读写锁.

The following code implements a producer/consumer queue.

The queue is represented as a bounded circular buffer, and is protected by a critical section.

The code uses two condition variables: one used by producers (BufferNotFull) and one used by consumers (BufferNotEmpty).

The code calls the InitializeConditionVariable function to create the condition variables.

The consumer threads call the SleepConditionVariableCS function to wait for items to be added to the queue and

the WakeConditionVariable function to signal the producer that it is ready for more items.

The producer threads call SleepConditionVariableCS to wait for the consumer to remove items from the queue and

WakeConditionVariable to signal the consumer that there are more items in the queue.

实现一个生产者/消费者队列.

队列是一个被Critical Section Object 保护的有界限圆形BUFFER.

通过调用InitializeConditionVariable()函数去创建一个条件变量.

消费者调用SleepConditionVariableCS()函数等待有物品被增加到队列中,通过WakeConditionVariable()函数通知生产者生产很多其它的物品.

生产者调用SleepConditionVariableCS()函数等待消费者把物品从队列中移除,通过WakeConditionVariable()函数来通知消费者去消费很多其它的物品.

測试代码:UsingConditionVariables.cpp

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>

#define BUFFER_SIZE 10
#define PRODUCER_SLEEP_TIME_MS 500
#define CONSUMER_SLEEP_TIME_MS 2000

LONG Buffer[BUFFER_SIZE];
LONG LastItemProduced;
ULONG QueueSize;
ULONG QueueStartOffset;

ULONG TotalItemsProduced;
ULONG TotalItemsConsumed;

CONDITION_VARIABLE BufferNotEmpty;
CONDITION_VARIABLE BufferNotFull;
CRITICAL_SECTION   BufferLock;

BOOL StopRequested;

DWORD WINAPI ProducerThreadProc (PVOID p)
{
    ULONG ProducerId = (ULONG)(ULONG_PTR)p;

    while (true)
    {
        // Produce a new item.

        Sleep (rand() % PRODUCER_SLEEP_TIME_MS);
		//原子锁
        ULONG Item = InterlockedIncrement (&LastItemProduced);
		//进入临界区,其它线程不能訪问下面被保护的资源
        EnterCriticalSection (&BufferLock);
		//仅仅有当有界缓冲区满了之后,才通知消费者来消费资源,否则就一直生产物品
        while (QueueSize == BUFFER_SIZE && StopRequested == FALSE)
        {
            // Buffer is full - sleep so consumers can get items.
            SleepConditionVariableCS (&BufferNotFull, &BufferLock, INFINITE);
        }

        if (StopRequested == TRUE)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Insert the item at the end of the queue and increment size.

        Buffer[(QueueStartOffset + QueueSize) % BUFFER_SIZE] = Item;
        QueueSize++;
        TotalItemsProduced++;

        printf ("Producer %u: item %2d, queue size %2u\r\n", ProducerId, Item, QueueSize);
		//离开临界区,其它线程可訪问该临界区
        LeaveCriticalSection (&BufferLock);

        // If a consumer is waiting, wake it.

        WakeConditionVariable (&BufferNotEmpty);
    }

    printf ("Producer %u exiting\r\n", ProducerId);
    return 0;
}
//消费者线程
DWORD WINAPI ConsumerThreadProc (PVOID p)
{
	//消费数量
    ULONG ConsumerId = (ULONG)(ULONG_PTR)p;

    while (true)
    {   //临界区,当一个线程在获取临界区权利时,其它线程都要等待.
        EnterCriticalSection (&BufferLock);
		//当前缓存区为零
        while (QueueSize == 0 && StopRequested == FALSE)
        {
            // Buffer is empty - sleep so producers can create items.
			//通知生产者进行生产物品.当生产者完毕生产后,则通知消费者来消费
            SleepConditionVariableCS (&BufferNotEmpty, &BufferLock, INFINITE);
        }

        if (StopRequested == TRUE && QueueSize == 0)
        {
            LeaveCriticalSection (&BufferLock);
            break;
        }

        // Consume the first available item.

        LONG Item = Buffer[QueueStartOffset];

        QueueSize--;
        QueueStartOffset++;
        TotalItemsConsumed++;

        if (QueueStartOffset == BUFFER_SIZE)
        {
            QueueStartOffset = 0;
        }

        printf ("Consumer %u: item %2d, queue size %2u\r\n",
            ConsumerId, Item, QueueSize);

        LeaveCriticalSection (&BufferLock);

        // If a producer is waiting, wake it.

        WakeConditionVariable (&BufferNotFull);

        // Simulate processing of the item.

        Sleep (rand() % CONSUMER_SLEEP_TIME_MS);
    }

    printf ("Consumer %u exiting\r\n", ConsumerId);
    return 0;
}

int main ( void )
{

    InitializeConditionVariable (&BufferNotEmpty);
    InitializeConditionVariable (&BufferNotFull);

    InitializeCriticalSection (&BufferLock);

    DWORD id;
    HANDLE hProducer1 = CreateThread (NULL, 0, ProducerThreadProc, (PVOID)1, 0, &id);
    HANDLE hConsumer1 = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)1, 0, &id);
    HANDLE hConsumer2 = CreateThread (NULL, 0, ConsumerThreadProc, (PVOID)2, 0, &id);

    puts ("Press enter to stop...");
    getchar();

    EnterCriticalSection (&BufferLock);
    StopRequested = TRUE;
    LeaveCriticalSection (&BufferLock);

    WakeAllConditionVariable (&BufferNotFull);
    WakeAllConditionVariable (&BufferNotEmpty);

    WaitForSingleObject (hProducer1, INFINITE);
    WaitForSingleObject (hConsumer1, INFINITE);
    WaitForSingleObject (hConsumer2, INFINITE);

    printf ("TotalItemsProduced: %u, TotalItemsConsumed: %u\r\n",
        TotalItemsProduced, TotalItemsConsumed);
    return 0;
}
时间: 2024-08-27 18:06:32

Condition Variables的相关文章

使用Condition Variables 实现一个线程安全队列

使用Condition Variables实现一个线程安全队列 多线程代码需要面对的一个问题和是如何把数据从一个县城传到另一个县城. 举个栗子,一个常见的是把串行算法并行化方法是,把他们分成块并且做成一个管道.管道中任意一块都可以单独在一个线程里运行.每个阶段完成后添加数据到输入队列给下个阶段. Basic Thread Safety with a Mutex 使用mutex实现简单的线程安全 最简单的办法是封装一个非线程安全的队列,使用mutex保护它(实例使用boost中的方法和类型,需要1

并行编程之条件变量(posix condition variables)

在整理Java LockSupport.park()的东东,看到了个"Spurious wakeup",重新梳理下. 首先来个<UNIX环境高级编程>里的例子: [cpp] view plaincopy #include <pthread.h> struct msg { struct msg *m_next; /* ... more stuff here ... */ }; struct msg *workq; pthread_cond_t qready = P

深入解析条件变量(condition variables)

深入解析条件变量 什么是条件变量(condition variables) 引用APUE中的一句话: Condition variables are another synchronization mechanism available to threads. These synchronization objects provide a place for threads to rendezvous. When used with mutexes, condition variables al

4.锁--并行编程之条件变量(posix condition variables)

在整理Java LockSupport.park()的东东.看到了个"Spurious wakeup".又一次梳理下. 首先来个<UNIX环境高级编程>里的样例: [cpp] view plaincopy #include <pthread.h> struct msg { struct msg *m_next; /* ... more stuff here ... */ }; struct msg *workq; pthread_cond_t qready = 

c++11多线程记录6:条件变量(condition variables)

https://www.youtube.com/watch?v=13dFggo4t_I视频地址 实例1 考虑这样一个场景:存在一个全局队列deque,线程A向deque中推入数据(写),线程B从deque中取出数据(读). deque这个资源对象就需要用mutex做访问控制,代码如下: std::deque<int> q; std::mutex mu; void func1() { int ct = 10; while (ct > 0) { std::unique_lock<std

第8章 用户模式下的线程同步(4)_条件变量(Condition Variable)

8.6 条件变量(Condition Variables)——可利用临界区或SRWLock锁来实现 8.6.1 条件变量的使用 (1)条件变量机制就是为了简化 “生产者-消费者”问题而设计的一种线程同步机制.其目的让线程以原子方式释放锁并将自己阻塞,直到某一个条件成立为止.如读者线程当没有数据可读取时,则应释放锁并等待,直到写者线程产生了新的数据.同理,当写者把数据结构写满时,那么写者应该释放SRWLock并等待,直到读者把数据结构清空. (2)等待函数:SleepConditionVariab

android分析之Condition

Condition的含义是条件变量,其实现依赖于系统,一般都要配合Mutex使用,使用步骤为:给mutex上锁(Lock),调用wait等待“条件”发生,如果没有发生则re-wait(),最后释放mutex(unlock),并继续执行.所有等待(wait)同一个“条件变量(condition)”的线程都要使用相同的一把锁——这样相当于互斥操作该Condition. // ---------------------------------------------------------------

threading模块

threading — Higher-level threading interface¶ Source code: Lib/threading.py This module constructs higher-level threading interfaces on top of the  lower level thread module. See also the mutex and Queue modules. The dummy_threading module is provide

java多线程基本概述(九)——ThreadLocal

public interface Lock Lock implementations provide more extensive locking operations than can be obtained using synchronized methods and statements. //Lock对象提供比同步方法或者同步块更多的灵活性和拓展性,They allow more flexible structuring, may have quite different propert