浅析线程间通信一:互斥量和条件变量

线程同步的目的简单来讲就是保证数据的一致性。在Linux中,常用的线程同步方法有互斥量( mutex )、读写锁和条件变量,合理使用这三种方法可以保证数据的一致性,但值得的注意的是,在设计应用程序时,所有的线程都必须遵守相同的数据访问规则为前提,才能保证这些同步方法有效,如果允许某个线程在没有得到访问权限(比如锁)的情况下访问共享资源,那么其他线程在使用共享资源前都获得了锁,也会出现数据不一致的问题。另外还有自旋锁、barrier和信号量线程同步方法。本文将讨论互斥量和条件变量的使用,并给出了相应的代码和注意事项,相关代码也在我的github上下载。

    互斥量

互斥量从本质上说是一把锁,在访问共享资源前主动对互斥量进行加锁,在访问完成后需要主动释放互斥量上的锁。对互斥量加锁后,其他线程试图对互斥量加锁时都会被阻塞直到互斥量的锁释放(注意如果线程试图对同一个互斥量加锁两次,那么它自身就会陷入阻塞,即死锁状态。)。如果释放互斥量锁时有多个线程阻塞,所有阻塞在该互斥量的线程都会变成可运行状态,其中第一个变成可运行状态的线程获得互斥量的锁,其他线程会再次阻塞。(注意那个阻塞线程的获得锁是跟实现相关的,是不确定的,比如可能是首先唤醒优先级最高的被阻塞线程,而优先级相同的,则按FIFO原则来唤醒。)下面是使用互斥量来处理经典的生产者-消费者问题,代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAXNITEMS       1000000
#define MAXNTHREADS         100
#define MIN(a,b) (((a) < (b))?(a):(b))

int     nitems;         /* read-only by producer and consumer,to express maximum item */
struct {
  pthread_mutex_t   mutex;
  int   buff[MAXNITEMS];
  int   nput;
  int   nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };

void    *produce(void *), *consume(void *);

int main(int argc, char **argv)
{
    int         i, nthreads, count[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
    {
        printf("usage: prodcons4 <#items> <#threads>\n");
        return 1;
    }
    nitems = MIN(atoi(argv[1]), MAXNITEMS);
    nthreads = MIN(atoi(argv[2]), MAXNTHREADS);

    printf("main:%d,%d,%d",shared.nput,shared.nval,shared.buff[0]);

    /* create all producers and one consumer */
    for (i = 0; i < nthreads; i++) {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for all producers and the consumer */
    for (i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
}
    pthread_join(tid_consume, NULL);

    exit(0);
}

void * produce(void *arg)
{
    for ( ; ; ) {
        pthread_mutex_lock(&shared.mutex);
        if (shared.nput >= nitems) {
            pthread_mutex_unlock(&shared.mutex);
            return(NULL);       /* array is full, we're done */
        }
        shared.buff[shared.nput] = shared.nval;
        shared.nput++;
        shared.nval++;
        pthread_mutex_unlock(&shared.mutex);
        *((int *) arg) += 1;
    }
}

void consume_wait(int i)
{
    for ( ; ; ) {
        pthread_mutex_lock(&shared.mutex);
        if (i < shared.nput) {
            pthread_mutex_unlock(&shared.mutex);
            return;         /* an item is ready */
        }
        pthread_mutex_unlock(&shared.mutex);
        sched_yield();
    }
}

void * consume(void *arg)
{
    int     i;

    for (i = 0; i < nitems; i++) {
        consume_wait(i);
        if (shared.buff[i] != i)
            printf("buff[%d] = %d\n", i, shared.buff[i]);
    }
    return(NULL);
}

编译运行后结果如下:

$gcc -Wall -lpthread mutex_example.c -o mutex_example
$./mutex_example 1000000 5
count[0] = 188090
count[1] = 197868
count[2] = 194924
count[3] = 211562
count[4] = 207556

上面程序实现是多个生产者线程和一个消费者线程同步,有以下几个地方值得注意:

I)结构体shared中包括一个互斥变量和需要同步的数据,把他们封装到一个结构体的目的是为了强调这些变量只应该在拥有其互斥锁时访问。把共享数据和它们的同步变量(互斥锁、条件变量或信号量)封装到一个结构体中,是一个很好的编程技巧。

II)在函数produce()中,每个线程的count元素的增加不属于锁的临界区,因为每个线程由各自的计数器count[i],应该总是尽可能减少由一个互斥锁锁住的代码量。

