并行编程之条件变量(posix condition variables)

在整理Java LockSupport.park()的东东,看到了个"Spurious wakeup",重新梳理下。

首先来个《UNIX环境高级编程》里的例子:

[cpp] view
plain
copy

  1. #include <pthread.h>
  2. struct msg {
  3. struct msg *m_next;
  4. /* ... more stuff here ... */
  5. };
  6. struct msg *workq;
  7. pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
  8. pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
  9. void process_msg(void) {
  10. struct msg *mp;
  11. for (;;) {
  12. pthread_mutex_lock(&qlock);
  13. while (workq == NULL)
  14. pthread_cond_wait(&qready, &qlock);
  15. mp = workq;
  16. workq = mp->m_next;
  17. pthread_mutex_unlock(&qlock);
  18. /* now process the message mp */
  19. }
  20. }
  21. void enqueue_msg(struct msg *mp) {
  22. pthread_mutex_lock(&qlock);
  23. mp->m_next = workq;
  24. workq = mp;
  25. pthread_mutex_unlock(&qlock);
  26. pthread_cond_signal(&qready);
  27. }

一个简单的消息生产者和消费者的代码。它们之间用condition同步。

这个代码最容易让人搞混的是process_msg函数里的pthread_mutex_lock 和 pthread_mutex_unlock 是一对函数调用,前面加锁,后面解锁。的确,是加锁解锁,但是它们两不是一对的。它们的另一半在pthread_cond_wait函数里。

pthread_cond_wait函数可以认为它做了三件事:

  • 把自身线程放到condition的等待队列里,把mutex解锁;
  • 等待被唤醒(当其它线程调用pthread_cond_signal或者pthread_cond_broadcast时);
  • 被唤醒之后,对metex加锁,再返回。

mutex和condition实际上是绑定在一起的,一个condition只能对应一个mutex。在Java的代码里,Condition对象只能通过lock.newCondition()的函数来获取。

Spurious wakeup

所谓的spurious wakeup,指的是一个线程调用pthread_cond_signal(),却有可能不止一个线程被唤醒。为什么会出现这种情况?wiki和其它的一些文档都只是说在多核的情况下,简化实现允许出现这种spurious wakeup。。

在man文档里给出了一个可能的实现,然后解析为什么会出现。

假定有三个线程,线程A正在执行pthread_cond_wait,线程B正在执行pthread_cond_signal,线程C正准备执行pthread_cond_wait函数。

[cpp] view
plain
copy

  1. pthread_cond_wait(mutex, cond):
  2. value = cond->value; /* 1 */
  3. pthread_mutex_unlock(mutex); /* 2 */
  4. pthread_mutex_lock(cond->mutex); /* 10 */
  5. if (value == cond->value) { /* 11 */
  6. me->next_cond = cond->waiter;
  7. cond->waiter = me;
  8. pthread_mutex_unlock(cond->mutex);
  9. unable_to_run(me);
  10. } else
  11. pthread_mutex_unlock(cond->mutex); /* 12 */
  12. pthread_mutex_lock(mutex); /* 13 */
  13. pthread_cond_signal(cond):
  14. pthread_mutex_lock(cond->mutex); /* 3 */
  15. cond->value++; /* 4 */
  16. if (cond->waiter) { /* 5 */
  17. sleeper = cond->waiter; /* 6 */
  18. cond->waiter = sleeper->next_cond; /* 7 */
  19. able_to_run(sleeper); /* 8 */
  20. }
  21. pthread_mutex_unlock(cond->mutex); /* 9 */

线程A执行了第1,2步,这时它释放了mutex,然后线程B拿到了这个mutext,并且pthread_cond_signal函数时执行并返回了。于是线程B就是一个所谓的“spurious wakeup”。

为什么pthread_cond_wait函数里一进入,就释放了mutex?没有找到什么解析。。

查看了glibc的源代码,大概可以看出上面的一些影子,但是太复杂了,也没有搞明白为什么。。

/build/buildd/eglibc-2.19/nptl/pthread_cond_wait.c

/build/buildd/eglibc-2.19/nptl/pthread_cond_signal.c

不过从上面的解析,可以发现《UNIX高级编程》里的说明是错误的(可能是因为太久了)。

The  caller passes it locked to the function, which then atomically places the calling thread on the list of threads waiting for the condition and unlocks the mutex.

上面的伪代码,一进入pthread_cond_wait函数就释放了mutex,明显和书里的不一样。

wait morphing优化

在《UNIX环境高级编程》的示例代码里,是先调用pthread_mutex_unlock,再调用pthread_cond_signal。

