生产者消费者模式下的并发无锁环形缓冲区

上一篇记录了几种环形缓冲区的设计方法和环形缓冲区在生产者消费者模式下的使用(并发有锁),这一篇主要看看怎么实现并发无锁。

0、简单的说明

首先对环形缓冲区做下说明:

  1. 环形缓冲区使用改进的数组版本,缓冲区容量为2的幂
  2. 缓冲区满阻塞生产者,消费者进行消费后,缓冲区又有可用资源,由消费者唤醒生产者
  3. 缓冲区空阻塞消费者,生产者进程生产后,缓冲区又有可用资源,由生产者唤醒消费者

然后对涉及到的几个技术做下说明:

⑴CAS,Compare & Set,X86下对应的是CMPXCHG 汇编指令,原子操作,基本语义如下:

int CAS(address,old_value,new_value)
{
    int ret = *addresss;
    if(ret == old_value){
        *address = new_value;
    }
    return ret;
}

为了方便调用者知道调用结果,通常被编译器改写成如下形式:

bool CAS(address,old_value,new_value)
{
    int ret = *addresss;
    if(ret == old_value){
        *address = new_value;
        return true;
    }
    return ret;
}

⑵sched_yield(),调用sched_yield可以使当前线程让出cpu,内核会把当前线程插入到线程优先级所对应的就绪队列,然后调度新的线程占有cpu,Stack Overflow上的解释

  1. sched_yield() causes the calling thread to relinquish the CPU. The thread is moved to the end of the queue for its static priority and a new thread gets to run.
  2. If the calling thread is the only thread in the highest priority list at that time, it will continue to run after a call to sched_yield().

1、无锁环形队列的实现

无锁队列的实现依托3个变量,in,out,max_out

in,被所有生产者共享,表示第一个可写入的位置,每次成功读取,值加1

out,被消费者共享,表示第一个课读取的位置,每次成功读取,值加1

max_out,生产者用于发布数据,[out,max_out]这一段表示消费者可消费的数据,max_out只能被生产者有序修改

out=max_out时,表示没有数据可消费

que_size,环形缓冲区大小,总是2的幂

in-out表示缓冲区中有多少数据

in-out = que_size,表示缓冲区满

 1 template<typename ELEM_TYPE>
 2 class queue{
 3 public:
 4     queue(int size);//把size上调至2的幂保存到que_size中
 5     bool enqueue(const ELEM_TYPE& in_data);
 6     bool dequeue(ELEM_TYPE& out_data );
 7     ...//其他成员
 8 private:
 9     ELEM_TYPE *arry;
10     int in;
11     int out;
12     int max_out;//用于读写线程同步
13     int que_size;
14     ... //其他成员
15 }
16
17 template <typename ELEM_T>
18 bool queue<ELEM_T>::enqueue(const ELEM_T& in_data)
19 {
20     int cur_in ;
21     int cur_out;
22
23     do{
24         cur_in  = in;
25         cur_out = out;
26
27         if(cur_in - cur_out == que_size){
28             return false;
29         }
30
31     }while(!CAS(&in,cur_in,cur_in+1))//如果cur_in==in依然成立,说明没有其他线程修改in,cur_in是可用的,并修改in;如果cur_in!=in,说明in值已被其他线程修改
32
33     arry[cur_in&(que_size-1)] = in_data;
34
35     while(!CAS(&max_out,cur_in,cur_in+1)){//发布数据
36         sched_yield();//发布数据失败,cur_in之前还有数据没有发布出去,让出cpu,让其他线程先执行
37     }
38
39     return true;
40 }
41
42 template <typename ELEM_T>
43 bool queue<ELEM_T,QUE_SIZE>::dequeue(ELEM_T& out_data)
44 {
45     int cur_out;
46     int cur_max_out;
47
48     do{
49         cur_out   = out;
50         cur_max_out = max_out;
51
52         if(cur_out == cur_max_out){
53             return false;
54         }
55
56         out_data = arry[cur_out&(que_size-1)] ;//先尝试获取数据
57
58     }while(!CAS(&out,cur_out,cur_out+1))
59
60     return true;
61 }

enqueue操作:

23行到30行:

先预取可插入位置cur_in

然后判断缓冲区是否满,满了就不插入了,直接退出,返回false,表示缓冲区满

然后使用CAS(&in,cur_in,cur_in+1)判断该位置是否被使用过,在单线程中,25行到31行,cur_in和in永远相等,因为这之间根本就没有修过cur_in或in的语句,但是,在多线程中,in是所有生产者共享的,可能被其他线程在31行修改,所以,执行到31行时,要么cur_in<in,要么cur_in=in,CAS操作刚好可以判断相等的这个关系,如果相等,说明没有别的线程修改过in,也就是则预取的插入位置cur_in有效;如果不等,忙等待。

35行到37行:

这是生产者发布自己生产的数据,发布的方式就是把max_out指向当前线程的插入位置,同样,max_out可能会被其他线程修改,所以可能导致CAS失败,这时就不做忙等待了,而是让出cpu,仔细想一下,CAS失败的原因就是现在这个线程执行的太快了,导致这个线程插入位置cur_in之前还有数据没有发布出去,所以这个线程先让出cpu,先让没发布好数据的生产者先发布数据。

其实这样做会存在问题:如果线程A发布数据时,发现在他之前,还有没发布好数据的线程,假设为B,那么线程B在32行到34行之间挂掉之后,B之后的所有线程会一直处于忙等待的状态。我现在还不知道这个问题要怎么解决。

