线程安全的无锁RingBuffer的实现

在程序设计中,我们有时会遇到这样的情况,一个线程将数据写到一个buffer中,另外一个线程从中读数据。所以这里就有多线程竞争的问题。通常的解决办法是对竞争资源加锁。但是,一般加锁的损耗较高。其实,对于这样的一个线程写,一个线程读的特殊情况,可以以一种简单的无锁RingBuffer来实现。这样代码的运行效率很高。

本文借鉴了Disruptor项目代码。

代码我在github上放了一份,需要的同学可以去下载(RingBuffer.java)。本文最后也会附上一份。

代码的基本原理如下。

如图所示,假定buffer的长度是bufferSize. 我们设置两个指针。head指向的是下一次读的位置,而tail指向的是下一次写的位置。由于这里是环形buffer (ring buffer),这里就有一个问题,怎样判断buffer是满或者空。这里采用的规则是,buffer的最后一个单元不存储数据。所以,如果head == tail,那么说明buffer为空。如果 head == tail + 1 (mod bufferSize),那么说明buffer满了。

接下来就是最重要的内容了:怎样以无锁的方式进行线程安全的buffer的读写操作。基本原理是这样的。在进行读操作的时候,我们只修改head的值,而在写操作的时候我们只修改tail的值。在写操作时,我们在写入内容到buffer之后才修改tail的值;而在进行读操作的时候,我们会读取tail的值并将其赋值给copyTail。赋值操作是原子操作。所以在读到copyTail之后,从head到copyTail之间一定是有数据可以读的,不会出现数据没有写入就进行读操作的情况。同样的,读操作完成之后,才会修改head的数值;而在写操作之前会读取head的值判断是否有空间可以用来写数据。所以,这时候tail到head - 1之间一定是有空间可以写数据的,而不会出现一个位置的数据还没有读出就被写操作覆盖的情况。这样就保证了RingBuffer的线程安全性。

最后附上代码供参考。欢迎批评指正,也欢迎各种讨论!

 1 public class RingBuffer {
 2
 3     private final static int bufferSize = 1024;
 4     private String[] buffer = new String[bufferSize];
 5     private int head = 0;
 6     private int tail = 0;
 7
 8     private Boolean empty() {
 9         return head == tail;
10     }
11     private Boolean full() {
12         return (tail + 1) % bufferSize == head;
13     }
14     public Boolean put(String v) {
15         if (full()) {
16             return false;
17         }
18         buffer[tail] = v;
19         tail = (tail + 1) % bufferSize;
20         return true;
21     }
22     public String get() {
23         if (empty()) {
24             return null;
25         }
26         String result = buffer[head];
27         head = (head + 1) % bufferSize;
28         return result;
29     }
30     public String[] getAll() {
31         if (empty()) {
32             return new String[0];
33         }
34         int copyTail = tail;
35         int cnt = head < copyTail ? copyTail - head : bufferSize - head + copyTail;
36         String[] result = new String[cnt];
37         if (head < copyTail) {
38             for (int i = head; i < copyTail; i++) {
39                 result[i - head] = buffer[i];
40             }
41         } else {
42             for (int i = head; i < bufferSize; i++) {
43                 result[i - head] = buffer[i];
44             }
45             for (int i = 0; i < copyTail; i++) {
46                 result[bufferSize - head + i] = buffer[i];
47             }
48         }
49         head = copyTail;
50         return result;
51     }
52 }

时间: 2024-12-06 19:11:20

线程安全的无锁RingBuffer的实现的相关文章

高效线程池之无锁化实现(Linux C)

