muduo::BlockingQueue、BoundedBlockingQueue分析

  • BlockingQueue
  • BoundedBlockingQueue

在学习源码之前,先了解一个概念:有界缓冲和无界缓冲。

以生产者、消费者模型。

有界缓冲是指生产者在向仓库添加数据时要先判断仓库是否已满,如果已满则通知消费者来取走数据。消费者在消费时,先判断仓库是否已空,如果是则先通知生产者生产数据。

BlockingQueue

在无界缓冲中,生产者不用关心仓库是否已满,只需添加数据;消费者在判断仓库已空时要等待生产者的信号。这时只需要用一个信号量。

BlockingQueue就是这样的一个模型

template<typename T>
class BlockingQueue : boost::noncopyable
{
 public:
  BlockingQueue()
    : mutex_(),//先初始化互斥量
      notEmpty_(mutex_),//再用互斥量初始化信号了
      queue_()
  {
  }

  void put(const T& x)//生产数据
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(x);
    notEmpty_.notify(); // wait morphing saves us
    // http://www.domaigne.com/blog/computing/condvars-signal-with-mutex-locked-or-not/
  }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
  void put(T&& x)//右值
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(std::move(x));
    notEmpty_.notify();
  }
  // FIXME: emplace()
#endif

  T take()//消费数据
  {
    MutexLockGuard lock(mutex_);
    // always use a while-loop, due to spurious wakeup
    while (queue_.empty())//仓库已空
    {
      notEmpty_.wait();//等待生产者信号
    }
    assert(!queue_.empty());
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    T front(std::move(queue_.front()));
#else
    T front(queue_.front());
#endif
    queue_.pop_front();
    return front;
  }

  size_t size() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.size();
  }

 private:
  mutable MutexLock mutex_;//互斥量
  Condition         notEmpty_;//信号量
  std::deque<T>     queue_;//仓库
};

可以写个生产者消费者测试代码,有了这个BlockingQueue,不用自己在管理互斥量和信号量了。

#include <muduo/base/BlockingQueue.h>
#include <muduo/base/Thread.h>

#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>

#include <iostream>
using namespace muduo;

using namespace boost;

void Produce(shared_ptr<BlockingQueue<int> > queue)
{
    while(true)
    {
        int product=rand()%1000+1;
        std::cout<<"Produce: "<<product<<std::endl;
        queue->put(product);
        sleep(rand()%5);
    }

}
void Consome(shared_ptr<BlockingQueue<int> > queue)
{
    while(true)
    {
        int product=queue->take();
        std::cout<<"Consome: "<<product<<std::endl;
    }
}
int main()
{
    shared_ptr<BlockingQueue<int> > blockingQueue(new BlockingQueue<int>);
    Thread t1(boost::bind(Produce, blockingQueue));
    Thread t2(boost::bind(Consome, blockingQueue));

    t1.start();
    t2.start();

    t1.join();
    t2.join();

    return 0;
}

BoundedBlockingQueue

与BlockingQueue不同,BoundedBlockingQueue是有边界的,即仓库是有限的。这时仓库有四个状态:非空、已空;非满、已满。

当生产者生产时,会先判断仓库是否已满,如果是则等待仓库非满的信号;否则则向仓库添加货物,之后通知消费者仓库非空。

当消费者取货物时会先判断仓库是否为空,如果是则等待仓库非空信号;否则取走货物,通知生产者仓库非满。

这时候实现生产者消费者模型时需要2个信号量,一个是非空,表示消费者可以消费了;一个是非满,表示生产者可以生产了。源码如下:

template<typename T>
class BoundedBlockingQueue : boost::noncopyable
{
 public:
  explicit BoundedBlockingQueue(int maxSize)//最大容量
    : mutex_(),
      notEmpty_(mutex_),
      notFull_(mutex_),
      queue_(maxSize)
  {
  }

  void put(const T& x)
  {
    MutexLockGuard lock(mutex_);
    while (queue_.full())//仓库已满
    {
      notFull_.wait();//等待非满信号,即消费者消费后会通知
    }
    assert(!queue_.full());
    queue_.push_back(x);
    notEmpty_.notify();//通知消费者仓库已经有货(非空)
  }

  T take()
  {
    MutexLockGuard lock(mutex_);
    while (queue_.empty())//仓库已空
    {
      notEmpty_.wait();//等待生产者向仓库添加货物
    }
    assert(!queue_.empty());
    T front(queue_.front());
    queue_.pop_front();
    notFull_.notify();//通知生产者仓库已经非空了
    return front;
  }

  bool empty() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.empty();
  }

  bool full() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.full();
  }

  size_t size() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.size();
  }

  size_t capacity() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.capacity();
  }

 private:
  mutable MutexLock          mutex_;
  Condition                  notEmpty_;//非空信号量
  Condition                  notFull_;//非满信号量
  boost::circular_buffer<T>  queue_;
};

测试代码:

//boundedBlokcingQueue.cpp

#include <muduo/base/BoundedBlockingQueue.h>
#include <muduo/base/Thread.h>

#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>

#include <iostream>
using namespace muduo;

using namespace boost;

