Unix IPC之互斥锁与条件变量

互斥锁

1、函数声明

#include <pthread.h>

/* Mutex handling.  */

/* Initialize a mutex.  */
extern int pthread_mutex_init (pthread_mutex_t *__mutex,
                   __const pthread_mutexattr_t *__mutexattr)
     __THROW __nonnull ((1));

/* Destroy a mutex.  */
extern int pthread_mutex_destroy (pthread_mutex_t *__mutex)
     __THROW __nonnull ((1));

/* Try locking a mutex.  */
extern int pthread_mutex_trylock (pthread_mutex_t *__mutex)
     __THROW __nonnull ((1));

/* Lock a mutex.  */
extern int pthread_mutex_lock (pthread_mutex_t *__mutex)
     __THROW __nonnull ((1));
/* Unlock a mutex.  */
extern int pthread_mutex_unlock (pthread_mutex_t *__mutex)
     __THROW __nonnull ((1));

2、函数使用

pthread_mutex_lock(mutex);
// 临界区
// do something....
// 临界区
pthread_mutex_unlock(mutex);

/**
 * 如果尝试给一个已由另外某个线程锁住的互斥锁上锁
 * pthread_mutex_lock将阻塞,直到该互斥锁解锁为止
 * pthread_mutex_trylock是对应的非阻塞函数,若互斥锁已锁住,则立即返回一个EBUSY错误
 */

3、测试用例:

/* include main */
#include    "unpipc.h"

#define MAXNITEMS       1000000
#define MAXNTHREADS         100

int     nitems;         /* read-only by producer and consumer */
struct
{
    pthread_mutex_t   mutex;
    int   buff[MAXNITEMS];
    int   nput; // 记录已写条目数目
    int   nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };

void    *produce(void *);
void    *consume(void *);

int main(int argc, char **argv)
{
    int         i, nthreads, count[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS]; // 多生产者线程
    pthread_t   tid_consume; // 单消费者线程

    if (argc != 3)
        err_quit("usage: prodcons2 <#items> <#threads>");
    nitems = min(atoi(argv[1]), MAXNITEMS);
    nthreads = min(atoi(argv[2]), MAXNTHREADS);

    /* 最好调用pthread_setconcurrency函数 */
    Set_concurrency(nthreads); // 设置并行级别,大部分系统中该函数并没有什么作用
    /* 4start all the producer threads */
    for (i = 0; i < nthreads; i++)
    {
        count[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }

    /* 4wait for all the producer threads */
    for (i = 0; i < nthreads; i++)
    {
        Pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }

    /* 4start, then wait for the consumer thread */
    Pthread_create(&tid_consume, NULL, consume, NULL);
    Pthread_join(tid_consume, NULL);

    exit(0);
}
/* end main */

/* include producer */
void *produce(void *arg)
{
    for ( ; ; )
    {
        Pthread_mutex_lock(&shared.mutex);
        if (shared.nput >= nitems)
        {
            Pthread_mutex_unlock(&shared.mutex);
            return(NULL);       /* array is full, we‘re done */
        }
        shared.buff[shared.nput] = shared.nval;
        shared.nput++;
        shared.nval++;
        Pthread_mutex_unlock(&shared.mutex);
        *((int *) arg) += 1; // 每个线程修改各自的元素
    }
}

void *consume(void *arg)
{
    int     i;

    for (i = 0; i < nitems; i++)
    {
        if (shared.buff[i] != i)
            printf("buff[%d] = %d\n", i, shared.buff[i]);
    }
    return(NULL);
}
/* end producer */

条件变量

互斥锁用于上锁,条件变量用于等待,这是两种不同类型的同步。

1、函数声明

#include  <pthread.h>

/* Wake up one thread waiting for condition variable COND.  */
extern int pthread_cond_signal (pthread_cond_t *__cond)
     __THROW __nonnull ((1));

/* Wake up all threads waiting for condition variables COND.  */
extern int pthread_cond_broadcast (pthread_cond_t *__cond)
     __THROW __nonnull ((1));

/* Wait for condition variable COND to be signaled or broadcast.
   MUTEX is assumed to be locked before.

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int pthread_cond_wait (pthread_cond_t *__restrict __cond,
                  pthread_mutex_t *__restrict __mutex)
     __nonnull ((1, 2));

/* Wait for condition variable COND to be signaled or broadcast until
   ABSTIME.  MUTEX is assumed to be locked before.  ABSTIME is an
   absolute time specification; zero is the beginning of the epoch
   (00:00:00 GMT, January 1, 1970).

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int pthread_cond_timedwait (pthread_cond_t *__restrict __cond,
                   pthread_mutex_t *__restrict __mutex,
                   __const struct timespec *__restrict
                   __abstime) __nonnull ((1, 2, 3));

2、函数使用

struct
{
    pthread_mutex_t   mutex;
    pthread_cond_t    cond;
    // ...
} var = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

/**
 * 给条件变量发送信号代码
 */
Pthread_mutex_lock(&var.mutex);
if(condition0 == true)
{
    Pthread_cond_signal(&var.cond);
}
// do something...
pthread_mutex_unlock(&var.mutex);

/**
 * 测试条件变量
 */
Pthread_mutex_lock(&var.mutex);
while(condition1 == false) // 防止接收到错误信号
{
    Pthread_cond_wait(&var.cond, &var.mutex);
}
// do something...
pthread_mutex_unlock(&var.mutex);

关于Pthread_cond_wait(&var.cond, &var.mutex)函数的说明

The mutex passed to pthread_cond_wait protects the condition.The caller passes it locked to the function, which then atomically places them calling thread on the list of threads waiting for the condition and unlocks the mutex. This closes the window between the time that the condition is checked and the time that the thread goes to sleep waiting for the condition to change, so that the thread doesn‘t miss a change in the condition. When pthread_cond_wait returns, the mutex is again locked.

上面是APUE的原话,就是说pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)函数传入的参数mutex用于保护条件,因为我们在调用pthread_cond_wait时,如果条件不成立我们就进入阻塞,但是进入阻塞这个期间,如果条件变量改变了的话,那我们就漏掉了这个条件。因为这个线程还没有放到等待队列上,所以调用pthread_cond_wait前要先锁互斥量,即调用pthread_mutex_lock(),pthread_cond_wait在把线程放进阻塞队列后,自动对mutex进行解锁,使得其它线程可以获得加锁的权利。这样其它线程才能对临界资源进行访问并在适当的时候唤醒这个阻塞的进程。当pthread_cond_wait返回的时候又自动给mutex加锁。