III)在函数consume()中,调用了函数consume_wait(i)来检测期待的条目是否准备好了,若没有准备好,则调用sched_yield(),调用该接口后,会使得消费线程让出当前的CPU,并把该线程移到相应优先级队列的末尾。在这里是消费线程把CPU让出给生产者线程。但这仍然不是理想的方法,理想的方法,当buff中有数据时(或有指定数目的数据时),生产者线程唤醒消费者线程。

    条件变量

条件变量是线程可用的另一种同步机制。互斥量用于上锁,条件变量则用于等待,并且条件变量总是需要与互斥量一起使用。下面是用条件变量重新实现上面的消费者-生产者问题,代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAXNITEMS       1000000
#define MAXNTHREADS         100
#define MIN(a,b) (((a) < (b))?(a):(b))

int     nitems;             /* read-only by producer and consumer,to express maximum item */
int     buff[MAXNITEMS];
struct {
  pthread_mutex_t   mutex;
  int               nput;   /* next index to store */
  int               nval;   /* next value to store */
} put = { PTHREAD_MUTEX_INITIALIZER };

struct {
  pthread_mutex_t   mutex;
  pthread_cond_t    cond;
  int               nready; /* number ready for consumer */
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

int     nsignals; /*call pthread_cond_signal count*/

void    *produce(void *), *consume(void *); 

int main(int argc, char **argv)
{
    int         i, nthreads, count[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
    {
        printf("usage: prodcons4 <#items> <#threads>\n");
        return 1;
    }
    nitems = MIN(atoi(argv[1]), MAXNITEMS);
    nthreads = MIN(atoi(argv[2]), MAXNTHREADS);

    /* create all producers and one consumer */
for (i = 0; i < nthreads; i++) {
        count[i] = 0;
        pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for all producers and the consumer */
    for (i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    pthread_join(tid_consume, NULL);
    printf("nsignals = %d\n", nsignals);

    exit(0);
}

void * produce(void *arg)
{
    for ( ; ; ) {
        pthread_mutex_lock(&put.mutex);
        if (put.nput >= nitems) {
            pthread_mutex_unlock(&put.mutex);
            return(NULL);       /* array is full, we're done */
        }
        buff[put.nput] = put.nval;
        put.nput++;
        put.nval++;
        pthread_mutex_unlock(&put.mutex);

        pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0) {
            pthread_cond_signal(&nready.cond);
            nsignals++;
        }
        nready.nready++;
        pthread_mutex_unlock(&nready.mutex);

        *((int *) arg) += 1;
    }
}

void * consume(void *arg)
{
    int     i;

    for (i = 0; i < nitems; i++) {
        pthread_mutex_lock(&nready.mutex);
        while (nready.nready == 0)
            pthread_cond_wait(&nready.cond, &nready.mutex);
        nready.nready--;
        pthread_mutex_unlock(&nready.mutex);

        if (buff[i] != i)
            printf("buff[%d] = %d\n", i, buff[i]);
    }
    return(NULL);
}

编译运行后结果如下:

$gcc -Wall -lpthread cond_var_example.c  -o cond_var_example
$./cond_var_example  1000000 5
count[0] = 220234
count[1] = 201652
count[2] = 199165
count[3] = 182972
count[4] = 195977
nsignals = 3

上面程序实现是多个生产者线程和一个消费者线程同步,有以下几个地方值得注意:

I)因为调度的不确定性,程序运行的结果每次都是不一样的。

II)在consume()中可以看到,消费线程通过一个while循环等待nready.nready(用来统计当前有多少条目供消费者处理)变为非0,若nready.nready值为0,则调用pthread_cond_wait()阻塞线程,使线程进入休眠状态,直到其他线程调用pthread_cond_signal()唤醒它,而在唤醒它之前,该线程一直处于休眠状态,并不会参与调度或消耗CPU。关于函数pthread_cond_wait(),需要注意以下几点:

a、线程在调用pthread_cond_wait()函数之前,需要获得与之关联的互斥量后才能调用改接口,在上面的例子中,就是计数器nready.nready对应的互斥量nready.mutex,只是获得改互斥量后,线程才能判断nready.nready是否为零,若是,进而以刚才互斥量的地址作为参数调用pthread_cond_wait()接口。若没有获取与之关联的互斥量就调用pthread_cond_wait,其结果是未定义的。

b、调用pthread_cond_wait()函数,该函数原子地执行下面两个动作:首先给互斥量nready.mutex解锁;然后把调用线程投入睡眠,直到另外某个线程就本条件变量调用pthread_cond_signal。注意这两个操作必须是原子操作,否则线程在解锁后,投入睡眠之前,其他线程调用pthread_cond_signal的话,当前线程投入睡眠后,就一直得不到通知了,即错过条件的变化,在这里是nready.nready的值变化。执行上面两个操作后,线程已经睡眠了,此时函数pthread_cond_wait()还没有返回。

c、当其他线程调用pthread_cond_signal或pthread_cond_broadcast时,会唤醒相应条件变量等待的线程,此时被唤醒的线程,可以参与调度了,此时被唤醒的线程继续执行pthread_cond_wait()函数,函数pthread_cond_wait()返回之前,会重新给条件变量对应的互斥量上锁,在这里就是nready.mutex,若该函数成功返回,则当前线程有重新获得了nready.mutex锁,当然nready.mutex也可能被其他线程继续占有,此时线程再次阻塞。总之,在函数pthread_cond_wait()成功返回之前,必然又对相应的互斥量成功加锁了,pthread_cond_wait从调用到返回,整个过程相当于:unlock, just_wait, lock这三个操作,并且前面两个操作是原子操作。

d、在pthread_cond_wait()成功返回后,需要重新检查对应的条件是否成立,在这里是nready.nready的值。因为可能发生虚假的唤醒:期待的条件(在这里是消费线程期待nready.nready的值不为0)还不成立时的唤醒。各种线程实现都应该最大限度减少这些虚假唤醒的数量。

III)在produce()可以看到,在nready.nready的值加1之前,如果计数器的值为0,那就调用pthread_cond_signal唤醒可能正在等待其值变为非零的线程。可以看出,互斥量是用来同步对nready.nready的访问,而关联的条件变量则用于等待和发送信号。如果仅仅是使用互斥量来同步,则就可能出现前面的忙等待(即使调用sched_yield也只是延迟了而已),而使用条件变量后,则线程就可以阻塞休眠了,直到需要的条件发生(在这里就是nready.nready的值非零),而改变这个条件的线程也是通过条件变量(以条件变量做为参数调用pthread_cond_signal或pthread_cond_broadcast)来通知阻塞在相应条件变量的线程。