[cpp] view
plain
copy

  1. void enqueue_msg(struct msg *mp) {
  2. pthread_mutex_lock(&qlock);
  3. mp->m_next = workq;
  4. workq = mp;
  5. <strong>  pthread_mutex_unlock(&qlock);
  6. pthread_cond_signal(&qready);</strong>
  7. }

有的地方给出的是先调用pthread_cond_signal,再调用pthread_mutex_unlock:

[cpp] view
plain
copy

  1. void enqueue_msg(struct msg *mp) {
  2. pthread_mutex_lock(&qlock);
  3. mp->m_next = workq;
  4. workq = mp;
  5. pthread_cond_signal(&qready);
  6. pthread_mutex_unlock(&qlock);
  7. }

先unlock再signal,这有个好处,就是调用enqueue_msg的线程可以再次参与mutex的竞争中,这样意味着可以连续放入多个消息,这个可能会提高效率。类似Java里ReentrantLock的非公平模式。

网上有些文章说,先singal再unlock,有可能会出现一种情况是被singal唤醒的线程会因为不能马上拿到mutex(还没被释放),从而会再次休眠,这样影响了效率。从而会有一个叫“wait morphing”优化,就是如果线程被唤醒但是不能获取到mutex,则线程被转移(morphing)到mutex的等待队列里。

但是我查看了下glibc的源代码,貌似没有发现有这种“wait morphing”优化。

man文档里提到:

The pthread_cond_broadcast() or pthread_cond_signal() functions may be called by a thread whether or not it currently owns the mutex that  threads  calling  pthread_cond_wait()  or pthread_cond_timedwait() have associated with the condition variable
during their waits; however, if predictable scheduling behavior is required, then that mutex shall be locked by the thread calling pthread_cond_broadcast() or pthread_cond_signal().

可见在调用singal之前,可以不持有mutex,除非是“predictable scheduling”,可预测的调度行为。这种可能是实时系统才有这种严格的要求。

为什么要用while循环来判断条件是否成立?

[cpp] view
plain
copy

  1. while (workq == NULL)
  2. pthread_cond_wait(&qready, &qlock);

而不用if来判断?

[cpp] view
plain
copy

  1. if (workq == NULL)
  2. pthread_cond_wait(&qready, &qlock);

一个原因是spurious wakeup,但即使没有spurious wakeup,也是要用While来判断的。

比如线程A,线程B在pthread_cond_wait函数中等待,然后线程C把消息放到队列里,再调用pthread_cond_broadcast,然后线程A先获取到mutex,处理完消息完后,这时workq就变成NULL了。这时线程B才获取到mutex,那么这时实际上是没有资源供线程B使用的。所以从pthread_cond_wait函数返回之后,还是要判断条件是否成功,如果成立,再进行处理。

pthread_cond_signal和pthread_cond_broadcast

在这篇文章里,http://www.cppblog.com/Solstice/archive/2013/09/09/203094.html

给出的示例代码7里,认为调用pthread_cond_broadcast来唤醒所有的线程是比较好的写法。但是我认为pthread_cond_signal和pthread_cond_broadcast是两个不同东东,不能简单合并在同一个函数调用。只唤醒一个效率和唤醒全部等待线程的效率显然不能等同。典型的condition是用CLH或者MCS来实现的,要通知所有的线程,则要历遍链表,显然效率降低。另外,C++11里的condition_variable也提供了notify_one函数。

http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one

mutex,condition是不是公平(fair)的?

这个在参考文档里没有说明,在网上找了些资料,也没有什么明确的答案。

我写了个代码测试,发现mutex是公平的。condition的测试结果也是差不多。

[cpp] view
plain
copy

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <pthread.h>
  4. #include <unistd.h>
  5. pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
  6. volatile int mutexCount = 0;
  7. void mutexFairTest(){
  8. int localCount = 0;
  9. while(1){
  10. pthread_mutex_lock(&lock);
  11. __sync_fetch_and_add(&mutexCount, 1);
  12. localCount += 1;
  13. if(mutexCount > 100000000){
  14. break;
  15. }
  16. pthread_mutex_unlock(&lock);
  17. }
  18. pthread_mutex_unlock(&lock);
  19. printf("localCount:%d\n", localCount);
  20. }
  21. int main() {
  22. pthread_mutex_lock(&lock);
  23. pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);
  24. pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);
  25. pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);
  26. pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);
  27. pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);
  28. pthread_create(new pthread_t, NULL, (void * (*)(void *))&mutexFairTest, NULL);
  29. pthread_mutex_unlock(&lock);
  30. sleep(100);
  31. }

输出结果是:

[plain] view
plain
copy

  1. localCount:16930422
  2. localCount:16525616
  3. localCount:16850294
  4. localCount:16129844
  5. localCount:17329693
  6. localCount:16234137

还特意在一个单CPU的虚拟机上测试了下。输出的结果差不多。操作系统是ububtu14.04。

参考:

http://en.wikipedia.org/wiki/Spurious_wakeup

http://siwind.iteye.com/blog/1469216

http://www.cppblog.com/Solstice/archive/2013/09/09/203094.html

http://www.cs.cmu.edu/afs/cs/academic/class/15492-f07/www/pthreads.html

并行编程之条件变量(posix condition variables)

时间: 2025-01-05 19:15:22

并行编程之条件变量(posix condition variables)的相关文章

c++11多线程记录6:条件变量(condition variables)

https://www.youtube.com/watch?v=13dFggo4t_I视频地址 实例1 考虑这样一个场景:存在一个全局队列deque,线程A向deque中推入数据(写),线程B从deque中取出数据(读). deque这个资源对象就需要用mutex做访问控制,代码如下: std::deque<int> q; std::mutex mu; void func1() { int ct = 10; while (ct > 0) { std::unique_lock<std

4.锁--并行编程之条件变量(posix condition variables)

在整理Java LockSupport.park()的东东.看到了个"Spurious wakeup".又一次梳理下. 首先来个<UNIX环境高级编程>里的样例: [cpp] view plaincopy #include <pthread.h> struct msg { struct msg *m_next; /* ... more stuff here ... */ }; struct msg *workq; pthread_cond_t qready = 

第8章 用户模式下的线程同步(4)_条件变量(Condition Variable)

8.6 条件变量(Condition Variables)——可利用临界区或SRWLock锁来实现 8.6.1 条件变量的使用 (1)条件变量机制就是为了简化 “生产者-消费者”问题而设计的一种线程同步机制.其目的让线程以原子方式释放锁并将自己阻塞,直到某一个条件成立为止.如读者线程当没有数据可读取时,则应释放锁并等待,直到写者线程产生了新的数据.同理,当写者把数据结构写满时,那么写者应该释放SRWLock并等待,直到读者把数据结构清空. (2)等待函数:SleepConditionVariab

Linux多线程编程的条件变量

在stackoverflow上看到一关于多线程条件变量的问题,题主问道:什么时候会用到条件变量,mutex还不够吗?有个叫slowjelj的人做了很好的回答,我再看这个哥们其他话题的一些回答,感觉水平好高的.这里把他的回答粘贴一下方便以后查看,原帖在这里:When is a conditional variable needed, isn't a mutex enough? Even though you can use them in the way you describe, mutexes

多线程编程中条件变量和的spurious wakeup 虚假唤醒

1. 概述 条件变量(condition variable)是利用共享的变量进行线程之间同步的一种机制.典型的场景包括生产者-消费者模型,线程池实现等. 对条件变量的使用包括两个动作: 1) 线程等待某个条件, 条件为真则继续执行,条件为假则将自己挂起(避免busy wait,节省CPU资源): 2) 线程执行某些处理之后,条件成立:则通知等待该条件的线程继续执行. 3) 为了防止race-condition,条件变量总是和互斥锁变量mutex结合在一起使用. 一般的编程模式: C++代码  

python多线程编程(4): 条件变量同步

互斥锁是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持.Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法.线程首先acquire一个条件变量,然后判断一些条件.如果条件不满足则wait:如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件.不断的重复这一过程,从而解决复杂的同步问题. 可以认为Cond

python多线程编程5: 条件变量同步-乾颐堂

互斥锁是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持.Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法.线程首先acquire一个条件变量,然后判断一些条件.如果条件不满足则wait:如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件.不断的重复这一过程,从而解决复杂的同步问题. 可以认为Cond

条件变量同步 -- Condition

Python提供的Condition对象提供了对复杂线程同步问题的支持.Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法.线程首先acquire一个条件变量,然后判断一些条件.如果条件不满足则wait:如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件.不断的重复这一过程,从而解决复杂的同步问题. 上图中def_A和def_B两个方法是相互依赖

Windows线程同步【5】条件变量(Condition Variable)

一.引言 假设有一个任务,由我和张三共同完成.张三把寄来的文稿初步审阅后放入一个队列,我负责将这个队列中的文稿进行审批,决定刊登与否.张三审阅一份文稿需要15分钟,我处理一个文稿需要2分钟. 如果将张三和我看作两个线程,那么我们共享一个队列的数据.按照一般的多线程思路,他每隔一段时间往队列中放入数据,我每隔一段时间检查一下队列中是否有数据,若有,则处理之. 若我们按照上面的方式工作,则大部分的时间,我只是在干等着,所以,这是一种比较低效的方式. 但换一种方式之后,情况就好很多了.他每把一个文稿放