使用无锁队列(环形缓冲区)注意事项

环形缓冲区是生产者和消费者模型中常用的数据结构。生产者将数据放入数组的尾端,而消费者从数组的另一端移走数据,当达到数组的尾部时,生产者绕回到数组的头部。如果只有一个生产者和一个消费者,那么就可以做到免锁访问环形缓冲区(Ring Buffer)。写入索引只允许生产者访问并修改,只要写入者在更新索引之前将新的值保存到缓冲区中,则读者将始终看到一致的数据结构。同理,读取索引也只允许消费者访问并修改。

环形缓冲区实现原理图

如图所示,当读者和写者指针相等时,表明缓冲区是空的,而只要写入指针在读取指针后面时,表明缓冲区已满。

清单 9. 2.6.10 环形缓冲区实现代码

/*

* __kfifo_put - puts some data into the FIFO, no locking version

* Note that with only one concurrent reader and one concurrent

* writer, you don‘t need extra locking to use these functions.

*/

unsigned int __kfifo_put(struct kfifo *fifo,

unsigned char *buffer, unsigned int len)

{

unsigned int l;

len = min(len, fifo->size - fifo->in + fifo->out);

/* first put the data starting from fifo->in to buffer end */

l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));

memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

/* then put the rest (if any) at the beginning of the buffer */

memcpy(fifo->buffer, buffer + l, len - l);

fifo->in += len;

return len;

}

/*

* __kfifo_get - gets some data from the FIFO, no locking version

* Note that with only one concurrent reader and one concurrent

* writer, you don‘t need extra locking to use these functions.

*/

unsigned int __kfifo_get(struct kfifo *fifo,

unsigned char *buffer, unsigned int len)

{

unsigned int l;

len = min(len, fifo->in - fifo->out);

/* first get the data from fifo->out until the end of the buffer */

l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));

memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

/* then get the rest (if any) from the beginning of the buffer */

memcpy(buffer + l, fifo->buffer, len - l);

fifo->out += len;

return len;

}

需要注意的是

使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)时,如果返回参数与传入参数len不相等时,则操作失败

我们定义一个

//注意student_info 共17字节 按照内存排列占24字节

typedef struct student_info

{

uint64_t stu_id; //8个字节

uint32_t age;  //4字节

uint32_t score;//4字节

char sex;//1字节

}student_info;

我们建立一个环形缓冲区,里面只有64字节大小(虽然我们实际使用时大小远大于此),向里面多次存入24字节student_info,看有什么反应

//打印学生信息

void print_student_info(const student_info *stu_info)

{

assert(stu_info);

printf("id:%lu\t",stu_info->stu_id);

printf("age:%u\t",stu_info->age);

printf("sex:%d\t",stu_info->sex);

printf("score:%u\n",stu_info->score);

}

student_info * get_student_info(time_t timer)

{

student_info *stu_info = (student_info *)malloc(sizeof(student_info));

srand(timer);

stu_info->stu_id = 10000 + rand() % 9999;

stu_info->age = rand() % 30;

stu_info->score = rand() % 101;

stu_info->sex=rand() % 2;

print_student_info(stu_info);

return stu_info;

}

void print_ring_buffer_len(struct ring_buffer *ring_buf)

{

//用于打印缓冲区长度

uint32_t ring_buf_len = 0;

//取得已经使用缓冲区长度 size-ring_buf_len为未使用缓冲区的长度

ring_buf_len=ring_buffer_len(ring_buf);

printf("no use ring_buf_len:%d\n",(ring_buf->size-ring_buf_len));

}

int main(int argc, char *argv[])

{

uint32_t size = 0;

//用于判断存储或者取得数据的字节数

uint32_t oklen = 0;

struct ring_buffer *ring_buf = NULL;

//64字节

size=BUFFER_SIZE;

ring_buf = ring_buffer_alloc(size);

printf("input student\n");

{

student_info *stu_info;

student_info stu_temp;

uint32_t student_len=sizeof(student_info);

printf("ring_buf_len:%d\n",ring_buf->size);

printf("student_len:%d\n",student_len);

//此时环形缓冲区没有数据我们去取数据当然为空

memset(&stu_temp,0,student_len);

oklen=ring_buffer_get(ring_buf, (void *)(&stu_temp), student_len);

if(oklen==student_len)

{

printf("get student data\n");

}

else

{

printf("no student data\n");

}

printf("\n");

//第一次调用时用字节结束后还有64-24 =40字节

stu_info = get_student_info(976686458);

oklen = ring_buffer_put(ring_buf, (void *)stu_info, student_len);

if(oklen==student_len)

{

printf("1 put student data success\n");

}

else

{

printf("1 put student data failure\n");

}

print_ring_buffer_len(ring_buf);

printf("\n");

//第二次调用时用字节结束后还有64-48 =16字节

stu_info = get_student_info(976686464);

oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

if(oklen==student_len)

{

printf("2 put student data success\n");

}

else

{

printf("2 put student data failure\n");

}

print_ring_buffer_len(ring_buf);

printf("\n");

//第三次调用时需要用字节但只有字节失败

//把字节都写满了

//验证了在调用__kfifo_put函数或者__kfifo_get函数时,如果返回参数与传入参数len不相等时,则操作失败

stu_info = get_student_info(976686445);

oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

if(oklen==student_len)

{

printf("3 put student data success\n");

}

else

{

printf("3 put student data failure\n");

}

print_ring_buffer_len(ring_buf);

printf("\n");

//第四次调用时需要用字节但无字节

////验证了在调用__kfifo_put函数或者__kfifo_get函数时,如果返回参数与传入参数len不相等时,则操作失败

stu_info = get_student_info(976686421);

oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

if(oklen==student_len)

{

printf("4 put student data success\n");

}

else

{

printf("4 put student data failure\n");

}

print_ring_buffer_len(ring_buf);

printf("\n");

//现在开始取学生数据里面保存了个学生数据我们取三次看效果

printf("output student\n");

printf("\n");

//第一次取得数据并打印

memset(stu_info,0,student_len);

oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

if(oklen==student_len)

{

print_student_info(stu_info);

printf("1 get student data success\n");

}

else

{

printf("1 get student data failure\n");

}

print_ring_buffer_len(ring_buf);

printf("\n");

////第二次取得数据并打印

memset(stu_info,0,student_len);

oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

if(oklen==student_len)

{

print_student_info(stu_info);

printf("2 get student data success\n");

}

else

{

printf("2 get student data failure\n");

}

print_ring_buffer_len(ring_buf);

printf("\n");

//第三次取得数据失败

memset(stu_info,0,student_len);

oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

if(oklen==student_len)

{

print_student_info(stu_info);

printf("3 get student data success\n");

}

else

{

printf("3 get student data failure\n");

}

print_ring_buffer_len(ring_buf);

}

return 1;

}

