使用互斥量和条件变量实现线程同步控制

管程(monitor)说明

在并发编程中,管程(monitor)是一个同步构件,管程实现了同一时间点,最多只有一个线程可以执行管程的某个子程序。与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程的实现很大程度上简化了程序设计。

管程可以确保一次只有一个进程执行管程中的程序,因此程序员不需要显式地编写同步代码,但是如果需要就某些特定条件上的同步,则需要定义一些条件结构(condition variable)来实现,并且对条件变量的操作仅有wait()和signal(),如下:

condition x, y;

x.wait();

...

x.signal();

调用x.wait()操作可能会使得一个进程挂起,直到另一个进程调用x.signal()操作。与信号量中的signal()操作相比,管程中如果在没有任何进程挂起的情况下调用signal()没有任何作用,而在信号量中,则必然会改变信号量的状态。

一个管程(mointor)的示意图如下所示:

一个mointor中的程序运行前必须首先获取mutex,直至程序运行完成或者线程等待的某个条件发生时才释放mutex。当一个线程执行mointor中的一个子程序时,称为占用(occupy)该mointor,因此必须等到没有其他线程执行管程程序时方可调用管程程序,这是互斥保证。在管程的简单实现中,编译器为每个管程对象自动加入一把私有的mutex(lock),初始状态为unlock,管程中的每个对象入口处执行lock操作,出口处执行unlock操作。

因此设计monitor时至少必须包含mutex(lock) object(互斥量)和condition variables(条件变量)。一个条件变量可以看作是等待该条件发生的线程集合。

注:monitor也称为<线程安全对象/类/模块>。

条件变量

为何需要条件变量?

考虑如下一个busy waiting loop:

while not(P)

do skip

如果仅有mutex,则线程必须等待P为真时才能继续执行。如此,将会导致其他线程无法进入临界区使得条件P为真,因此该管程可能发生死锁。

可以用条件变量解决。一个条件变量C可以看作是一个线程队列,其中存放的线程正在等待与之关联的条件变为真。当一个线程等待一个条件变量C时,其将mutex释放,然后其他线程就可以进入该管程中,通过改变C的值可以使得条件C满足,因此对条件变量C可以有如下操作:

(1)wait(c, m):线程调用该操作,等待条件C满足后继续执行,在等待过程中,释放mutex,因此此过程中,该线程不占用管程。

(2)signal(c):线程调用该操作表明此时条件C为真。

一个线程发生signal()后,至少有两个线程想要占用包含条件变量的管程:发出signal()操作的线程P,等待条件变量的线程Q,此时有两种选择:

1.非阻塞式条件变量:Q继续等待直到P完成。

2.阻塞式条件变量:P继续等待直到Q完成。

两种条件变量类型

阻塞式条件变量

也被称为霍尔风格(Hoare-style)管程,如下图所示:

每个管程包含两个线程队列e,s,其中:

e:入口队列

s:发出signal的线程队列

对于每个条件变量C,有一个线程队列,用C.q表示,如上图的a.q、b.q,这些队列很多情况下可以实现为FIFO模式。

阻塞式条件变量实现如下:

非阻塞式条件变量

也称为Mesa风格管程,如下图所示:

该模型中,发出signal()操作的线程不会失去管程的占用权,被notified()的线程将会被移到队列e中,相较于阻塞式条件变量,该模型不需要队列s。例如Pthread中的条件变量就采用这种非阻塞模式,即发出signal()操作的线程优先级高于被notified()的线程,要使用这种条件变量:首先利用pthread_mutex_lock获取互斥锁,然后调用pthread_cond_wait在线程睡眠等待之前先释放互斥锁,在其被唤醒后再重新获取互斥锁。关于pthread条件变量如下会有详细介绍。

非阻塞条件变量实现如下:

POSIX同步之互斥锁和条件变量的使用

如下为经典的有界缓冲区问题,可以用生产者/消费者模型描述,示意图如下:

采用互斥量的生产者/消费者代码如下:

  1 [[email protected] unp]# cat producer_consumer_mutex.c
  2 #include <unistd.h>
  3 #include <sys/types.h>
  4 #include <pthread.h>
  5 #include <stdlib.h>
  6 #include <string.h>
  7 #include <errno.h>
  8 #include <stdio.h>
  9
 10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
 11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
 12 #define BUFFERSIZE 10
 13
 14 int g_buffer[BUFFERSIZE];
 15
 16 unsigned short in = 0;
 17 unsigned short out = 0;
 18
 19 pthread_mutex_t g_mutex;
 20
 21 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
 22
 23 void* consumer(void* arg)
 24 {
 25         int num = (int)arg;
 26         /* 不断消费 */
 27         while (1)
 28         {
 29                 pthread_mutex_lock(&g_mutex);
 30
 31                 /* 打印仓库当前状态 */
 32                 int i;
 33                 for (i = 0; i < BUFFERSIZE; i++)
 34                 {
 35                         if (g_buffer[i] == -1)
 36                                 printf("g_buffer[%d] = %s\n", i, "null");
 37                         else
 38                                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 39
 40                         if (i == out)
 41                                 printf("g_buffer[%d]可以消费\n", i);
 42                 }
 43
 44                 /* 消费产品 */
 45                 printf("thread %d 开始消费产品 %d\n", num, g_buffer[out]);
 46 sleep(4);       /* 消费一个产品需要4秒 */
 47                 g_buffer[out] = -1;
 48                 printf("消费完毕\n");
 49                 out = (out + 1) % BUFFERSIZE;
 50
 51                 pthread_mutex_unlock(&g_mutex);
 52         }
 53
 54         return NULL;
 55 }
 56
 57 void* producer(void* arg)
 58 {
 59         int num = (int)arg;
 60         /* 不断生产 */
 61         while (1)
 62         {
 63                 pthread_mutex_lock(&g_mutex);
 64
 65                 /* 打印仓库当前状态 */
 66                 int i;
 67                 for (i = 0; i < BUFFERSIZE; i++)
 68         {
 69                 if (g_buffer[i] == -1)
 70                 printf("g_buffer[%d] = %s\n", i, "null");
 71             else
 72                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 73
 74             if (i == in)
 75                 printf("g_buffer[%d]可以生产\n", i);
 76         }
 77
 78                 /* 生产产品 */
 79                 g_buffer[in]++;
 80                 printf("thread %d 开始生产产品 %d\n", num, g_buffer[in]);
 81                 sleep(2);       /* 生产一个产品需要2秒 */
 82                 printf("生产完毕\n");
 83                 in = (in + 1) % BUFFERSIZE;
 84
 85                 pthread_mutex_unlock(&g_mutex);
 86         }
 87
 88         return NULL;
 89 }
 90
 91 int main(void)
 92 {
 93         /* 初始化仓库 */
 94         int i;
 95         for (i = 0; i < BUFFERSIZE; i++)
 96                 g_buffer[i] = -1;
 97
 98         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
 99         for (i = 0; i < CONSUMER_COUNT; i++)
100         {
101                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
102         }
103
104         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
105         for (i = 0; i < PRODUCER_COUNT; i++)
106         {
107                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
108         }
109
110         /* 等待创建的所有线程退出 */
111         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
112         {
113                 pthread_join(g_thread[i], NULL);
114         }
115
116         return 0;
117 }
118
119 // output
120 ...
121 thread 2 开始生产产品 4
122 生产完毕
123 g_buffer[0] = 4
124 g_buffer[1] = 4
125 g_buffer[2] = 4
126 g_buffer[3] = 2
127 g_buffer[3]可以生产
128 g_buffer[4] = 2
129 g_buffer[5] = 1
130 g_buffer[6] = 1
131 g_buffer[7] = 0
132 g_buffer[8] = 0
133 g_buffer[9] = 4
134 thread 1 开始生产产品 3
135 生产完毕
136 g_buffer[0] = 4
137 g_buffer[1] = 4
138 g_buffer[2] = 4
139 g_buffer[3] = 3
140 g_buffer[4] = 2
141 g_buffer[5] = 1
142 g_buffer[6] = 1
143 g_buffer[7] = 0
144 g_buffer[8] = 0
145 g_buffer[9] = 4
146 g_buffer[9]可以消费
147 thread 0 开始消费产品 4
148 消费完毕
149 ...