IV)调用pthread_cond_signal函数前不是一定要对互斥变量加锁的(当然也不存在返回时释放互斥量的情况),但通常修改条件变量时需要加锁,这就导致一个问题。pthread_cond_signal即可以放在pthread_mutex_lock和pthread_mutex_unlock之间,也可以放在 pthread_mutex_lock和pthread_mutex_unlock之后,到底放在什么位置比较好。在上面的例子中,使用的的属于第一种方法。即下面的形式:

pthread_mutex_lock
xxxxxxx
pthread_cond_signal
pthread_mutex_unlock

这样做的缺点是:在最话情况下,当该条件变量被发送信号后,系统立即调度等待其上的线程,该线程开始运行,但是在pthread_cond_wait返回之前,需要再次获得互斥量,而此时互斥量被其他线程(即唤醒它的那个线程)占有,因此被唤醒的线程再次阻塞,所以这样一来一回会有性能的问题。但是在LinuxThreads或者NPTL里面,就不会有这个问题,因为在Linux 线程中,有两个队列,分别是cond_wait队列和mutex_lock队列, pthread_cond_signal只是让线程从cond_wait队列移到mutex_lock队列,而不用返回到用户空间,不会有性能的损耗。另外一种情况形式如下:

pthread_mutex_lock
xxxxxxx
pthread_mutex_unlock
pthread_cond_signal

