Linux多线程实践(8) --Posix条件变量解决生产者消费者问题

Posix条件变量

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
int pthread_cond_destroy(pthread_cond_t *cond);

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,  pthread_mutex_t  *mutex,  const  struct timespec *abstime);

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞调用线程, 直到条件变量所要求的情况发生为止。通常条件变量需要和互斥锁同时使用, 利用互斥量保护条件变量;

条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它就发送信号给关联的条件变量, 并唤醒一个或多个等待在该条件变量上的线程,这些线程将重新获得互斥锁,重新评价条件。如果将条件变量放到共享内存中, 而两进程可共享读写这段内存,则条件变量可以被用来实现两进程间的线程同步。

条件变量使用规范

1.等待条件代码

pthread_mutex_lock(&mutex);

while (条件为假)
{
    pthread_cond_wait(&cond, &mutex);
}
修改条件

pthread_mutex_unlock(&mutex);

/**解释: 为什么使用while, 而不用if?

Man-Page给出了答案: If a signal is delivered to a thread waiting for a condition variable, upon return from

the signal handler the thread resumes waiting for the condition variable as if it was not interrupted, or

it shall return zero due to spurious wakeup.

即是说如果正在等待条件变量的一个线程收到一个信号,从信号处理函数返回的时候线程应该重新等待条件变量就好象没有被中断一样,或者被虚假地唤醒返回0。如果是上述情形,那么其实条件并未被改变,那么此时如果没有继续判断一下条件的真假就继续向下执行的话,修改条件将会出现问题,所以需要使用while 循环再判断一下,如果条件还是为假必须继续等待。

注:在多处理器系统中,pthread_cond_signal 可能会唤醒多个等待条件的线程,这也是一种spurious wakeup。

**/

2.给条件发送信号代码

pthread_mutex_lock(&mutex);

设置条件为真
pthread_cond_signal(&cond);

pthread_mutex_unlock(&mutex);

条件变量API说明

1.pthread_cond_init

使用条件变量之前要先进行初始化:可以在单个语句中生成和初始化一个条件变量如:

pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER; //用于进程间线程的通信;

或用函数pthread_cond_init进行动态初始化;

2.pthread_cond_destroy

该函数可以用来摧毁所指定的条件变量,同时将会释放所给它分配的资源。调用该函数的进程并不要求等待在参数所指定的条件变量上;

3.pthread_cond_wait && pthread_cond_timedwait

cond_wait原语完成三件事:

(1)对mutex解锁;

(2)等待条件, 直到有线程向他发送通知;

(3)当wait返回时, 再对mutex重新加锁;

第一个参数cond是指向一个条件变量的指针。第二个参数mutex则是对相关的互斥锁的指针。

函数pthread_cond_timedwait函数类型与函数pthread_cond_wait区别在于:timedwait多了一个超时, 超时值制订了我们愿意等待多长时间, 如果达到或是超过所引用的参数*abstime,它将结束阻塞并返回错误ETIME.

//timespec结构如下:
struct timespec
{
    time_t   tv_sec;        /* seconds */
    long     tv_nsec;       /* nanoseconds */
};

注意: 这个时间值是一个绝对数而不是相对数, 例如, 假设愿意等待三秒钟, 那么并不是把3秒钟转换成timespec结构, 而是需要将当前实践加上3分钟再转换成timespec结构, 这个获取当前时间值的函数可以是clock_gettime(我们采用这一个)也可以是gettimeofday.

4.pthread_cond_signal && pthread_cond_broadcast

cond_signal原语所完成的操作:

向第一个等待条件的线程发起通知, 如果没有任何一个线程处于等待条件的状态, 那么这个通知将被忽略;

cond_broadcast:

向所有等待在该条件上的线程发送通知;

参数cond是一个条件变量的指针。当调用signal时, 一个在相同条件变量上阻塞的线程将被解锁。如果同时有多个线程阻塞,则由调度策略确定接收通知的线程。如果调用broadcast,则将通知阻塞在这个条件变量上的所有线程。一旦被唤醒,线程仍然会要求互斥锁。如果当前没有线程等待通知,则上面两种调用实际上成为一个空操作, 内核会将条件变量的通知忽略(如果参数*cond指向非法地址,则返回值EINVAL);

类Condition封装

//Condition类设计
class Condition
{
public:
    Condition(const pthread_mutexattr_t *mutexAttr = NULL,
              const pthread_condattr_t  *condAttr = NULL);
    ~Condition();