但是上述程序中存在一个问题,就是当生产者线程未准备好产品时,消费者线程却在不断执行循环,这种被称为轮转(spinning)或者轮询(polling)的现象是对CPU资源的一大浪费。如下引入条件变量与互斥锁共同工作,互斥锁用于加锁互斥,而条件变量则专注于等待,每个条件变量总是和一个互斥锁关联。

采用条件变量的生产者/消费者代码如下:

  1 [[email protected] unp]# cat producer_consumer_condition.c
  2 #include <unistd.h>
  3 #include <sys/types.h>
  4 #include <pthread.h>
  5 #include <stdlib.h>
  6 #include <string.h>
  7 #include <errno.h>
  8 #include <stdio.h>
  9
 10 #define CONSUMER_COUNT 1        /* 1个消费者线程 */
 11 #define PRODUCER_COUNT 3        /* 3个生产者线程 */
 12 #define BUFFERSIZE 10
 13
 14 int g_buffer[BUFFERSIZE];
 15
 16 unsigned short in = 0;
 17 unsigned short out = 0;
 18
 19 pthread_mutex_t g_mutex;
 20
 21 typedef struct
 22 {
 23         pthread_mutex_t mutex;
 24         pthread_cond_t cond;
 25 } Condition;
 26
 27 Condition not_empty = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
 28 Condition not_full = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER};
 29
 30 int nready;             /* 可以消费的产品数量 */
 31
 32 pthread_t g_thread[CONSUMER_COUNT + PRODUCER_COUNT];    /* 存放生产者和消费者的线程号 */
 33
 34 void* consumer(void* arg)
 35 {
 36         int num = (int)arg;
 37         /* 不断消费 */
 38         while (1)
 39         {
 40                 pthread_mutex_lock(&g_mutex);
 41
 42                 /* 打印仓库当前状态,(为了便于比较,这段打印临界区依然只使用互斥锁保护) */
 43                 int i;
 44         for (i = 0; i < BUFFERSIZE; i++)
 45         {
 46                 if (g_buffer[i] == -1)
 47                 printf("g_buffer[%d] = %s\n", i, "null");
 48             else
 49                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 50
 51             if (i == out)
 52                 printf("g_buffer[%d]可以消费\n", i);
 53         }
 54
 55                 pthread_mutex_unlock(&g_mutex);
 56
 57                 /* 消费产品 */
 58                 pthread_mutex_lock(&not_empty.mutex);
 59
 60                 while (nready == 0)
 61                         pthread_cond_wait(&not_empty.cond, &not_empty.mutex);
 62                 printf("thread %d 开始消费产品 %d\n", num, g_buffer[out]);
 63                 sleep(4);       /* 消费一个产品需要4秒 */
 64                 g_buffer[out] = -1;
 65                 printf("消费完毕\n");
 66                 --nready;
 67                 out = (out + 1) % BUFFERSIZE;
 68
 69                 pthread_cond_signal(&not_full.cond);
 70                pthread_mutex_unlock(&not_empty.mutex);
 71         }
 72
 73         return NULL;
 74 }
 75
 76 void* producer(void* arg)
 77 {
 78         int num = (int)arg;
 79         /* 不断生产 */
 80         while (1)
 81         {
 82                 pthread_mutex_lock(&g_mutex);
 83
 84                 /* 打印仓库当前状态 */
 85                 int i;
 86                 for (i = 0; i < BUFFERSIZE; i++)
 87         {
 88                 if (g_buffer[i] == -1)
 89                 printf("g_buffer[%d] = %s\n", i, "null");
 90             else
 91                 printf("g_buffer[%d] = %d\n", i, g_buffer[i]);
 92
 93             if (i == in)
 94                 printf("g_buffer[%d]可以生产\n", i);
 95         }
 96
 97                 pthread_mutex_unlock(&g_mutex);
 98
 99                 /* 生产产品 */
100                 pthread_mutex_lock(&not_full.mutex);
101
102                 while (nready == BUFFERSIZE)
103                         pthread_cond_wait(&not_full.cond, &not_full.mutex);
104                 g_buffer[in]++;
105                 printf("thread %d 开始生产产品 %d\n", num, g_buffer[in]);
106                 sleep(2);       /* 生产一个产品需要2秒 */
107                 printf("生产完毕\n");
108                 ++nready;
109                 in = (in + 1) % BUFFERSIZE;
110
111                 pthread_cond_signal(&not_empty.cond);
112                 pthread_mutex_unlock(&not_full.mutex);
113         }
114
115         return NULL;
116 }
117
118 int main(void)
119 {
120         /* 初始化仓库 */
121         int i;
122         for (i = 0; i < BUFFERSIZE; i++)
123                 g_buffer[i] = -1;
124
125         /* 创建消费者线程,线程号为:[0, CONSUMER_COUNT) */
126         for (i = 0; i < CONSUMER_COUNT; i++)
127         {
128                 pthread_create(&g_thread[i], NULL, consumer, (void*)i);
129         }
130
131         /* 创建生产者线程,线程号为:[CONSUMER_COUNT, CONSUMER_COUNT + PRODUCER_COUNT) */
132         for (i = 0; i < PRODUCER_COUNT; i++)
133         {
134                 pthread_create(&g_thread[i + CONSUMER_COUNT], NULL, producer, (void*)(i + CONSUMER_COUNT));
135         }
136
137         /* 等待创建的所有线程退出 */
138         for (i = 0; i < CONSUMER_COUNT + PRODUCER_COUNT; i++)
139         {
140                 pthread_join(g_thread[i], NULL);
141         }
142
143         return 0;
144 }
145
146 // output is the same as above

