阻塞队列实现

0 背景

阻塞队列在生产业务的很多场景里,都有使用需要。例如:在数据流式处理服务中,我们需要并行化的运行上游逻辑a与下游逻辑b;在rpc网络通讯框架场景中,我们需要解耦网络读写线程与消息处理线程。总之,阻塞队列是在不同职责的线程之间进行数据沟通的桥梁。在java的jdk里面有现成的BlockingQueue类,让我们看看一个C++版的实现吧:

1 实现源码

1.1 锁封装

通过信号量保证不同线程之间数据操作的一致性。

class CSemaphore

{

 public:

  CSemaphore()

  {

    sem_init(&m_sem, 0, 0);

  }

  ~CSemaphore()

  {

    sem_destroy(&m_sem);

  }

  void Produce()

  {

    sem_post(&m_sem);

  }

  void Consume()

  {

    while (sem_wait(&m_sem) != 0)

    {

      sched_yield();

    }

  }

  bool Try()

  {

    int value = 0;

    int ret = sem_getvalue(&m_sem, &value);

    if(ret < 0 || value <= 0)

    {

      return false;

    }

    return true;

  }

  bool TryTime(int micSec)

  {

    struct timespec ts;

    clock_gettime(CLOCK_REALTIME,&ts);

    if(micSec >= 1000000)

      ts.tv_sec += micSec/1000000;

    ts.tv_nsec += micSec%1000000*1000;

    if(ts.tv_nsec >= 1000000000)

    {

      ++ts.tv_sec;

      ts.tv_nsec -= 1000000000;

    }

    int ret = sem_timedwait(&m_sem,&ts);

    if(ret < 0)

      return false;

    return true;

  }

  int GetCount()

  {

    int value = 0;

    int ret = sem_getvalue(&m_sem, &value);

    if(ret < 0)

      return -1;

    else

      return value;

  }

 private:

  sem_t  m_sem;

};

1.2 写数据(消息)

  bool Put(const __T& value)

  {

    CMutexLock lock(m_mutex);

    if(m_queue.size() > m_queMax)

    { 

      return false;

    } 

    m_queue.push_back(value);

    m_semaphore.Produce();

    return true;

  }

1.3 读取数据(消息)

  bool Get(__T& value)

  {

    m_semaphore.Consume();

    CMutexLock lock(m_mutex);

    if(m_queue.empty())

      return false;

    value = m_queue.front();

    m_queue.pop_front();

    return true;

  }

2 实例

2.1 源码

2.2 输出


thread id : 139737310840576 output is: 557

thread id : 139737310840576 output is: 558

thread id : 139737310840576 output is: 559

thread id : 139737310840576 output is: 560

thread id : 139737310840576 output is: 561

thread id : 139737310840576 output is: 562

thread id : 139737310840576 output is: 563

thread id : 139737310840576 output is: 564

thread id : 139737310840576 output is: 565

thread id : 139737300350720 output is: 566

thread id : 139737300350720 output is: 567

thread id : 139737300350720 output is: 568

thread id : 139737300350720 output is: 569

thread id : 139737300350720 output is: 570

thread id : 139737310840576 output is: 571

源码请见附件http://files.cnblogs.com/files/gisorange/blockqueue.zip

时间: 2024-08-20 07:37:57

阻塞队列实现的相关文章

caffe数据读取的双阻塞队列说明

caffe的datareader类中 class QueuePair { public: explicit QueuePair(int size); ~QueuePair(); BlockingQueue<T*> free_; BlockingQueue<T*> full_; DISABLE_COPY_AND_ASSIGN(QueuePair); }; 这个就是双阻塞队列,先将free队列填充到最大长度,然后按照如下规则: 1,每当生产者push时,先将full队列pop,如果fu

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

BlockingQueue(阻塞队列)详解

一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 二. 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由

Java多线程-新特征-阻塞队列ArrayBlockingQueue

阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止.同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止. 有了这样的功能,就为多线程的排队等候的模型实现开辟了便捷通道,非常有用. java.util.concurrent.BlockingQueue继承了java.util.Queue接口,可

spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue

一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段: private ThreadPoolExecutor threadPoolExecutor; 可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现, 直接看代码: @Override protected ExecutorService initializeExe

深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

关联文章: 深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) 深入理解Java注解类型(@Annotation) 深入理解Java类加载器(ClassLoader) 深入理解Java并发之synchronized实现原理 Java并发编程-无锁CAS与Unsafe类及其并发包Atomic 深入理解Java内存模型(JMM)及volatile关键字 剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理 剖析基于并发AQS的共

Java里的阻塞队列

JDK7提供了7个阻塞队列,如下: ArrayBlockingQueue  : 一个数组结构组成的有界阻塞队列. LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列 . PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列 . DelayQueue : 一个使用优先级队列实现的无界阻塞队列 . SynchronousQueue : 一个不存储元素的阻塞队列 . LinkedTransferQueue : 一个由链表结构组成的无界阻塞队列 .

9.并发包非阻塞队列ConcurrentLinkedQueue

jdk1.7.0_79  队列是一种非常常用的数据结构,一进一出,先进先出. 在Java并发包中提供了两种类型的队列,非阻塞队列与阻塞队列,当然它们都是线程安全的,无需担心在多线程并发环境所带来的不可预知的问题.为什么会有非阻塞和阻塞之分呢?这里的非阻塞与阻塞在于有界与否,也就是在初始化时有没有给它一个默认的容量大小,对于阻塞有界队列来讲,如果队列满了的话,则任何线程都会阻塞不能进行入队操作,反之队列为空的话,则任何线程都不能进行出队操作.而对于非阻塞无界队列来讲则不会出现队列满或者队列空的情况

LinkedBlockingQueue(lbq)阻塞队列

最近开发中,经常使用这个类LinkedBlockingQueue,它是BlockingQueue这个子类. 并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列.该类主要提供了两个方法put(),offer()和take(),poll(),前者将一个对象放 到队列尾部,如果队列已经满了,就等待直到有空闲节点:后者从head取一个对象,如果没有对象,就等待直到有可取的对象. 反正都是开发中常用的.记哈

Michael-Scott非阻塞队列(lock-free)算法的C实现

Michael-Scott非阻塞队列算法,即MS-queue算法,是1 9 9 6 年由Maged . M .Michael and M. L. Scott提出的,是最为经典的并发FIFO队列上的算法,目前很多对并发FIFO队列的研究都是基于这个算法来加以改进的.在共享内存的多核处理器上,这种基于Compare-and-swap(CAS)的算法在性能上要远远优于以前基于锁的算法,并且已经被Java并发包所采用.它的主要特点在于允许多线程并发的.无干扰的访问队列的头和尾. MS-queue算法依赖