眉目传情之并发无锁环形队列的实现

眉目传情之并发无锁环形队列的实现

  • Author:Echo Chen(陈斌)
  • Email:[email protected]
  • Blog:Blog.csdn.net/chen19870707
  • Date:October 10th, 2014

    前面在《眉目传情之匠心独运的kfifo》一文中详细解析了 linux  内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来。剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不试则良驽疑,光说不练是不能学到精髓的,下面就动手实现自己的并发无锁队列UnlockQueue(单生产者单消费者)。

    一、UnlockQueue声明

       1: #ifndef _UNLOCK_QUEUE_H
       2: #define _UNLOCK_QUEUE_H
       3:  
       4: class UnlockQueue
       5: {
       6: public:
       7:     UnlockQueue(int nSize);
       8:     virtual ~UnlockQueue();
       9:  
      10:     bool Initialize();
      11:  
      12:     unsigned int Put(const unsigned char *pBuffer, unsigned int nLen);
      13:     unsigned int Get(unsigned char *pBuffer, unsigned int nLen);
      14:  
      15:     inline void Clean() { m_nIn = m_nOut = 0; }
      16:     inline unsigned int GetDataLen() const { return  m_nIn - m_nOut; }
      17:  
      18: private:
      19:     inline bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); };
      20:     inline unsigned long roundup_power_of_two(unsigned long val);
      21:  
      22: private:
      23:     unsigned char *m_pBuffer;    /* the buffer holding the data */
      24:     unsigned int   m_nSize;        /* the size of the allocated buffer */
      25:     unsigned int   m_nIn;        /* data is added at offset (in % size) */
      26:     unsigned int   m_nOut;        /* data is extracted from off. (out % size) */
      27: };
      28:  
      29: #endif

    UnlockQueue与kfifo 结构相同相同,也是由一下变量组成:

    UnlockQueue kfifo 作用
    m_pBuffer buffer 用于存放数据的缓存
    m_nSize size 缓冲区空间的大小,圆整为2的次幂
    m_nIn in 指向buffer中队头
    m_nOut out 指向buffer中的队尾
    UnlockQueue的设计是用在单生产者单消费者情况下,所以不需要锁 lock 如果使用不能保证任何时间最多只有一个读线程和写线程,必须使用该lock实施同步。

    二、UnlockQueue构造函数和初始化

       1: UnlockQueue::UnlockQueue(int nSize)
       2: :m_pBuffer(NULL)
       3: ,m_nSize(nSize)
       4: ,m_nIn(0)
       5: ,m_nOut(0)
       6: {
       7:     //round up to the next power of 2
       8:     if (!is_power_of_2(nSize))
       9:     {
      10:         m_nSize = roundup_power_of_two(nSize);
      11:     }
      12: }
      13:  
      14: UnlockQueue::~UnlockQueue()
      15: {
      16:     if(NULL != m_pBuffer)
      17:     {
      18:         delete[] m_pBuffer;
      19:         m_pBuffer = NULL;
      20:     }
      21: }
      22:  
      23: bool UnlockQueue::Initialize()
      24: {
      25:     m_pBuffer = new unsigned char[m_nSize];
      26:     if (!m_pBuffer)
      27:     {
      28:         return false;
      29:     }
      30:  
      31:     m_nIn = m_nOut = 0;
      32:  
      33:     return true;
      34: }
      35:  
      36: unsigned long UnlockQueue::roundup_power_of_two(unsigned long val)
      37: {
      38:     if((val & (val-1)) == 0)
      39:         return val;
      40:  
      41:     unsigned long maxulong = (unsigned long)((unsigned long)~0);
      42:     unsigned long andv = ~(maxulong&(maxulong>>1));
      43:     while((andv & val) == 0)
      44:         andv = andv>>1;
      45:  
      46:     return andv<<1;
      47: }

    1.在构造函数中,对传入的size进行2的次幂圆整,圆整的好处是可以将m_nIn % m_nSize 可以转化为 m_nIn  & (m_nSize – 1),取模运算”的效率并没有 “位运算” 的效率高。

    2.在构造函数中,未给buffer分配内存,而在Initialize中分配,这样做的原因是:我们知道在new UnlockQueue的时候有两步操作,第一步分配内存,第二步调用构造函数,如果将buffer的分配放在构造函数中,那么就可能 buffer 就可能分配失败,而后面用到buffer,还需要判空。

    三、UnlockQueue入队和出队操作

       1: unsigned int UnlockQueue::Put(const unsigned char *buffer, unsigned int len)
       2: {
       3:     unsigned int l;
       4:  
       5:     len = std::min(len, m_nSize - m_nIn + m_nOut);
       6:  
       7:     /*
       8:      * Ensure that we sample the m_nOut index -before- we
       9:      * start putting bytes into the UnlockQueue.
      10:      */
      11:     __sync_synchronize();
      12:  
      13:     /* first put the data starting from fifo->in to buffer end */
      14:     l = std::min(len, m_nSize - (m_nIn  & (m_nSize - 1)));
      15:     memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l);
      16:  
      17:     /* then put the rest (if any) at the beginning of the buffer */
      18:     memcpy(m_pBuffer, buffer + l, len - l);
      19:  
      20:     /*
      21:      * Ensure that we add the bytes to the kfifo -before-
      22:      * we update the fifo->in index.
      23:      */
      24:     __sync_synchronize();
      25:  
      26:     m_nIn += len;
      27:  
      28:     return len;
      29: }
      30:  
      31: unsigned int UnlockQueue::Get(unsigned char *buffer, unsigned int len)
      32: {
      33:     unsigned int l;
      34:  
      35:     len = std::min(len, m_nIn - m_nOut);
      36:  
      37:     /*
      38:      * Ensure that we sample the fifo->in index -before- we
      39:      * start removing bytes from the kfifo.
      40:      */
      41:     __sync_synchronize();
      42:  
      43:     /* first get the data from fifo->out until the end of the buffer */
      44:     l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1)));
      45:     memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l);
      46:  
      47:     /* then get the rest (if any) from the beginning of the buffer */
      48:     memcpy(buffer + l, m_pBuffer, len - l);
      49:  
      50:     /*
      51:      * Ensure that we remove the bytes from the kfifo -before-
      52:      * we update the fifo->out index.
      53:      */
      54:     __sync_synchronize();
      55:  
      56:     m_nOut += len;
      57:  
      58:     return len;
      59: }

    入队和出队操作与kfifo相同,用到的技巧也完全相同,有不理解的童鞋可以参考前面一篇文章《眉目传情之匠心独运的kfifo》。这里需要指出的是__sync_synchronize()函数,由于linux并未开房出内存屏障函数,而在gcc4.2以上版本提供This
    builtin issues a full memory barrier,有兴趣同学可以参考Built-in functions for atomic memory access

    四、测试程序

    如图所示,我们设计了两个线程,一个生产者随机生成学生信息放入队列,一个消费者从队列中取出学生信息并打印,可以看到整个代码是无锁的

       1: #include "UnlockQueue.h"
       2: #include <iostream>
       3: #include <algorithm>
       4: #include <pthread.h>
       5: #include <time.h>
       6: #include <stdio.h>
       7: #include <errno.h>
       8: #include <string.h>
       9:  
      10: struct student_info
      11: {
      12:    long stu_id;
      13:    unsigned int age;
      14:    unsigned int score;
      15: };
      16:  
      17: void print_student_info(const student_info *stu_info)
      18: {
      19:     if(NULL == stu_info)
      20:         return;
      21:  
      22:     printf("id:%ld\t",stu_info->stu_id);
      23:     printf("age:%u\t",stu_info->age);
      24:     printf("score:%u\n",stu_info->score);
      25: }
      26:  
      27: student_info * get_student_info(time_t timer)
      28: {
      29:      student_info *stu_info = (student_info *)malloc(sizeof(student_info));
      30:      if (!stu_info)
      31:      {
      32:         fprintf(stderr, "Failed to malloc memory.\n");
      33:         return NULL;
      34:      }
      35:      srand(timer);
      36:      stu_info->stu_id = 10000 + rand() % 9999;
      37:      stu_info->age = rand() % 30;
      38:      stu_info->score = rand() % 101;
      39:      //print_student_info(stu_info);
      40:      return stu_info;
      41: }
      42:  
      43: void * consumer_proc(void *arg)
      44: {
      45:      UnlockQueue* queue = (UnlockQueue *)arg;
      46:      student_info stu_info;
      47:      while(1)
      48:      {
      49:          sleep(1);
      50:          unsigned int len = queue->Get((unsigned char *)&stu_info, sizeof(student_info));
      51:          if(len > 0)
      52:          {
      53:              printf("------------------------------------------\n");
      54:              printf("UnlockQueue length: %u\n", queue->GetDataLen());
      55:              printf("Get a student\n");
      56:              print_student_info(&stu_info);
      57:              printf("------------------------------------------\n");
      58:          }
      59:      }
      60:      return (void *)queue;
      61: }
      62:  
      63: void * producer_proc(void *arg)
      64:  {
      65:       time_t cur_time;
      66:       UnlockQueue *queue = (UnlockQueue*)arg;
      67:       while(1)
      68:       {
      69:           time(&cur_time);
      70:           srand(cur_time);
      71:           int seed = rand() % 11111;
      72:           printf("******************************************\n");
      73:           student_info *stu_info = get_student_info(cur_time + seed);
      74:           printf("put a student info to queue.\n");
      75:           queue->Put( (unsigned char *)stu_info, sizeof(student_info));
      76:           free(stu_info);
      77:           printf("UnlockQueue length: %u\n", queue->GetDataLen());
      78:           printf("******************************************\n");
      79:           sleep(1);
      80:       }
      81:      return (void *)queue;
      82:   }
      83:  
      84:  
      85: int main()
      86: {
      87:     UnlockQueue unlockQueue(1024);
      88:     if(!unlockQueue.Initialize())
      89:     {
      90:         return -1;
      91:     }
      92:  
      93:     pthread_t consumer_tid, producer_tid;
      94:  
      95:     printf("multi thread test.......\n");
      96:  
      97:     if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue))
      98:     {
      99:          fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
     100:                  errno, strerror(errno));
     101:          return -1;
     102:     }
     103:  
     104:     if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue))
     105:     {
     106:            fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
     107:                    errno, strerror(errno));
     108:            return -1;
     109:     }
     110:  
     111:     pthread_join(producer_tid, NULL);
     112:     pthread_join(consumer_tid, NULL);
     113:  
     114:     return 0;
     115:  }

    运行结果:

    -

    Echo Chen:Blog.csdn.net/chen19870707

    -

  • 时间: 2024-10-05 06:24:09

    眉目传情之并发无锁环形队列的实现的相关文章

    生产者消费者模式下的并发无锁环形缓冲区

    上一篇记录了几种环形缓冲区的设计方法和环形缓冲区在生产者消费者模式下的使用(并发有锁),这一篇主要看看怎么实现并发无锁. 0.简单的说明 首先对环形缓冲区做下说明: 环形缓冲区使用改进的数组版本,缓冲区容量为2的幂 缓冲区满阻塞生产者,消费者进行消费后,缓冲区又有可用资源,由消费者唤醒生产者 缓冲区空阻塞消费者,生产者进程生产后,缓冲区又有可用资源,由生产者唤醒消费者 然后对涉及到的几个技术做下说明: ⑴CAS,Compare & Set,X86下对应的是CMPXCHG 汇编指令,原子操作,基本

    环形缓冲区的设计及其在生产者消费者模式下的使用(并发有锁环形队列)

    1.环形缓冲区 缓冲区的好处,就是空间换时间和协调快慢线程.缓冲区可以用很多设计法,这里说一下环形缓冲区的几种设计方案,可以看成是几种环形缓冲区的模式.设计环形缓冲区涉及到几个点,一是超出缓冲区大小的的索引如何处理,二是如何表示缓冲区满和缓冲区空,三是如何入队.出队,四是缓冲区中数据长度如何计算. ps.规定以下所有方案,在缓冲区满时不可再写入数据,缓冲区空时不能读数据 1.1.常规数组环形缓冲区 设缓冲区大小为N,队头out,队尾in,out.in均是下标表示: 初始时,in=out=0 队头

    无锁环形队列

    1 #include "stdafx.h" 2 #include <process.h> 3 #include <stdio.h> 4 #include <Windows.h> 5 #include <stdlib.h> 6 #include <assert.h> 7 8 #define MAX_VIDEO_BUFF 1024 9 10 struct Header 11 { 12 WORD wSize; 13 char dat

    再看无锁环形队列

    今天通过打印头和尾的值来看里面的规律,数学不好真看不懂下面的意思 int size = (m_nTail - m_nHead + MAX_LEN) % MAX_LEN; int size = (m_nHead - m_nTail + MAX_LEN - 1) % MAX_LEN; 但通过打印头和尾的值能发现其中的规律 队列为空:当头和尾的值相等时表示队列是空的 队列为满:当头和尾相差为1时,当然是尾索引向右移动一格时正好到达头部这种情况 对于为什么是线程安全,因为读线程只操作头指针,写线程只操作

    并发无锁队列

    1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

    并发无锁队列学习之二【单生产者单消费者】

    1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长

    并发无锁队列学习(概念介绍)

    1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

    一个无锁消息队列引发的血案:怎样做一个真正的程序员?(二)——月:自旋锁

    前续 一个无锁消息队列引发的血案:怎样做一个真正的程序员?(一)——地:起因 一个无锁消息队列引发的血案:怎样做一个真正的程序员?(二)——月:自旋锁 平行时空 在复制好上面那一行我就先停下来了,算是先占了个位置,虽然我知道大概要怎么写,不过感觉还是很乱. 我突然想到,既然那么纠结,那么混乱,那么不知所措,我们不如换个视角.记得高中时看过的为数不多的长篇小说<穆斯林的葬礼>,作者是:霍达(女),故事描写了两个发生在不同时代.有着不同的内容却又交错扭结的爱情悲剧,一个是“玉”的故事,一个是“月”

    一个无锁消息队列引发的血案(六)——RingQueue(中) 休眠的艺术 [续]

    目录 (一)起因 (二)混合自旋锁 (三)q3.h 与 RingBuffer (四)RingQueue(上) 自旋锁 (五)RingQueue(中) 休眠的艺术 (六)RingQueue(中) 休眠的艺术 [续] 开篇 这是第五篇的后续,这部分的内容同时会更新和添加在 第五篇:RingQueue(中) 休眠的艺术 一文的末尾. 归纳 紧接上一篇的末尾,我们把 Windows 和 Linux 下的休眠策略归纳总结一下,如下图: 我们可以看到,Linux 下的 sched_yield() 虽然包括了