而我们上面的例子对应代码,可以改成:

int dosignal;
pthread_mutex_lock(&nready.mutex);
dosignal = (nready.nready == 0);
pthread_mutex_unlock(&nready.mutex);
if (dosignal)
pthread_cond_signal(&nready.cond);

这种形式优点是不会出现之前说的那个潜在的性能损耗,因为在通知等待线程之前就已经释放锁了;缺点是如果unlock和pthread_cond_signal之间,有个低优先级的线程正在mutex上等待的话,那么这个低优先级的线程就会抢占高优先级的线程 (调用pthread_cond_wait的线程),而这在上面的放中间的模式下是不会出现的。

V)通常pthread_cond_signal函数只唤醒等待在相应条件变量上的一个线程。如果没有线程处在阻塞等待在相应条件变量,pthread_cond_signal也会成功返回。假如有多个线程正在阻塞等待着这个条件变量的话,那么是根据各等待线程优先级的高低确定哪个线程接收到信号开始继续执行。如果各线程优先级相同,则根据等待时间的长短来确定哪个线程获得信号。但无论如何一个pthread_cond_signal调用最多发信一次(按照POSIX规定是至少一个线程唤醒,该接口实现也可能唤醒多个线程)。而在某些情况下,可能需要唤醒多个线程(比如一个写入者唤醒多个读者线程),此时可以调用pthread_cond_broadcast唤醒阻塞在相应条件变量上的所有线程。

VI)总的来说,给条件变量发送信号的代码大体如下:

struct {
  pthread_mutex_t   mutex;
  pthread_cond_t    cond;
  维护本条件的各个变量
  } var= { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
pthread_mutex_lock(&var.mutex);
设置条件为真
pthread_cond_signal(&var.cond);
pthread_mutex_unlock(&var.mutex);

在我们的例子中,用来维护条件的变量是一个整数计数器,设置条件的操作就是给该计数器加1。我们做了优化处理,即只有该计数器从0变为1时才发出条件变量的信号。测试条件并进入睡眠以等待条件变量为真的代码大体如下:

pthread_mutex_lock(&var.mutex);
while (条件为假)
pthread_cond_wait(&var.cond, &var.mutex);
修改条件
pthread_mutex_unlock(&var.mutex);

VI)为什么使用条件变量,需要对一个互斥量加锁?因为条件变量的实现,是以已经获得的加锁的互斥量为前提的。若没有获取与之关联的互斥量就调用pthread_cond_wait,其结果是未定义的。这个互斥量就是用来对保护条件本身的访问。

参考资料

《UNIX环境高级编程》 11.6线程的同步

《UNIX网络编程卷2:进程间通信》第7章、第8章和第10章

http://stackoverflow.com/questions/2763714/why-do-pthreads-condition-variable-functions-require-a-mutex

时间: 2024-07-30 20:28:09

浅析线程间通信一:互斥量和条件变量的相关文章

浅析线程间通信三:Barriers、信号量(semaphores)以及各种同步方法比较

之前的文章讨论了互斥量.条件变量.读写锁和自旋锁用于线程的同步,本文将首先讨论Barriers和信号量的使用,并给出了相应的代码和注意事项,相关代码也可在我的github上下载,然后对线程各种同步方法进行了比较. Barriers Barriers是一种不同于前面线程同步机制,它主要用于协调多个线程并行(parallel)共同完成某项任务.一个barrier对象可以使得每个线程阻塞,直到所有协同(合作完成某项任务)的线程执行到某个指定的点,才让这些线程继续执行.前面使用的pthread_join

浅析线程间通信二:读写锁和自旋锁

上文讨论了互斥量和条件变量用于线程的同步,本文将讨论读写锁和自旋锁的使用,并给出了相应的代码和注意事项,相关代码也可在我的github上下载. 读写锁 对于互斥量要么是锁住状态要么是不加锁锁状态,而且一次只有一个线程可以对其加锁,而读写锁对线程的读数据加锁请求和写数据加锁请求进行了区分,从而在某些情况下,程序有更高的并发性.对于读写锁,一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁.虽然读写锁的实现各不相同,但当读写锁处于读模式锁住状态时,如果有另外的线程试图以写

