Linux组件封装(五)一个生产者消费者问题示例

生产者消费者问题是计算机中一类重要的模型,主要描述的是:生产者往缓冲区中放入产品、消费者取走产品。生产者和消费者指的可以是线程也可以是进程。

生产者消费者问题的难点在于:

为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区,它就是所谓的临界资源。

生产者往缓冲区放物品时,如果缓冲区已满,那么需要等待,一直到消费者取走产品为止。

消费者取走产品时,如果没有物品,需要等待,一直到有生产者放入为止。

第一个问题属于互斥问题,我们需要使用一把互斥锁,来实现对缓冲区的安全访问。

后两个属于同步问题,两类线程相互协作,需要两个条件变量,一个用于通知生产者放入产品,一个用来通知消费者取走物品。

生产者线程的大概流程是:

1.加锁

2.如果缓冲区已满,则等待。

3.放入物品

4.解锁

5.通知消费者,可以取走产品

消费者的逻辑恰好相反:

1.加锁

2.缓冲区为空,等待

3.取走物品

4.解锁

5.通知生产者,可以放入物品

我们设计出一个缓冲区:

#ifndef BUFFER_H_
#define BUFFER_H_

#include "NonCopyable.h"
#include "MutexLock.h"
#include "Condition.h"
#include <queue>

class Buffer : NonCopyable
{
public:
    Buffer(size_t size);
    void push(int val);
    int pop();

    bool empty() const;
    size_t size() const;

private:
    mutable MutexLock mutex_;
    Condition full_;
    Condition empty_;

    size_t size_; //缓冲区的大小
    std::queue<int> queue_;
};

#endif //BUFFER_H_

这里注意,我们把同步与互斥的操作都放入Buffer中,使得生产者和消费者线程不必考虑其中细节,这符号软件设计的“高内聚、低耦合”原则。

还有一点,mutex被声明为mutable类型,意味着mutex在const函数中仍然可以被改变,这是符合程序的逻辑的。把mutex声明为mutable,是一种标准实践。

重点是push和pop的实现:

void Buffer::push(int val)
{
    //lock
    //wait
    //push
    //notify
    //lock
    {
        MutexLockGuard lock(mutex_);
        while(queue_.size() >= size_)
            empty_.wait();
        queue_.push(val);
    }
    full_.notify();
}

int Buffer::pop()
{
    int temp = 0;
    {
        MutexLockGuard lock(mutex_);
        while(queue_.empty())
            full_.wait();
        temp = queue_.front();
        queue_.pop();
    }
    empty_.notify();

    return temp;
}

这里注意:

1.条件变量的等待必须使用while,这是一种最佳实践,原因可见Condition的封装Linux组件封装(二)中条件变量Condition的封装

2.可以先notify再解锁,也可以先解锁。不过推荐先解锁,原因是如果先notify,唤醒一个线程B,但是还未解锁,此时如果线程切换至刚唤醒的线程B,B马上尝试lock,但是肯定失败,然后阻塞,这增加了一次线程切换的开销

我们还可以继续封装,将缓冲区与多个生产者、消费者封装成一个车间类,如下:

#ifndef WORKSHOP_H_
#define WORKSHOP_H_

#include "NonCopyable.h"
#include "Buffer.h"
#include <vector>

class ProducerThread;
class ConsumerThread;

class WorkShop : NonCopyable
{
public:
    WorkShop(size_t bufferSize,
             size_t producerSize,
             size_t consumerSize);
    ~WorkShop();

    void startWorking();
private:
    size_t bufferSize_;
    Buffer buffer_;

    size_t producerSize_;
    size_t consumerSize_;
    std::vector<ProducerThread*> producers_;
    std::vector<ConsumerThread*> consumers_;
};

#endif //WORKSHOP_H_

这样就可以方便的指定线程的数目。

 

完整的项目代码请参见这里:生产者消费者完整代码

时间: 2024-10-10 16:37:23

Linux组件封装(五)一个生产者消费者问题示例的相关文章

Linux组件封装之五:生产者消费者问题

生产者,消费者问题是有关互斥锁(MutexLock).条件变量(Condition).线程(Thread)的经典案例: 描述的问题可以叙述为 生产者往buffer中投放产品,而消费者则从buffer中消费产品. 生产着消费者问题的难点在于: 为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区投放或者消费产品,这个buffer就是所谓的临界资源. 生产者往缓冲区中投放产品时,如果缓冲区已满,那么该线程需要等待,即进入阻塞状态,一直到消费者取走产品为止. 相应的,消费者欲取走产品,如果此时缓冲区为