笔者之前练手写过一个小的线程池版本(已上传至https://github.com/xhjcehust/thread-pool),最近几天复习了一下,发现大多数线程池实现都离不开锁的使用,如互斥量pthread_mutex*结合条件变量pthread_cond*.众所周知,锁的使用对于程序性能影响较大,虽然现有的pthread_mutex*在锁的申请与释放方面做了较大的优化,但仔细想想,线程池的实现是可以做到无锁化的,于是有了本文. 1.常见线程池实现原理 如上图所示,工作队列由主线程和工作者线程

(转)高效线程池之无锁化实现(Linux C)

本文链接:https://blog.csdn.net/xhjcehust/article/details/45844901 笔者之前照着通用写法练手写过一个小的线程池版本,最近几天复习了一下,发现大多数线程池实现都离不开锁的使用,如互斥量pthread_mutex*结合条件变量pthread_cond*.众所周知,锁的使用对于程序性能影响较大,虽然现有的pthread_mutex*在锁的申请与释放方面做了较大的优化,但仔细想想,线程池的实现是可以做到无锁化的,于是有了本文. 1.常见线程池实现原

基于循环数组的无锁队列

在之前的两篇博客(线程安全的无锁RingBuffer的实现,多个写线程一个读线程的无锁队列实现)中,分别写了在只有一个读线程.一个写线程的情况下,以及只有一个写线程.两个读线程的情况下,不采用加锁技术,甚至原子运算的循环队列的实现.但是,在其他的情况下,我们也需要尽可能高效的线程安全的队列的实现.本文实现了一种基于循环数组和原子运算的无锁队列.采用原子运算(compare and swap)而不是加锁同步,可以很大的提高运行效率.之所以用循环数组,是因为这样在使用过程中不需要反复开辟内存空间,可

多个写线程一个读线程的无锁队列实现

在之前的一篇博客中,写了一个在特殊情况下,也就是只有一个读线程和一个写线程的情况下,的无锁队列的实现.其中甚至都没有利用特殊的原子加减操作,只是普通的运算.这样做的原因是,即使是特殊的原子加减操作,也比普通的加减运算复杂度高很多.因此文中的实现方法可以达到很高的运行效率. 但是,有的情况下并不是只有一个读线程和一个写线程.越是一般化的实现,支持的情况越多,但是往往损失的性能也越多.作者看到过一个实现(http://www.oschina.net/code/snippet_732357_13465

使用RCU技术实现读写线程无锁

在一个系统中有一个写线程和若干个读线程,读写线程通过一个指针共用了一个数据结构,写线程改写这个结构,读线程读取该结构.在写线程改写这个数据结构的过程中,加锁情况下读线程由于等待锁耗时会增加. 可以利用RCU (Read Copy Update What is rcu)的思想来去除这个锁.本文提到的主要实现代码:gist RCU RCU可以说是一种替代读写锁的方法.其基于一个事实:当写线程在改变一个指针时,读线程获取这个指针,要么获取到老的值,要么获取到新的值.RCU的基本思想其实很简单,参考Wh

基于无锁队列和c++11的高性能线程池

基于无锁队列和c++11的高性能线程池线程使用c++11库和线程池之间的消息通讯使用一个简单的无锁消息队列适用于linux平台,gcc 4.6以上 标签: <无> 代码片段(6)[全屏查看所有代码] 1. [代码]lckfree.h ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 // lckfree.h //

支持内部晋升的无锁并发优先级线程池

支持内部晋升的无锁并发优先级线程池 [TOC] 引言 在技术群讨论到一个有意思的业务需求,可以描述为: 有一个内部按照优先级进行任务排序的线程池.线程池会优先执行高优先级的任务.随着时间的流逝,线程池内部低优先级的任务的优先级会逐渐晋升变为高优先级,以避免被不断新增的高优先级任务阻塞导致饿死. 考虑到 JDK 已经为开发者提供了自定义线程池ThreadPoolExecutor以及优先级队列PriorityBlockingQueue,两者相结合并且定期调整队列中低优先级任务的优先级再进行resor

一个无锁消息队列引发的血案(六)——RingQueue(中) 休眠的艺术 [续]

目录 (一)起因 (二)混合自旋锁 (三)q3.h 与 RingBuffer (四)RingQueue(上) 自旋锁 (五)RingQueue(中) 休眠的艺术 (六)RingQueue(中) 休眠的艺术 [续] 开篇 这是第五篇的后续,这部分的内容同时会更新和添加在 第五篇:RingQueue(中) 休眠的艺术 一文的末尾. 归纳 紧接上一篇的末尾,我们把 Windows 和 Linux 下的休眠策略归纳总结一下,如下图: 我们可以看到,Linux 下的 sched_yield() 虽然包括了

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta