Linux C语言多线程库Pthread中条件变量的的正确用法逐步详解
(本文的读者定位是了解Pthread常用多线程API和Pthread互斥锁,但是对条件变量完全不知道或者不完全了解的人群。如果您对这些都没什么概念,可能需要先了解一些基础知识)
关于条件变量典型的实际应用,可以参考非常精简的Linux线程池实现(一)——使用互斥锁和条件变量,但如果对条件变量不熟悉最好先看完本文。
Pthread库的条件变量机制的主要API有三个:
- int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
- int pthread_cond_broadcast(pthread_cond_t *cond);
- int pthread_cond_signal(pthread_cond_t *cond);
注:还有一个没说的API是pthread_cond_timewait,它跟pthread_cond_wait(作用见下面)的唯一不同就是可以指定一个等待的超时时间,这里不对它作额外讨论。
它们和其它几个Pthread API一起用于处理一种特定情形的线程同步问题:
- 若干个线程在某个条件没满足时不能继续往下面走,于是纷纷调用pthread_cond_wait使自己在这个条件上陷入等待(休眠);
- 当条件满足以后,另外有个活跃着的线程调用pthread_cond_broadcast通知(唤醒)刚才那些等待在这个条件上的所有线程,让它们继续往下运行。
这种情形是非常通用、非常基础的,很多更加具体的线程同步问题都是这种情形的扩展,比如说经典的消费者/生产者问题,读者/写者问题等等。明显“条件”是这种情形的核心,所以Pthread的这套线程同步机制叫做“条件变量”。可以看出条件变量机制跟Java的wait/notify机制非常类似。
上面这种情形也可以用POSIX定义的另外一套线程/进程同步机制来实现——信号量(semaphore),而且信号量机制在实际场景中用起来比条件变量机制还简单一些,但是信号量的性能不如Pthread库中的条件变量。
条件变量通过pthread_cond_t数据类型来声明,而且使用之前必须先要初始化:
[cpp] view plaincopyprint?
- pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
上面这种是通过预定义的初始化宏来静态初始化,也可以用函数动态初始化:
[cpp] view plaincopyprint?
- pthread_cond_t cond;
- pthread_cond_init(&cond, NULL);
pthread_cond_t cond; pthread_cond_init(&cond, NULL);
注意条件变量应该声明为全局可见的,因为条件变量会在多个线程(的函数)中被访问。条件变量最后不再使用了的时候应该销毁:
[cpp] view plaincopyprint?
- pthread_cond_destroy(&cond);
pthread_cond_destroy(&cond);
在初始化后到销毁前这段时间内就是条件变量的正常生命周期了,可以按需要对它调用pthread_cond_wait、pthread_cond_signal和pthread_cond_broadcast。
pthread_cond_signal的作用跟pthread_cond_broadcast相似,但不同的是pthread_cond_signal会通知所有等待线程中的至少一个,让它(们)继续往下运行,而所有其它没被通知的等待线程则继续等待(休眠)。之所以pthread_cond_signal并不是严格地只唤醒一个等待线程,是因为在多处理器或多核系统中,可能无法实现只唤醒一个等待线程,就算能强行做到只唤醒一个等待线程,也会带来很大的性能损失,这对一个通用的基础线程同步API来说并不合适。
但实际应用场景中我们通常希望每调用一次pthread_cond_signal就唤醒一个等待线程,比如说下面这种情况:
某个线程专门负责从网络接收数据包,其它若干线程专门负责处理数据包。当没有任何数据包时,处理线程全部调用pthread_cond_wait陷入等待。当一个数据包到达时,接收线程调用phtread_cond_signal唤醒一个处理线程,处理线程拿走这个数据包去处理。当又一个数据包到达时,接收线程再次调用pthread_cond_signal唤醒一个线程……
这个问题对信号量机制来说很容易,因为信号量中的sem_post函数只会唤醒一个等待的进程或线程。虽然pthread_cond_signal本身不保证只唤醒一个等待线程,但是POSIX标准在定义这套API时考虑过了这个问题,它留了一个“后门”,让我们在应用程序中可以通过额外的代码来解决这个问题。
先不考虑那个所谓的“后门”,一个粗略看上去可行的解决办法是,除了条件变量以外,再额外设置一个全局的普通计数变量表示允许唤醒多少个等待线程:
[cpp] view plaincopyprint?
- int global_count=0;
int global_count=0;
那么当通知线程需要调用pthread_cond_signal唤醒别的等待线程之前,应该先增加全局变量的计数,表示允许唤醒的线程数目又增加了一个:
[cpp] view plaincopyprint?
- global_count++;
- pthread_cond_signal(&cond);
global_count++; pthread_cond_signal(&cond);
pthread_cond_signal一调用,那些调用pthread_cond_wait等待在cond的线程可能会有好几个都唤醒了,索性假设全部都被唤醒了。但其实我们只想让其中一个继续往下走,其它的不应该往下走,那么其它那些等待线程就都应该再次调用pthread_cond_wait继续等待(这里明显该有个循环)。
下面的问题就是,怎么决定哪一个等待线程继续走呢?可以这样,当大家都被唤醒的时候,大家都判断一下global_count是不是大于0,也就是当前允不允许唤醒线程。如果某个等待线程检测到的global_count是大于0的,就赶紧把global_count减掉一个,然后自己往下走。这时候global_count少了一个,可能就是0了,表示不允许再唤醒线程,其它几个等待线程发现这一状况以后就不往下走,再次调用pthread_cond_wait继续等待:
[cpp] view plaincopyprint?
- while(global_count<=0) {
- pthread_cond_wait(&cond, ...);
- }
- global_count--;
while(global_count<=0) { pthread_cond_wait(&cond, ...); } global_count--;
到现在为止问题基本解决了,但是引出了一个新的问题:多个线程同时访问global_count变量会造成竞态条件。问题看上去很容易解决,使用互斥锁保护就好了。
对于通知线程线程:
[cpp] view plaincopyprint?
- pthread_mutex_lock(&mutex);
- global_count++;
- pthread_cond_signal(&cond);
- pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex); global_count++; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex);
对于等待线程:
[cpp] view plaincopyprint?
- pthread_mutex_lock(&mutex);
- while(global_count<=0) {
- pthread_cond_wait(&cond, ...);
- }
- global_count--;
- pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex); while(global_count<=0) { pthread_cond_wait(&cond, ...); } global_count--; pthread_mutex_unlock(&mutex);
新的问题又出来了:等待线程调用pthread_cond_wait陷入等待时,还占有着mutex互斥锁,下次通知线程想要唤醒线程时就无法获取mutex互斥锁了,于是出现了死锁。所以在调用pthread_cond_wait将当前线程陷入等待之前,我们应该解开mutex互斥锁,当线程被唤醒,从pthread_cond_wait函数返回时,我们应该重新获取mutex互斥锁。
比如像这样:
[cpp] view plaincopyprint?
- pthread_mutex_lock(&mutex);
- while(global_count<=0) {
- pthread_mutex_unlock(&mutex);
- pthread_cond_wait(&cond, ...);
- pthread_mutex_lock(&mutex);
- }
- global_count--;
- pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex); while(global_count<=0) { pthread_mutex_unlock(&mutex); pthread_cond_wait(&cond, ...); pthread_mutex_lock(&mutex); } global_count--; pthread_mutex_unlock(&mutex);
这段代码还是有问题的,在pthread_cond_wait函数调用的前后当前线程都有一段看上去“很短”的不拥有mutex互斥锁的真空期,但是对于CPU来说这段真空期并不算太短。
假设某个等待线程检测到global_count==0,于是解开mutex互斥锁,进入真空期,即将调用pthread_cond_wait。就在这时候,通知线程增加了一下global_count的计数值然后调用了pthread_cond_signal。接下来,刚才那个等待线程调用pthread_cond_wait陷入等待,由于pthread_cond_wait的调用发生在pthread_cond_signal之后,所以pthread_cond_wait并不会返回。如果程序里的等待线程就这一个,这个通知就丢失了。 问题到了这里似乎没路可走了,但是别忘了还有个“后门”没用上,那就是前面一直没提的pthread_cond_wait的第二个参数了。看看本文最开始列出的函数声明,第二个参数赫然是mutex!这下猜也能猜到这第二个参数是干嘛的了,明显就是专门帮我们解开mutex锁啊,然后在pthread_cond_wait返回之前再自动获取mutex锁。这里顺道澄清一下,和条件变量关联的mutex,不是像网上部分人说的那样是用来保护条件变量的,条件变量在实现的时候是能够做到线程安全的,因为它内部还有一个自己的互斥锁。
所以正确的做法是:
[cpp] view plaincopyprint?
- pthread_mutex_lock(&mutex);
- while(global_count<=0) {
- pthread_cond_wait(&cond, &mutex);
- }
- global_count--;
- pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex); while(global_count<=0) { pthread_cond_wait(&cond, &mutex); } global_count--; pthread_mutex_unlock(&mutex);
到这里问题是不是完全解决了?很遗憾还差一点。pthread_cond_wait是线程撤销点(cancellation points)之一,这意味着当某个线程因为调用pthread_cond_wait而陷入休眠等待时,别的线程可以通过这个线程的ID调用pthread_cancel让这个线程强制从pthread_cond_wait返回并开始执行一些清理工作,最后结束退出。
问题就出在pthread_cond_wait返回上,上面标红的地方已经强调了,pthread_cond_wait返回之前会先自动获取mutex,也就是说返回以后已经占有了mutex互斥锁。这种情况下线程直接退出会导致互斥锁一直被占用,其它线程就无法获取这个互斥锁了,再次出现死锁。 这个问题有两种解决办法,第一个办法是在线程退出前的清理工作中加入解开互斥锁的代码,这个并不难办到,因为POSIX定义了两个API:
[cpp] view plaincopyprint?
- void pthread_cleanup_pop(int execute);
- void pthread_cleanup_push(void (*routine)(void*), void *arg);
void pthread_cleanup_pop(int execute); void pthread_cleanup_push(void (*routine)(void*), void *arg);
pthread_cleanup_push用于向一个特殊栈压入一个函数指针,当线程退出时,这个特殊栈中的所有函数会被一个个从栈顶弹出并执行(return退出的情况除外)。phtread_cleanup_pop用于从这个特殊栈的栈顶手动弹出函数指针,execute参数非0时,弹出的函数会被自动执行。
需要注意的是,POSIX标准允许这两个API被实现为带未闭合花括号的宏,所以这两个API一定(最好)要配套使用:它们必须一前一后(push在前),而且在同一个函数的同一个嵌套层次内。
比如说这两个API的实现有可能会是类似于这样:
[cpp] view plaincopyprint?
- #define pthread_cleanup_pop(execute) XXXX { XXXX
- #define pthread_cleanup_push(routine, arg) XXXX } XXXX
#define pthread_cleanup_pop(execute) XXXX { XXXX #define pthread_cleanup_push(routine, arg) XXXX } XXXX
所以这就是为什么它们的调用要求如此奇怪了。 有了这两个API,想要解决刚才的问题,首先要定义一个清理回调函数:
[cpp] view plaincopyprint?
- void mutex_clean(void *mutex) {
- pthread_mutex_unlock((pthread_mutex_t*)mutex);
- }
void mutex_clean(void *mutex) { pthread_mutex_unlock((pthread_mutex_t*)mutex); }
然后在等待线程里调用那两个API:
[cpp] view plaincopyprint?
- pthread_mutex_lock(&mutex);
- pthread_cleanup_push(mutex_clean, &mutex);
- while(global_count<=0) {
- pthread_cond_wait(&cond, &mutex);
- }
- global_count--;
- pthread_cleanup_pop(0);
- pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex); pthread_cleanup_push(mutex_clean, &mutex); while(global_count<=0) { pthread_cond_wait(&cond, &mutex); } global_count--; pthread_cleanup_pop(0); pthread_mutex_unlock(&mutex);
或者一个稍微简洁一些的写法:
[cpp] view plaincopyprint?
- pthread_mutex_lock(&mutex);
- pthread_cleanup_push(mutex_clean, &mutex);
- while(global_count<=0) {
- pthread_cond_wait(&cond, &mutex);
- }
- global_count--;
- pthread_cleanup_pop(1);
pthread_mutex_lock(&mutex); pthread_cleanup_push(mutex_clean, &mutex); while(global_count<=0) { pthread_cond_wait(&cond, &mutex); } global_count--; pthread_cleanup_pop(1);
另外一种解决方案比这个麻烦一些,那就是设置mutex互斥锁的robust属性值为PTHREAD_MUTEX_ROBUST。
对于robust互斥锁,当持有它的线程没解锁就退出以后,别的线程再去调用pthread_mutex_lock,函数会回一个EOWNERDEAD错误,线程检测到这这个错误后可以调用pthread_mutex_consistent使robust互斥锁恢复一致性,紧接着就可以调用phtread_mutex_unlock解锁了(尽管这个锁并不是当前这个线程加持的)。解锁完毕就可以重新调用pthread_mutex_lock了。 采用这种用方案的时候,首先要声明一个全局可见的mutex属性变量:
[cpp] view plaincopyprint?
- pthread_mutexattr_t mutexattr;
pthread_mutexattr_t mutexattr;
然后初始化并设置属性值:
[cpp] view plaincopyprint?
- pthread_mutexattr_init(&mutexattr);
- pthread_mutexattr_setrobust(&mutexaddtr, PTHREAD_MUTEX_ROBUST);
pthread_mutexattr_init(&mutexattr); pthread_mutexattr_setrobust(&mutexaddtr, PTHREAD_MUTEX_ROBUST);
有了mutex属性,接下来就是在初始化mutex的地方作修改了,通常我们对mutex的初始化都是pthread_mutex_init(&mutex, NULL),现在改成:
[cpp] view plaincopyprint?
- pthread_mutex_init(&mutex, &mutexattr);
pthread_mutex_init(&mutex, &mutexattr);
现在准备工作已经完毕,开始干正事了。对于通知线程:
[cpp] view plaincopyprint?
- while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
- pthread_mutex_consistent(&mutex);
- pthread_mutex_unlock(&mutex);
- }
- global_count++;
- pthread_cond_signal(&cond);
- pthread_mutex_unlock(&mutex);
while(EOWNERDEAD==pthread_mutex_lock(&mutex)) { pthread_mutex_consistent(&mutex); pthread_mutex_unlock(&mutex); } global_count++; pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex);
对于等待线程:
[cpp] view plaincopyprint?
- while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
- pthread_mutex_consistent(&mutex);
- pthread_mutex_unlock(&mutex);
- }
- while(global_count<=0) {
- pthread_cond_wait(&cond, &mutex);
- }
- global_count--;
- pthread_mutex_unlock(&mutex);
while(EOWNERDEAD==pthread_mutex_lock(&mutex)) { pthread_mutex_consistent(&mutex); pthread_mutex_unlock(&mutex); } while(global_count<=0) { pthread_cond_wait(&cond, &mutex); } global_count--; pthread_mutex_unlock(&mutex);
到这里,所有的事情终于完成了!