Linux组件封装(五) WorkShop的封装

我们封装好了Buffer后,却不知道具体要多少个线程,一个一个线程的去关联Buffer太繁琐了. 那么,我们是不是可以讲Buffer与线程的一个队列封装在一起呢 ? 由于vector中不能存放Thread,所以,我们应在vector中添加相应的Thread *,这样,就可以完成封装了. 声明代码如下: 1 #ifndef WORKSHOP_H 2 #define WORKSHOP_H 3 #include "NonCopyable.h" 4 #include "Buffer.

Linux组件封装(七)——线程池的简单封装

线程池的封装,基础思想与生产者消费者的封装一样,只不过我们是将线程池封装为自动获取任务.执行任务,让用户调用相应的接口来添加任务. 在线程池的封装中,我们同样需要用到的是MutexLock.Condition.Thread这些基本的封装. 基础封装如下: MutexLock: 1 #ifndef MUTEXLOCK_H 2 #define MUTEXLOCK_H 3 4 #include "NonCopyable.h" 5 #include <pthread.h> 6 #i

Linux组件封装(六)——定时器的简单封装

在Linux中,有一种简单的定时器——timerfd,它通过查看fd是否可读来判断定时器时候到时. timerfd中常用的函数有timerfd_create.timerfd_settime.timerfd_gettime,这些函数都相对简单,我们可以到man手册来查看用法. 值得注意的是:create中的参数CLOCK_REALTIME是一个相对时间,我们可以通过调整系统时间对其进行调整,而CLOCK_MONOTIC是一个绝对时间,系统时间的改变不会影响它.在create中,flags一般设置为

Linux多线程之同步2 &mdash;&mdash; 生产者消费者模型

思路 生产者和消费者(互斥与同步).资源用队列模拟(要上锁,一个时间只能有一个线程操作队列). m个生产者.拿到锁,且产品不满,才能生产.当产品满,则等待,等待消费者唤醒.当产品由空到不空,通知消费者.n个消费者.拿到锁,且有产品,才能消费.当产品空,则等待,等待生产者唤醒.当产品由满到不满,通知生产者.    生产者条件:队列不满消费者条件:队列不空因此有两个条件变量. 代码 /**********************************************************

五、生产者消费者模型

1.生产者消费者模型作用和示例如下:1)通过平衡生产者的生产能力和消费者的消费能力来提升整个系统的运行效率 ,这是生产者消费者模型最重要的作用2)解耦,这是生产者消费者模型附带的作用,解耦意味着生产者和消费者之间的联系少,联系越少越可以独自发展而不需要收到相互的制约备注:对于生产者消费者模型的理解将在并发队列BlockingQueue章节进行说明,本章不做详细介绍. package threadLearning.productCustomerModel; /* wait/notify 机制:以资

Linux组件封装之二:Condition

本博文讨论Linux中的条件变量Condition的封装: 条件变量Condition 主要描述的是 线程间 的同步,即协作关系. Linux中的条件变量通常涉及以下几个函数: int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr); int pthread_cond_signal(pthread_cond_t *cond); int pthread_cond_broadcast(pthread_con

Linux组件封装(四) Buffer的封装

这里,我们需要将缓冲区封装起来,然后让缓冲区与线程想连接,所以我们需要一个相应的接口. 在Buffer中,我们需要想对应的一把锁与两个条件变量. 当满足队列为空时,消费者等待,反之,生产者等待. Buffer的声明如下: 1 #ifndef BUFFER_H 2 #define BUFFER_H 3 4 #include "NonCopyable.h" 5 #include "MutexLock.h" 6 #include "Condition.h&quo

Linux组件封装(八)——Socket的封装

我们要封装Socket,首先我们需要了解Socket需要哪些要素: 1) 首先,一个套接字创建后,需要绑定一块网卡的IP,以及连接的对口号,所以我们先封装InetAddr. 在class中,仅有的一个私有成员就是struct sockaddr_in类型的一个对象,我们需要将该对象的几种赋值与创建封装到类中,这样,我们仅需传递相应的IP与port即可获得一个addr. 在这里,我们为了方便获得该addr的IP及port,封装几个将addr转化为IP及port的函数,这样我们仅需调用函数即可. 然后