// 函数执行期间锁的调用(伪码)
lock(mutex)   ----------------a.lock

pthread_cond_wait()
{
    unlock(mutex)-------------a.unlock
    if (条件不满足)
      suspend();
    else
    {
      lock(mutex)-------------b.lock
      return
    }
}

dosomething();

unlock(mutex);---------------b.unlock

3、测试用例

/* include globals */
#include    "unpipc.h"

#define MAXNITEMS       1000000
#define MAXNTHREADS         100

/* globals shared by threads */
int     nitems;             /* read-only by producer and consumer */
int     buff[MAXNITEMS];
struct
{
    pthread_mutex_t   mutex;
    int               nput;   /* next index to store */
    int               nval;   /* next value to store */
} put = { PTHREAD_MUTEX_INITIALIZER };

struct
{
    pthread_mutex_t   mutex;
    pthread_cond_t    cond;
    int               nready; /* number ready for consumer */
} nready = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
/* end globals */

void    *produce(void *);
void    *consume(void *);

/* include main */
int
main(int argc, char **argv)
{
    int         i, nthreads, count[MAXNTHREADS];
    pthread_t   tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
        err_quit("usage: prodcons6 <#items> <#threads>");
    nitems = min(atoi(argv[1]), MAXNITEMS);
    nthreads = min(atoi(argv[2]), MAXNTHREADS);

    Set_concurrency(nthreads + 1);
    /* 4create all producers and one consumer */
    for (i = 0; i < nthreads; i++)
    {
        count[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }
    Pthread_create(&tid_consume, NULL, consume, NULL);

    /* wait for all producers and the consumer */
    for (i = 0; i < nthreads; i++)
    {
        Pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);
    }
    Pthread_join(tid_consume, NULL);

    exit(0);
}
/* end main */

/* include prodcons */
void *
produce(void *arg)
{
    for ( ; ; )
    {
        Pthread_mutex_lock(&put.mutex);
        if (put.nput >= nitems)
        {
            Pthread_mutex_unlock(&put.mutex);
            return(NULL);       /* array is full, we‘re done */
        }
        buff[put.nput] = put.nval;
        put.nput++;
        put.nval++;
        Pthread_mutex_unlock(&put.mutex);

        /* 同步生产者与消费者线程 */
        Pthread_mutex_lock(&nready.mutex);
        if (nready.nready == 0)
        {
            // 发送信号,系统调用等待在nready.cond上的线程
            // 该线程开始运行,但是立即停止,因为无法获取nready.mutex锁
            Pthread_cond_signal(&nready.cond);
        }

        nready.nready++;
        Pthread_mutex_unlock(&nready.mutex);

        *((int *) arg) += 1;
    }
}

void *
consume(void *arg)
{
    int     i;

    for (i = 0; i < nitems; i++)
    {
        Pthread_mutex_lock(&nready.mutex);
        while (nready.nready == 0) // 当产品数目为0时才会有线程阻塞,此时采用必要发送条件信号
        {
            Pthread_cond_wait(&nready.cond, &nready.mutex);
        }

        nready.nready--;
        Pthread_mutex_unlock(&nready.mutex);

        if (buff[i] != i)
            printf("buff[%d] = %d\n", i, buff[i]);
    }
    return(NULL);
}
/* end prodcons */

时间: 2024-10-07 18:16:13