void Produce(shared_ptr<BoundedBlockingQueue<int> > queue)
{
    while(true)
    {
        int product=rand()%1000+1;
        std::cout<<"Produce: "<<product<<std::endl;
        queue->put(product);
        sleep(rand()%5);
    }

}
void Consome(shared_ptr<BoundedBlockingQueue<int> > queue)
{
    while(true)
    {
        int product=queue->take();
        std::cout<<"Consome: "<<product<<std::endl;
        sleep(rand()%5);
    }
}
int main()
{
    shared_ptr<BoundedBlockingQueue<int> > boundedBlockingQueue(new BoundedBlockingQueue<int>(5));
    Thread t1(boost::bind(Produce, boundedBlockingQueue));
    Thread t2(boost::bind(Consome, boundedBlockingQueue));

    t1.start();
    t2.start();

    t1.join();
    t2.join();

    return 0;
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-11 04:01:23

muduo::BlockingQueue、BoundedBlockingQueue分析的相关文章

muduo源码分析--我对muduo的理解

分为几个模块 EventLoop.TcpServer.Acceptor.TcpConnection.Channel等 对于EventLoop来说: 他只关注里面的主驱动力,EventLoop中只关注poll,这类系统调用使得其成为Reactor模式,EventLoop中有属于这个loop的所有Channel,这个loop属于哪一个Server. 几个类存在的意义: 从应用层使用的角度来看,用户需要初始化一个EventLoop,然后初始化一个TcpServer(当然也可以自定义个TcpServer

muduo::thread类分析

在看源代码前,先学习一个关键字:__thread. 线程共享进程的数据,如果想要每个线程都有一份独立的数据,那么可以使用__thread关键字修饰数据. __thread只能用于修饰POD类型的数据,不能修饰class,因为它无法调用构造函数和析构函数.__thread可以修饰全局变量.函数内的静态变量,不能修饰函数内的局部变量或class的普通成员变量. 在muduo/base/thread.{h, cc}中实现了线程的封装.thread的封装和一个命名空间muduo::CurrentThre

BlockingQueue 原理 分析

ReentrantLock 两个两个比较大的特性.1.中断2.定时3.公平锁. ReadWriteLock 读读不互斥读写互斥写写互斥.Condition 类似于 Object.wait()和Object.notify()和synchronized配套使用 CountDownLatch lanchisynchronized static final CountDownLatch end = new CountDownLatch(10);end.countDown();end.await(); C

muduo源码分析--Reactor模式在muduo中的使用

一. Reactor模式简介 Reactor释义"反应堆",是一种事件驱动机制.和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的时间发生,Reactor将主动调用应用程序注册的接口,这些接口又称为"回调函数". 二. moduo库Reactor模式的实现 muduo主要通过3个类来实现Reactor模式:EventLoop,Chann

muduo源码分析--Reactor模式的在muduo中的使用(二)

一. TcpServer类: 管理所有的TCP客户连接,TcpServer供用户直接使用,生命期由用户直接控制.用户只需设置好相应的回调函数(如消息处理messageCallback)然后TcpServer::start()即可. 主要数据成员: boost::scoped_ptr<Accepter> acceptor_; 用来接受连接 std::map<string,TcpConnectionPtr> connections_; 用来存储所有连接 connectonCallbac

ArrayBlockingQueue,BlockingQueue分析

BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,让容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞. ArrayBlockingQueue是一个由数组支持的有界阻塞队列.在读写操作上都需要锁住整个容器,因此吞吐量与一般的实现是相似的,适合于实现“生产者消费者”模式. 基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产

muduo::ThreadPoll分析

线程池本质上是一个生产者消费者的模型.在线程池有一个存放现场的ptr_vector,相当于消费者;有一个存放任务的deque,相当于仓库.线程(消费者)去仓库取任务,然后执行;当有新程序员是生产者,当有新任务时,就把任务放到deque(仓库). 任务队列(仓库)是有边界的,所以在实现时需要有两个信号量,相当与BoundedBlockingQueue. 每个线程在第一次运行时都会调用一次回调函数threadInitCallback_,为线程执行做准备. 在线程池开始运行之前,要先设置任务队列的大小

Muduo网络库源码分析(一) EventLoop事件循环(Poller和Channel)

从这一篇博文起,我们开始剖析Muduo网络库的源码,主要结合<Linux多线程服务端编程>和网上的一些学习资料! (一)TCP网络编程的本质:三个半事件 1. 连接的建立,包括服务端接受(accept) 新连接和客户端成功发起(connect) 连接.TCP 连接一旦建立,客户端和服务端是平等的,可以各自收发数据. 2. 连接的断开,包括主动断开(close 或shutdown) 和被动断开(read(2) 返回0). 3. 消息到达,文件描述符可读.这是最为重要的一个事件,对它的处理方式决定

muduo::Logging、LogStream分析

Logging LogStream Logging Logging类是用来记录分析日志用的. 下面是Logging.h的源码 namespace muduo { class TimeZone;//forward declaration class Logger { public: enum LogLevel//用来设置不同的日志级别 { TRACE, DEBUG, INFO, WARN, ERROR, FATAL, NUM_LOG_LEVELS,//级别个数 }; // compile time