条件变量使用说明:

一个条件变量的改变是原子性的,因此需要一个互斥锁来保证,因此,条件变量的使用代码可以如下:

1 typedef struct
2 {
3     pthread_mutex_t mutex;
4     pthread_cond_t cond;
5     // 与条件变量相关的变量声明
6 } Condition;
7 Condition cond_a = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
8 Condition cond_b = {PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, ...};
9 ...

1.执行signal操作的线程中流程如下:

pthread_mutex_lock(&cond_a.mutex);
// 设置条件为真
pthread_cond_signal(&cond_a.cond);
pthread_mutex_unlock(&cond_a.mutex);

说明:

pthread_cond_signal与pthread_mutex_unlock的顺序:如果先signal后unlock,则可以确定signal操作是由lock住cond_a.mutex的线程调用的;如果先unlock后signal,则任一线程都可调用signal操作。如果需要可预见的调度行为,最好先signal后unlock,就像上面那样。

2.执行wait操作的线程中流程如下:

pthread_mutex_lock(&cond_a.mutex);
while (条件为假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);
// 修改条件
pthread_mutex_unlock(&cond_a.mutex);

说明:

(1)pthread_cond_wait执行如下3个操作:

  • 解锁cond_a.mutex,使得其他线程可以进入以便改变条件
  • 将调用线程阻塞在条件变量cond_a上(睡眠了),直到某个线程将条件设为真
  • 成功返回后(此时某个线程调用了pthread_cond_signal/broadcast)重新对cond_a.mutex加锁。

(2)是否可以将:

while (条件为假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

替换为:

if (条件为假)
    pthread_cond_wait(&cond_a.cond, &cond_a.mutex);

答案是如果将while替换为if,可以发生虚假(spurious)唤醒:即发出signal的线程并为将条件设为真就调用了pthread_cond_signal,此时pthread_cond_wait却成功返回了,如此将导致后续的代码执行失败。因此必须在pthread_cond_wait返回后再次判断条件是否确实为真,即必须使用循环而非条件判断。

时间: 2024-08-09 02:20:54

使用互斥量和条件变量实现线程同步控制的相关文章

浅析线程间通信一:互斥量和条件变量

线程同步的目的简单来讲就是保证数据的一致性.在Linux中,常用的线程同步方法有互斥量( mutex ).读写锁和条件变量,合理使用这三种方法可以保证数据的一致性,但值得的注意的是,在设计应用程序时,所有的线程都必须遵守相同的数据访问规则为前提,才能保证这些同步方法有效,如果允许某个线程在没有得到访问权限(比如锁)的情况下访问共享资源,那么其他线程在使用共享资源前都获得了锁,也会出现数据不一致的问题.另外还有自旋锁.barrier和信号量线程同步方法.本文将讨论互斥量和条件变量的使用,并给出了相

生产者-消费者问题:介绍POSIX线程的互斥量和条件变量的使用

全局初始化互斥量和条件变量(不全局也行,但至少要对线程启动函数可见,这样才能使用.) static pthread_cont_t cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; 使用互斥量锁住一块代码方法如下(默认忽略pthread开头的函数的错误检查,即类似 int s = pthread_xxx(...); if (s != 0) { printErrorMsg(

互斥量和条件变量

1.如何利用2个条件变量实现线程同步? 思路:就是来回的利用pthread_cond_signal()函数,当一方被阻塞时,唤醒函数可以唤醒pthread_cond_wait()函数,只不过pthread_cond_wait()这个方法要执行其后的语句,必须遇到下一个阻塞(也就是pthread_cond_wait()方法时),才执行唤醒后的其后语句. 代码如下: #include<stdio.h> #include<unistd.h> #include<stdlib.h>

读写锁————用互斥量和条件变量模拟

一. 读写锁 在多线程环境下为了防止对临界资源访问的冲突我们往往会在线程函数中加入互斥锁来完成线程间的互斥:但是,在有些情况下,互斥锁mutex并不是那么高效,比如当要对一块缓冲区进行读写操作的时候,因为读的需要比写入修改的需要要多,读取数据并不会修改缓冲区的数据个数或者内容,如果要使用互斥锁就会耗费额外的时间,每一次读取都要争夺锁资源挂起等待,因此就可以使用另外一种锁机制----读写锁. 有读写锁的存在当然就会有读者和写者,也就是多个线程,但是它们之间的相互关系和mutex锁中有所不同: 当读

互斥量和条件变量的区别

互斥量与条件变量的区别 转载自:http://www.360doc.com/content/12/0129/10/1317564_182456205.shtml 前面谈过了线程锁,下面我们在继续研究一下线程锁: 互斥量从本质上来说是一个锁,对互斥量加锁后任何其他试图给它加锁的线程都会被阻塞直至当前线程释放互斥量. 同样在设计时需要规定所有的线程必须遵守相同的数据访问规则,只有这样互斥机制才能正常工作(只要是锁都这样,这是锁工作的本质) 互斥量用pthread_mutex_t 数据类型表示,在使用

[转]一个简单的Linux多线程例子 带你洞悉互斥量 信号量 条件变量编程

一个简单的Linux多线程例子 带你洞悉互斥量 信号量 条件变量编程 希望此文能给初学多线程编程的朋友带来帮助,也希望牛人多多指出错误. 另外感谢以下链接的作者给予,给我的学习带来了很大帮助 http://blog.csdn.net/locape/article/details/6040383 http://www.cnblogs.com/liuweijian/archive/2009/12/30/1635888.html 一.什么是多线程? 当我自己提出这个问题的时候,我还是很老实的拿着操作系

Linux多线程同步之互斥量和条件变量

1. 什么是互斥量 互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁.对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线程将会被阻塞直到当前线程释放该互斥锁.如果释放互斥锁时有多个线程阻塞,所以在该互斥锁上的阻塞线程都会变成可进行状态,第一个变成运行状态的线程可以对互斥量加锁,其他线程在次被阻塞,等待下次运行状态. pthread_mutex_t 就是POSIX对于mutex的实现. 函数名 参数 说明 pthread_mutex_init pthre

POSIX 使用互斥量和条件变量实现生产者/消费者问题

boost的mutex,condition_variable非常好用.但是在Linux上,boost实际上做的是对pthread_mutex_t 和pthread_cond_t的一系列的封装.因此通过对原生态的POSIX 的mutex,cond的生成者,消费者的实现,我们可以再次体会boost带给我们的便利. 1. 什么是互斥量 互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁.对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线 程将会被阻塞直到当前线

信号量、互斥量、同步变量、条件变量和事件变量

信号量:信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用.在进入一个关键代码段之前,线程必须获取一个信号量:一旦该关键代码段完成了,那么该线程必须释放信号量.其它想进入该关键代码段的线程必须等待直到第一个线程释放信号量.为了完成这个过程,需要创建一个信号量VI,然后将Acquire Semaphore VI以及Release Semaphore VI分别放置在每个关键代码段的首末端.确认这些信号量VI引用的是初始创建的