Unix IPC之互斥锁与条件变量的相关文章

UNIX网络编程:互斥锁和条件变量

在网络编程中,一般都是多线程的编程,这就出现了一个问题:数据的同步与共享.而互斥锁和条件变量就是为了允许在线程或进程间共享数据.同步的两种最基本的组成部分.它们总能够用来同步一个进程中的多个线程. 再进入互斥锁和条件变量之前,我们先对多线程的一些相关函数进行简单介绍: 多线程简单介绍和相关函数: 通常,一个进程中包括多个线程,每个线程都是CPU进行调度的基本单位,多线程可以说是在共享内存空间中并发地多道执行程序,与进程相比,线程的具有以下优点: ● 减少系统调度开销.由于线程基本不拥有资源,因此

线程的互斥锁和条件变量通信机制

1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <time.h> 4 #include <pthread.h> 5 6 #define BUFFER_SIZE 2 7 struct prodcons 8 { 9 int buffer[BUFFER_SIZE]; 10 pthread_mutex_t lock; 11 int readpos,writepos; 12 pthread_cond_t no

Linux互斥锁、条件变量和信号量

Linux互斥锁.条件变量和信号量  来自http://kongweile.iteye.com/blog/1155490 博客分类: Linux sem_init:初始化信号量sem_t,初始化的时候可以指定信号量的初始值,以及是否可以在多进程间共享.sem_wait:一直阻塞等待直到信号量>0.sem_timedwait:阻塞等待若干时间直到信号量>0.sem_post:使信号量加1.sem_destroy:释放信号量.和sem_init对应. 进行多线程编程,最应该注意的就是那些共享的数据

linux 线程的同步 二 (互斥锁和条件变量)

互斥锁和条件变量 为了允许在线程或进程之间共享数据,同步时必须的,互斥锁和条件变量是同步的基本组成部分. 1.互斥锁 互斥锁是用来保护临界区资源,实际上保护的是临界区中被操纵的数据,互斥锁通常用于保护由多个线程或多进程分享的共享数据.一般是一些可供线程间使用的全局变量,来达到线程同步的目的,即保证任何时刻只有一个线程或进程在执行其中的代码.一般加锁的轮廓如下: pthread_mutex_lock() 临界区 pthread_mutex_unlock() 互斥锁API pthread_mutex

多线程之互斥锁、条件变量

多线程 一个进程在同一时刻只能做一件事,而多个线程却可以同时执行,每个线程处理各自独立的任务.多线程有很多好处: 简化处理异步事件的代码 实现内存和文件描述符的共享 改善程序的吞吐量 改善响应时间 互斥锁 互斥锁:互斥锁通过锁机制来实现线程间的同步,在同一时刻通常只允许一个关键部分的代码 当多个线程控制相同的内存时,对于读写操作的时间差距就有可能会导致数据的不同步,下图就很清晰的说明了这种情况: 对于线程A.B此时对同一块内存进行操作,但是由于操作几乎是同时进行的,假设当线程A读入数据i之后,在

互斥锁和条件变量(pthread)相关函数

互斥锁 #include <pthread.h> // 若成功返回0,出错返回正的Exxx值 // mptr通常被初始化为PTHREAD_MUTEX_INITIALIZER int pthread_mutex_lock(pthread_mutex_t *mptr); int pthread_mutex_trylock(pthread_mutex_t *mptr); // pthread_mutex_lock 函数的非阻塞模式 int pthread_mutex_unlock(pthread_m

互斥锁和条件变量的结合使用

互斥锁一个明显的缺点是他只有两种状态:锁定和非锁定.而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,他常和互斥锁一起使用.使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化.一旦其他的某个线程改变了条件变量,他将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程.这些线程将重新锁定互斥锁并重新测试条件是否满足.一般说来,条件变量被用来进行线承间的同步. 对于条件锁,通常配合一个互斥锁一起使用,以防止多个线程同时请求pt

【转载】同步和互斥的POSIX支持(互斥锁,条件变量,自旋锁)

上篇文章也蛮好,线程同步之条件变量与互斥锁的结合: http://www.cnblogs.com/charlesblc/p/6143397.html 现在有这篇文章: http://blog.csdn.net/goodluckwhh/article/details/8564319 POSIX定义了一系列同步对象用于同步和互斥.同步对象是内存中的变量属于进程中的资源,可以按照与访问数据完全相同的方式对其进行访问.默认情况下POSIX定义的这些同步对象具有进程可见性,即同步对象只对定义它的进程可见:

Linux下多线程编程之互斥锁、条件变量、信号量

1.进程创建 int pthread_create (pthread_t * thread_id, __const pthread_attr_t * __attr, void *(*__start_routine) (void *), void *__restrict __arg); 第一个参数为指向线程标识符的指针,第二个参数用来设置线程属性,第三个参数是线程运行函数的起始地址,最后一个参数是运行函数的参数. 一个实例: void *producer(void *args); pthread_