结论:在使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)时,如果返回参数与传入参数len不相等时,则操作失败。代码下载:tessc.rarhttp://files.cnblogs.com/dragonsuc/TDDOWNLOAD.zip

需要注意的地方:

1.只有一个线程负责读,另一个线程负责写的时候,数据是线程安全的。上面的实现是基于这个原理实现的,当有多个线程读或者多个线程写的时候,不保证数据的正确性。
所以使用的时候,一个线程写,一个线程读。网络应用中比较常用,就是开一个线程接口数据,然后把数据写入队列。然后开一个调度线程读取网络数据,然后分发到处理线程。

2.数据长度默认宏定义了一个长度,超过这个长度的时候,后续的数据会写入失败。

本文参考文章:

http://blog.csdn.net/mergerly/article/details/39009473

http://www.cnblogs.com/Anker/p/3481373.html

时间: 2024-11-05 18:55:19

使用无锁队列(环形缓冲区)注意事项的相关文章

环形无锁队列

环形无锁队列 Table of Contents 1 环形无锁队列的实现 2 死锁及饥饿 3 一些优化 1 环形无锁队列的实现 数据结构定义: template class LockFreeQueue { private: ElementT *mArray; int mCapacity; int mFront; int mTail; } 由于出队操作是在队首进行,入队操作是在队尾进行,因此,我们可以尝试用mFront和mTail来实现多个线程之间的协调.这其中会用到CAS操作: 入队操作伪码:

无锁队列的环形数组实现

对无锁队列的最初兴趣来自梁斌同志的一个英雄帖:http://coderpk.com/. 第一次看到这个题目的时候还不知道CAS,FAA等所谓的“原子操作”,但直觉上感觉,通过对读写操作的性能优化来达到大幅提高队列性能的方法是行不通的,就算读写操作全用汇编来写,也不会和正常的read及 write有数量级上的区别.后来搜索了一下lock free data structure,才知道了关于原子操作的一些东西,同时也纠正了自己由来已久的一个错误观点:C++中的++操作和--操作都不是原子操作.这篇笔

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta

并发无锁队列

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

并发无锁队列学习之二【单生产者单消费者】

1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长

转载:无锁队列的实现(CAS同步)

转自:http://coolshell.cn/articles/8239.html 关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术.下面开始正文. 关于CAS等原子操作 在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令.

原子操作实现无锁队列

关于CAS等原子操作 在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令.有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构. 这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果

基于循环数组的无锁队列

在之前的两篇博客(线程安全的无锁RingBuffer的实现,多个写线程一个读线程的无锁队列实现)中,分别写了在只有一个读线程.一个写线程的情况下,以及只有一个写线程.两个读线程的情况下,不采用加锁技术,甚至原子运算的循环队列的实现.但是,在其他的情况下,我们也需要尽可能高效的线程安全的队列的实现.本文实现了一种基于循环数组和原子运算的无锁队列.采用原子运算(compare and swap)而不是加锁同步,可以很大的提高运行效率.之所以用循环数组,是因为这样在使用过程中不需要反复开辟内存空间,可

多个写线程一个读线程的无锁队列实现

在之前的一篇博客中,写了一个在特殊情况下,也就是只有一个读线程和一个写线程的情况下,的无锁队列的实现.其中甚至都没有利用特殊的原子加减操作,只是普通的运算.这样做的原因是,即使是特殊的原子加减操作,也比普通的加减运算复杂度高很多.因此文中的实现方法可以达到很高的运行效率. 但是,有的情况下并不是只有一个读线程和一个写线程.越是一般化的实现,支持的情况越多,但是往往损失的性能也越多.作者看到过一个实现(http://www.oschina.net/code/snippet_732357_13465