[多线程] 生产者消费者模型的BOOST实现

说明

如果 使用过程中有BUG 一定要告诉我:在下面留言或者给我邮件(sawpara at 126 dot com)

使用boost::thread库来实现生产者消费者模型中的缓冲区!

  • 仓库内最多可以存放 capacity 个产品。
  • 条件变量 condition_put 标记是否可以往仓库中存放一个产品。
  • 条件变量 condition_get 标记是否可以从仓库中取出一个产品。
  • 互斥量 mutexer 用于保证当前仓库只有一个线程拥有主权。

实现

#include <queue>
#include "boost/thread/condition.hpp"
#include "boost/thread/mutex.hpp"
template<class Product>
class Repository
{
public:
  Repository() : _capacity(2), _emptynum(_capacity){}
  Repository(int capacity) : _emptynum(capacity), _capacity(capacity){}

  // success : when new_capacity > _capacity - _emptynum
  bool set_capacity(int new_capacity){
    boost::mutex::scoped_lock lock();
    if (_capacity - _emptynum < new_capacity ){
      _emptynum = new_capacity - _capacity + _emptynum;
      _capacity = new_capacity;
      return true;
    }
    return false;
  }

  bool empty(){
    boost::mutex::scoped_lock lock(_mutexer);
    return _is_empty();
  }

  void put(Product &product){
    {
      boost::mutex::scoped_lock lock(_mutexer);
      while (_is_full()){
        // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
        _condition_put.wait(_mutexer);
      }
      _products.push(product); // need implement the copy constructor and =
      _emptynum--; // decrease empty position
    }
    _condition_get.notify_one(); // have put one, one another thread can get product now
  }

  void get(Product &product){
    {
      // lock this repository
      boost::mutex::scoped_lock lock(_mutexer);
      while (_is_empty()){
        // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
        _condition_get.wait(_mutexer);
      }
      product = _products.front(); // need implement the copy constructor and =
      _products.pop();
      _emptynum++; // increase empty position
    }
    // have removed one product, one another thread can put product now
    _condition_put.notify_one();
  }

private:
  inline bool _is_full (){ return _emptynum ==         0; } // if have empty position
  inline bool _is_empty(){ return _emptynum == _capacity; } // 

private:
  int _capacity; // capacity of this repository
  int _emptynum; // number of empty position
  std::queue<Product> _products; // products in a FIFO queue

  boost::mutex _mutexer; // race access
  boost::condition _condition_put;
  boost::condition _condition_get;
};

实验

#include <iostream>
#include "boost/thread.hpp"
boost::mutex g_io_mutexer;

boost::mutex g_mutexer;
bool g_is_finished = false;

Repository<int> g_repository(4);

void producing(){
  for (int i = 0; i < 100; i++){
    {
      boost::mutex::scoped_lock lock(g_io_mutexer);
      std::cout << "Producing product : " << i << std::endl;
    }
    g_repository.put(i);
  }
  boost::mutex::scoped_lock lock(g_mutexer);
  g_is_finished = true;
}

void consuming(){
  int product;
  while (true){
    {
      boost::mutex::scoped_lock lock(g_mutexer);
      if (g_is_finished && g_repository.empty()){
        break;
      }
    }
    g_repository.get(product);
    {
      boost::mutex::scoped_lock lock(g_io_mutexer);
      std::cout << "Consuming product : " << product << std::endl;
    }
  }
}

int main(){
  boost::thread producer(producing);
  boost::thread consumer_1(consuming);
  boost::thread consumer_2(consuming);
  producer.join();
  consumer_1.join();
  consumer_2.join();
  return 0;
}

all the code

为方便理解代码,下面的代码中,三个空行一个意群。

#include <iostream>
#include <queue>

#include "boost/thread.hpp"

#include "boost/thread/condition.hpp"
#include "boost/thread/mutex.hpp"

template<class Product>
class Repository
{
public:
  Repository() : _capacity(2), _emptynum(_capacity){}
  Repository(int capacity) : _emptynum(capacity), _capacity(capacity){}

  // success : when new_capacity > _capacity - _emptynum
  bool set_capacity(int new_capacity){
    boost::mutex::scoped_lock lock();
    if (_capacity - _emptynum < new_capacity ){
      _emptynum = new_capacity - _capacity + _emptynum;
      _capacity = new_capacity;
      return true;
    }
    return false;
  }

  bool empty(){
    boost::mutex::scoped_lock lock(_mutexer);
    return _is_empty();
  }

  void put(Product &product){
    {
      boost::mutex::scoped_lock lock(_mutexer);
      while (_is_full()){
        // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
        _condition_put.wait(_mutexer);
      }
      _products.push(product); // need implement the copy constructor and =
      _emptynum--; // decrease empty position
    }
    _condition_get.notify_one(); // have put one, one another thread can get product now
  }

  void get(Product &product){
    {
      // lock this repository
      boost::mutex::scoped_lock lock(_mutexer);
      while (_is_empty()){
        // unlock _mutexer and blocked until _condition_put.notifiy_one/all is called.
        _condition_get.wait(_mutexer);
      }
      product = _products.front(); // need implement the copy constructor and =
      _products.pop();
      _emptynum++; // increase empty position
    }
    // have removed one product, one another thread can put product now
    _condition_put.notify_one();
  }

private:
  inline bool _is_full (){ return _emptynum ==         0; } // if have empty position
  inline bool _is_empty(){ return _emptynum == _capacity; } // 

private:
  int _capacity; // capacity of this repository
  int _emptynum; // number of empty position
  std::queue<Product> _products; // products in a FIFO queue

  boost::mutex _mutexer; // race access
  boost::condition _condition_put;
  boost::condition _condition_get;
};

boost::mutex g_io_mutexer;

boost::mutex g_mutexer;
bool g_is_finished = false;

Repository<int> g_repository(4);

void producing(){
  for (int i = 0; i < 100; i++){
    {
      boost::mutex::scoped_lock lock(g_io_mutexer);
      std::cout << "Producing product : " << i << std::endl;
    }
    g_repository.put(i);
  }
  boost::mutex::scoped_lock lock(g_mutexer);
  g_is_finished = true;
}

void consuming(){
  int product;
  while (true){
    {
      boost::mutex::scoped_lock lock(g_mutexer);
      if (g_is_finished && g_repository.empty()){
        break;
      }
    }
    g_repository.get(product);
    {
      boost::mutex::scoped_lock lock(g_io_mutexer);
      std::cout << "Consuming product : " << product << std::endl;
    }
  }
}

int main(){
  boost::thread producer(producing);
  boost::thread consumer_1(consuming);
  boost::thread consumer_2(consuming);
  producer.join();
  consumer_1.join();
  consumer_2.join();
  return 0;
}
时间: 2024-10-23 09:13:58

[多线程] 生产者消费者模型的BOOST实现的相关文章

多线程,生产者消费者模型(生产馒头,消费馒头)

先建立一个容器 /** * 容器 * 共享资源 * @author Administrator * */ public class SynStack { int index = 0; //容器 SteamBread[] stb = new SteamBread[6]; /** * 往容器中放产品 */ public synchronized void push(SteamBread st){ while(index == stb.length){ try { this.wait(); } cat

生产者消费者模型实现多线程异步交互

[Python之旅]第六篇(五):生产者消费者模型实现多线程异步交互 消息队列 生产者消费者模型 多线程异步交互 摘要:  虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应该还包括Python的消息队列,因为这里多线程异步交互是通过Python的消息队列来实现的,因此主要内容如下: 1 2 3 4 1.生产者消费者模型:厨师做包子与顾客吃包子 2.Python的消息队列 3.利用... 虽然标题是"生产者消费者模型实现多线程异步交互",但这里要说的应

Java多线程之~~~~使用wait和notify实现生产者消费者模型

在多线程开发中,最经典的一个模型就是生产者消费者模型,他们有一个缓冲区,缓冲区有最大限制,当缓冲区满 的时候,生产者是不能将产品放入到缓冲区里面的,当然,当缓冲区是空的时候,消费者也不能从中拿出来产品,这就 涉及到了在多线程中的条件判断,java为了实现这些功能,提供了wait和notify方法,他们可以在线程不满足要求的时候 让线程让出来资源等待,当有资源的时候再notify他们让他们继续工作,下面我们用实际的代码来展示如何使用wait和 notify来实现生产者消费者这个经典的模型. 首先是

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

多线程学习-基础(十二)生产者消费者模型:wait(),sleep(),notify()实现

一.多线程模型一:生产者消费者模型   (1)模型图:(从网上找的图,清晰明了) (2)生产者消费者模型原理说明: 这个模型核心是围绕着一个"仓库"的概念,生产者消费者都是围绕着:"仓库"来进行操作,一个仓库同时只能被一个生产者线程或一个消费者线程所操作,synchronized锁住的也是这个仓库,仓库是一个容器,所以会有边界值,0和仓库可存放上限,在这个上限内,可以设置多种级别,不同的级别可以执行不同的策略流程. (3)本案例使用知识点: Thread.curre

多线程、生产者消费者模型

目录 生产者消费者模型 生产者消费者模型 为什么要使用生产者和消费者模式 什么是生产者消费者模式 基于队列实现生产者消费者模型 多线程 什么是线程 开启线程的两种方式 线程与进程区别 Tread类的常用属性 守护线程 线程锁 生产者消费者模型 生产者消费者模型 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多

生产者消费者模型 与多线程(1)

生产者消费者模型 模型就是要解决某个问题的固定方法或套路 要解决的问题 生产者:泛指生产数据的一方 消费者:泛指处理数据的一方 双方的处理速度不一致,导致总有一方会在等待 解决问题的方法 先将双方解开耦合,让不同的进程负责不同的任务 提供一个共享的容器如队列,用来平衡双方的能力,用队列是因为队列可以在进程间共享 案例: from multiprocessing import Process,Queue import request import re,os,time,random #生产者任务

Java多线程(十):BlockingQueue实现生产者消费者模型

BlockingQueue BlockingQueue.解决了多线程中,如何高效安全"传输"数据的问题.程序员无需关心什么时候阻塞线程,什么时候唤醒线程,该唤醒哪个线程. 方法介绍 BlockingQueue是Queue的子类 void put(E e) 插入指定元素,当BlockingQueue为满,则线程阻塞,进入Waiting状态,直到BlockingQueue有空闲空间再继续. 这里以ArrayBlockingQueue为例进行分析 void take() 队首出队,当Bloc

Java多线程14:生产者/消费者模型

什么是生产者/消费者模型 一种重要的模型,基于等待/通知机制.生产者/消费者模型描述的是有一块缓冲区作为仓库,生产者可将产品放入仓库,消费者可以从仓库中取出产品,生产者/消费者模型关注的是以下几个点: 1.生产者生产的时候消费者不能消费 2.消费者消费的时候生产者不能生产 3.缓冲区空时消费者不能消费 4.缓冲区满时生产者不能生产 生产者/模型作为一种重要的模型,它的优点在于: 1.解耦.因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这一点很容易想到,这样生产者和消费者的代码发生变化,