生产者-消费者问题:介绍POSIX线程的互斥量和条件变量的使用

全局初始化互斥量和条件变量(不全局也行,但至少要对线程启动函数可见,这样才能使用.) static pthread_cont_t cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER; 使用互斥量锁住一块代码方法如下(默认忽略pthread开头的函数的错误检查,即类似 int s = pthread_xxx(...); if (s != 0) { printErrorMsg(

[转]一个简单的Linux多线程例子 带你洞悉互斥量 信号量 条件变量编程

一个简单的Linux多线程例子 带你洞悉互斥量 信号量 条件变量编程 希望此文能给初学多线程编程的朋友带来帮助,也希望牛人多多指出错误. 另外感谢以下链接的作者给予,给我的学习带来了很大帮助 http://blog.csdn.net/locape/article/details/6040383 http://www.cnblogs.com/liuweijian/archive/2009/12/30/1635888.html 一.什么是多线程? 当我自己提出这个问题的时候,我还是很老实的拿着操作系

读写锁————用互斥量和条件变量模拟

一. 读写锁 在多线程环境下为了防止对临界资源访问的冲突我们往往会在线程函数中加入互斥锁来完成线程间的互斥:但是,在有些情况下,互斥锁mutex并不是那么高效,比如当要对一块缓冲区进行读写操作的时候,因为读的需要比写入修改的需要要多,读取数据并不会修改缓冲区的数据个数或者内容,如果要使用互斥锁就会耗费额外的时间,每一次读取都要争夺锁资源挂起等待,因此就可以使用另外一种锁机制----读写锁. 有读写锁的存在当然就会有读者和写者,也就是多个线程,但是它们之间的相互关系和mutex锁中有所不同: 当读

互斥量和条件变量

1.如何利用2个条件变量实现线程同步? 思路:就是来回的利用pthread_cond_signal()函数,当一方被阻塞时,唤醒函数可以唤醒pthread_cond_wait()函数,只不过pthread_cond_wait()这个方法要执行其后的语句,必须遇到下一个阻塞(也就是pthread_cond_wait()方法时),才执行唤醒后的其后语句. 代码如下: #include<stdio.h> #include<unistd.h> #include<stdlib.h>

互斥量和条件变量的区别

互斥量与条件变量的区别 转载自:http://www.360doc.com/content/12/0129/10/1317564_182456205.shtml 前面谈过了线程锁,下面我们在继续研究一下线程锁: 互斥量从本质上来说是一个锁,对互斥量加锁后任何其他试图给它加锁的线程都会被阻塞直至当前线程释放互斥量. 同样在设计时需要规定所有的线程必须遵守相同的数据访问规则,只有这样互斥机制才能正常工作(只要是锁都这样,这是锁工作的本质) 互斥量用pthread_mutex_t 数据类型表示,在使用

使用互斥量和条件变量实现线程同步控制

管程(monitor)说明 在并发编程中,管程(monitor)是一个同步构件,管程实现了同一时间点,最多只有一个线程可以执行管程的某个子程序.与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程的实现很大程度上简化了程序设计. 管程可以确保一次只有一个进程执行管程中的程序,因此程序员不需要显式地编写同步代码,但是如果需要就某些特定条件上的同步,则需要定义一些条件结构(condition variable)来实现,并且对条件变量的操作仅有wait()和signal(),如下: condit

Linux多线程同步之互斥量和条件变量

1. 什么是互斥量 互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁.对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线程将会被阻塞直到当前线程释放该互斥锁.如果释放互斥锁时有多个线程阻塞,所以在该互斥锁上的阻塞线程都会变成可进行状态,第一个变成运行状态的线程可以对互斥量加锁,其他线程在次被阻塞,等待下次运行状态. pthread_mutex_t 就是POSIX对于mutex的实现. 函数名 参数 说明 pthread_mutex_init pthre