dequeue操作类似

2、生产者消费者

说明一下,下面18行、21行、31行、34行是伪码,看注释就知道其含义了。

 1 queue<int> dataque(1024);//环形缓冲区
 2
 3 queue<int> asleep_producers(32);//缓冲区满时,生产者应该阻塞,该队列就是生产者等待队列
 4 queue<int> asleep_consumers(32);//缓冲区空时,消费者应该阻塞,该队列就是消费者等待队列
 5
 6
 7 void produce(){
 8     if(!dataque.enqueue(in_data)){//生产的数据入队列
 9
10         /* 数据入队列失败,把当前数据丢掉,或者保存到磁盘中等等*/
11         ...
12         ...
13
14         if(!asleep_producers.enqueue(this->gettid())){//把当前生产者线程加入等待队列
15                 pthread_exit();//如果加入失败,说明已经达到了等待队列的最大值,那么线程主动退出
16         }
17
18         producer_poller.wait();//睡眠,等待信号的到来
19     }
20
21     wakeup_consumers(asleep_consumers);//生产了一个数据,应该从消费者等待队列中取一个线程消费数据,本质上可以发送一个数据可消费者的poller
22 }
23
24
25 void consume()
26 {
27     if(!dataque.dequeue(out_data)){
28
29         asleep_consumers.enqueue(this->gettid());//没能成功取得数据,把自己加入到消费者等待队列中
30
31         consumer_poller.wait();//睡眠,等待信号到来
32     }
33
34     wakeup_producers(asleep_producers);//消费了一个数据,应该从生产者等待队列中取一个线程生产数据,本质上可以发送一个数据给生产者的poller
35 }

关于producer_poller、consumer_poller、wait、wakeup的实现可以用下面的方案:

每个线程内含一个socketpair和select,select用于监听socketpair的读端。

生产者线程起来的时候先去尝试一次生产(调用produce函数),失败之后使用select监听消费者发过来的可生产信号,select成功监听到事件后,再去生产(调用produce)

消费者线程起来的时候先去尝试一次消费(调用consume函数),失败之后使用select监听生产者发过来的可消费信号,select成功监听到事件后,再去消费(调用consume)

具体的实现方案可以参考:zeromq源码分析笔记之线程间收发命令(2)

参考资料

基于数组的无锁队列(译)

ABA problem

无锁队列的实现

sched_yield(2) - Linux man page

时间: 2024-08-01 10:43:45

生产者消费者模式下的并发无锁环形缓冲区的相关文章

眉目传情之并发无锁环形队列的实现

眉目传情之并发无锁环形队列的实现 Author:Echo Chen(陈斌) Email:[email protected] Blog:Blog.csdn.net/chen19870707 Date:October 10th, 2014 前面在<眉目传情之匠心独运的kfifo>一文中详细解析了 linux  内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来.剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不

环形缓冲区的设计及其在生产者消费者模式下的使用(并发有锁环形队列)

1.环形缓冲区 缓冲区的好处,就是空间换时间和协调快慢线程.缓冲区可以用很多设计法,这里说一下环形缓冲区的几种设计方案,可以看成是几种环形缓冲区的模式.设计环形缓冲区涉及到几个点,一是超出缓冲区大小的的索引如何处理,二是如何表示缓冲区满和缓冲区空,三是如何入队.出队,四是缓冲区中数据长度如何计算. ps.规定以下所有方案,在缓冲区满时不可再写入数据,缓冲区空时不能读数据 1.1.常规数组环形缓冲区 设缓冲区大小为N,队头out,队尾in,out.in均是下标表示: 初始时,in=out=0 队头

并发编程基础之生产者消费者模式

一:概念 生产者消费者模式是java并发编程中很经典的并发情况,首先有一个大的容器,生产者put元素到 容器中,消费者take元素出来,如果元素的数量超过容器的容量时,生产者不能再往容器中put元素 ,处于阻塞状态,如果元素的数量等于0,则消费者不能在从容器中take数据,处于阻塞状态. 二:示例 /** * */ package com.hlcui.main; import java.util.LinkedList; import java.util.concurrent.ExecutorSe

生产者/消费者模式(一)

生产者消费者问题是一个多线程同步问题的经典案例,大多数多线程编程问题都是以生产者-消费者模式为基础,扩展衍生来的.在生产者消费者模式中,缓冲区起到了连接两个模块的作用:生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示: 可以看出Buffer缓冲区作为一个中介,将生产者和消费者分开,使得两部分相对独立,生产者消费者不需要知道对方的实现逻辑,对其中一个的修改,不会影响另一个,从设计模式的角度看,降低了耦合度.而对于图中处在多线程环境中Buffer,需要共享给多个多个生产者和消费者,为了保

并发无锁队列学习之二【单生产者单消费者】

1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

Java并发(基础知识)—— 阻塞队列和生产者消费者模式

1.阻塞队列 BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空:当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入. BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 fa

11.9-全栈Java笔记: 线程并发协作(生产者/消费者模式)

多线程环境下,我们经常需要多个线程的并发和协作.这个时候,就需要了解一个重要的多线程并发协作模型"生产者消费者模式". 什么是生产者? 生产者指的是负责生产数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是消费者? 消费者指的是负责处理数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是缓冲区? 消费者不能直接使用生产者的数据,它们之间有个"缓冲区".生产者将生产好的数据放入"缓冲区",消费者从"缓冲区"

聊聊并发——生产者消费者模式

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式. 什么是生产者消费者模式 生产者