    //条件变量函数
    int signal();
    int broadcast();
    int wait();
    int timedwait(int seconds);

    //互斥量函数
    int lock();
    int trylock();
    int unlock();

private:
    pthread_mutex_t m_mutex;
    pthread_cond_t  m_cond;
};
//Condition类实现
Condition::Condition(const pthread_mutexattr_t *mutexAttr,
                     const pthread_condattr_t  *condAttr)
{
    //初始化互斥量
    pthread_mutex_init(&m_mutex, mutexAttr);
    //初始化条件变量
    pthread_cond_init(&m_cond, condAttr);
}
Condition::~Condition()
{
    //销毁互斥量
    pthread_mutex_destroy(&m_mutex);
    //销毁条件变量
    pthread_cond_destroy(&m_cond);
}
int Condition::signal()
{
    return pthread_cond_signal(&m_cond);
}
int Condition::broadcast()
{
    return pthread_cond_broadcast(&m_cond);
}
int Condition::wait()
{
    return pthread_cond_wait(&m_cond, &m_mutex);
}
int Condition::timedwait(int seconds)
{
    //获取当前时间
    struct timespec abstime;
    clock_gettime(CLOCK_REALTIME, &abstime);
    //将当前时间加上需要等待的秒数, 构成绝对时间值
    abstime.tv_sec += seconds;
    return pthread_cond_timedwait(&m_cond, &m_mutex, &abstime);
}

int Condition::lock()
{
    return pthread_mutex_lock(&m_mutex);
}
int Condition::trylock()
{
    return pthread_mutex_trylock(&m_mutex);
}
int Condition::unlock()
{
    return pthread_mutex_unlock(&m_mutex);
}

生产者消费者问题(无界缓冲区)

/** 实现: 我们假设是缓冲区是无界的
说明:生产者可以不停地生产,使用pthread_cond_signal  发出通知的时候,如果此时没有消费者线程在等待条件,那么这个通知将被丢弃,但也不影响整体代码的执行,没有消费者线程在等待,说明产品资源充足,即while 判断失败,不会进入等待状态,直接消费产品(即修改条件)。
**/
const unsigned int PRODUCER_COUNT = 5;	//生产者个数
const unsigned int CONSUMER_COUNT = 3;	//消费者个数

//定义Condition类
Condition cond;
//缓冲区 ~O(∩_∩)O~
int nReady = 0;
//消费者
void *consumer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //锁定mutex
        while (!(nReady > 0))
        {
            printf("-- thread %d wait...\n", id);
            cond.wait();    //等待条件变量
        }

        printf("** thread %d alive, and consume product %d ...\n", id, nReady);
        -- nReady;  //消费
        printf("   thread %d end consume... \n\n", id);

        cond.unlock();  //解锁mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

//生产者
void *producer(void *args)
{
    int id = *(int *)args;
    delete (int *)args;
    while (true)
    {
        cond.lock();    //锁定mutex

        printf("++ thread %d signal, and produce product %d ...\n", id, nReady+1);
        ++ nReady;      //生产
        cond.signal();  //发送条件变量信号
        printf("   thread %d end produce, signal...\n\n", id);
        cond.unlock();  //解锁mutex
        sleep(1);
    }
    pthread_exit(NULL);
}

int main()
{
    pthread_t thread[PRODUCER_COUNT+CONSUMER_COUNT];

    //首先生成消费者
    for (unsigned int i = 0; i < CONSUMER_COUNT; ++i)
        pthread_create(&thread[i], NULL, consumer, new int(i));
    sleep(1);   //使生产者等待一段时间, 加速消费者等待事件产生
    //然后生成生产者
    for (unsigned int i = 0; i < PRODUCER_COUNT; ++i)
        pthread_create(&thread[CONSUMER_COUNT+i], NULL, producer, new int(i));
    for (unsigned int i = 0; i < PRODUCER_COUNT+CONSUMER_COUNT; ++i)
        pthread_join(thread[i], NULL);
}

时间: 2024-10-12 15:40:46

Linux多线程实践(8) --Posix条件变量解决生产者消费者问题的相关文章

Linux多线程实践(6) --Posix读写锁解决读者写者问题

Posix读写锁 int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr); int pthread_rwlock_destroy(pthread_rwlock_t *rwlock); int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthre

使用条件量解决生产者消费者问题

在linux多线程同步中,除了互斥量以外,pthread提供了另一种同步机制:条件变量.正如名字一样,条件量允许线程由于一些未达到的条件而阻塞. 条件变量与互斥量经常一起使用.这种模式用于让一个线程锁住一个变量,然后当它不能获得它期待的结果时等待一个条件变量.最后另一个线程会向他发出信号,使它可以继续执行.pthread_cond_wait原子性地调用并解锁它持有的互斥量.由于这个原因,互斥量是参数之一. 下面通过代码演示如何通过条件量来解决生产者消费者问题. #include<stdio.h>

Linux多线程实践(5) --Posix信号量与互斥量解决生产者消费者问题

Posix信号量 Posix 信号量 有名信号量 无名信号量 sem_open sem_init sem_close sem_destroy sem_unlink sem_wait sem_post 有名信号量 #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <semaphore.h> sem_t *sem_open(co

POSIX 使用互斥量和条件变量实现生产者/消费者问题

boost的mutex,condition_variable非常好用.但是在Linux上,boost实际上做的是对pthread_mutex_t 和pthread_cond_t的一系列的封装.因此通过对原生态的POSIX 的mutex,cond的生成者,消费者的实现,我们可以再次体会boost带给我们的便利. 1. 什么是互斥量 互斥量从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放互斥量上的锁.对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线 程将会被阻塞直到当前线

Linux下用环形buf以及POSIX版本信号量解决生产者消费者问题

一.Semaphore(信号量) Mutex变量是非0即1的,可看作一种资源的可用数量,初始化时Mutex是1,表示有一个可用资源, 加锁时获得该资源,将Mutex减到0,表示不再有可用资源,解锁时释放该资源,将Mutex重新加到1,表示又有了一个可用资源. 信号量(Semaphore)和Mutex类似,表示可用资源的数量,和Mutex不同的是这个数量可以大于1. 即,如果信号量描述的资源数目是1时,此时的信号量和互斥锁相同! 本次使用的是POSIX semaphore库函数,这种信号量不仅可以

线程同步和互斥(条件变量控制生产者消费者模型)

条件变量 生产者消费者模型: 关系:      同步 生产者<----->消费者    互斥 互斥 生产者<----->生产者       互斥 消费者<----->消费者   场所:   缓冲区,下文以链表方式实现 1.单个生产者,单个消费者,且生产者和消费者访问链表的顺序是LIFO的 代码实现: #include<stdio.h> #include<pthread.h> pthread_mutex_t _mutex_lock=PTHREAD_

POSIX条件变量

一.条件变量 当一个线程互斥地访问某个变量时,可能发现在其他线程改变状态之前,它什么也做不了.例如,一个线程访问队列时,发现队列为空,它只能等待,直到其他线程将一个节点添加到队列中,这种情况需要条件变量. 条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起:另一个线程使"条件成立"(给出条件成立信号).为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起. 条件变量通过允许线程阻塞和等待另一个线程发送信

转载~kxcfzyk:Linux C语言多线程库Pthread中条件变量的的正确用法逐步详解

Linux C语言多线程库Pthread中条件变量的的正确用法逐步详解 多线程c语言linuxsemaphore条件变量 (本文的读者定位是了解Pthread常用多线程API和Pthread互斥锁,但是对条件变量完全不知道或者不完全了解的人群.如果您对这些都没什么概念,可能需要先了解一些基础知识) 关于条件变量典型的实际应用,可以参考非常精简的Linux线程池实现(一)——使用互斥锁和条件变量,但如果对条件变量不熟悉最好先看完本文. Pthread库的条件变量机制的主要API有三个: int p

linux网络编程-----&gt;线程同步--&gt;条件变量

开发使用多线程过程中, 不可避免的会出现多个线程同时操作同一块共享资源, 当操作全部为读时, 不会出现未知结果, 一旦当某个线程操作中有写操作时, 就会出现数据不同步的事件. 而出现数据混乱的原因: 资源共享(独享资源则不会) 调试随机(对数据的访问会出现竞争) 线程间缺少必要的同步机制 以上三点, 前两点不能被改变. 欲提高效率, 传递数据, 资源必须共享. 只要资源共享, 就一定会出现线程间资源竞争, 只要存在竞争关系, 数据就会出现混乱. 所以只能从第三点着手, 使多个线程在访问共享资源的