无锁队列的环形数组实现

对无锁队列的最初兴趣来自梁斌同志的一个英雄帖:http://coderpk.com/。 第一次看到这个题目的时候还不知道CAS,FAA等所谓的“原子操作”,但直觉上感觉,通过对读写操作的性能优化来达到大幅提高队列性能的方法是行不通的,就算读写操作全用汇编来写,也不会和正常的read及 write有数量级上的区别。后来搜索了一下lock free data structure,才知道了关于原子操作的一些东西,同时也纠正了自己由来已久的一个错误观点:C++中的++操作和--操作都不是原子操作。这篇笔记将记录在我在探索这个问题的过程中产生的一些想法,同时对多线程编程——或者说是并发编程的一些问题进行备忘。

多线程及并发

线程是操作系统进行作业调度的最小单位,也是进程内部的一条执行路径。与进程不同,线程并没有对操作系统的资源所有权,也就是说多个线程对资源的访问权是共享的。一个进程中的所有线程共享一个地址空间或者诸如打开的文件之类的其他资源,一个进程对这些资源的任何修改,都会影响到本进程中其他线程的运行。因此,需要对多个线程的执行进行细致地设计,使它们能够互不干涉,并且不破坏共享的数据及资源。

在单处理器中的并发系统里,不同进程和线程之间的指令流是交替执行的,但由于调度系统及CPU时钟的配合,使得程序对外表现出一种同时执行的外部特征;而在并行多处理器系统中,指令流之间的执行则是重叠的。无论是交替执行还是重叠执行,实际上并发程序都面临这同样的问题,即指令流的执行速度不可预测,这取决于其他指令流的活动状态、操作系统处理中断的方式及操作系统的调度策略。这给并发程序设计带来了如下的一些问题:

1)多个进程(或线程)对同一全局资源的访问可能造成未定义的后果。例如,如果两个并发线程都使用同一个全局变量,并且都对该变量执行读写操作,那么程序执行的结果就取决于不同的读写执行顺序——而这些读写执行顺序是不可预知的。

2)操作系统难以对资源进行最优化分配。这涉及到死锁及饥饿的问题。

3)很难定位程序的错误。在多数情况下,并发程序设计的失误都是很难复现的,在一次执行中出现一种结果,而在下一次执行中,往往会出现迥然不同的其他结果。

因此,在进行多线程程序或者并发程序的设计时,尤其需要小心。可以看到的是,绝大多数并发程序的错误都出现在对共享资源的访问上,因此,如何保证对共享资源的访问以一种确定的、我们可以预知的方式运行,成为并发程序设计的首要问题。在操作系统领域,对共享资源的访问有个专用的数据,称为临界区。

临界区是一段代码,在这段代码中,进程将访问共享资源。当另外一个进程已经在这段代码中执行时,这个进程就不能在这段代码中执行。

也就是说,临界区是一个代码段,这个代码段不允许两条并行的指令流同时进入。提供这种保证的机制称为互斥:当一个进程在临界区访问共享资源时,其他进程不能进入该临界区。

锁及互斥

实现互斥的机制,最重要的是互斥锁(Mutex)。互斥锁实际上是一种二元信号量(只有0和1),专用于多任务之间临界区的互斥操作。(关于信号量及互斥锁的区别,可以参看操作系统相关知识)

Mutex本质上是一个信号量对象,只有0和1两个值。同时,mutex还对信号量加1和减1的操作进行了限制,即某个线程对其进行了+1操作,则-1操作也必须由这个线程来完成。mutex的两个值也分别代表了Mutex的两种状态。值为0, 表示锁定状态,当前对象被锁定,用户进程/线程如果试图Lock临界资源,则进入排队等待;值为1,表示空闲状态,当前对象为空闲,用户进程/线程可以Lock临界资源,之后Mutex值减1变为0。

Mutex可以抽象为创建(Create),加锁(Lock),解锁(Unlock),及销毁(Destroy)等四个操作。在创建Mutex时,可以指定锁的状态是空闲或者是锁定,在linux中,这个属性的设置主要通过pthread_mutex_init来实现。

在使用mutex的时候,务必需要了解其本质:Mutex实际上是一个在多个线程之间共享的信号量,当其进入锁定状态时,再试图对其加锁,则会阻塞线程。例如,对于两个线程A和B,其指令序列如下:


线程A


线程B


1.lock(&mutex);

2.do something;

3.unlock(&mutex);


1.lock(&mutex);

2.do something;
3.unlock(&mutex);

在线程A的语句1处,线程A对mutex进行了加锁操作,mutex变为锁定状态。在线程A的语句2及线程B的语句1处,A尚未对mutex进行解锁,而B则试图是mutex进行加锁操作,因此线程B被阻塞,直到A的语句3处,线程A对mutex进行了解锁,B的语句1才得以继续执行,将mutex进行加锁并继续执行语句2和语句3。因此,如果在do something中有对共享资源的访问操作,那么do something就是一个临界区,每次都只有一个线程能够进入这段代码。

原子操作

无论是信号量,还是互斥,其中最重要的一个概念就是原子操作。所谓原子操作,就是不会被线程调度机制所打断的操作——从该操作的第一条指令开始到最后一条指令结束,中间不会有任何的上下文切换(context switch)。

在单处理器系统上,原子操作的实现较为简单:第一种方式是一些单指令即可完成的操作,如compare and swap、test and set等;由于上下文切换只可能出现在指令之间,因此单处理器系统上的单指令操作都是原子操作;另一种方式则是禁用中断,通过汇编语言支持,在指令执行期间,禁用处理器的所有中断操作,由于上下文切换都是通过中断来触发的,因此禁用中断后,可以保证指令流的执行不会被外部指令所打断。

而在多处理器系统上,情况要复杂一些。由于系统中有多个处理器在独立地运行,即使能在单条指令中完成的操作也有可能受到干扰。如,在不同的CPU运行的两个进行都在执行一条递减指令,即对内存中某个内存单元的值-1,则指令流水线可能是这样:(省略了取指)

A处理器: |--读内存--|--计数减1--|--写内存--|

B处理器:            |--读内存--|--计数减1--|--写内存--|

假设原来内存单元中存储的值为5,那么,A、B处理器所读到的内存值都为5,其往内存单元中写入的值都为4。因此,虽然进行了两次-1操作,但实际上运行的结果和执行了1次是一样的。

注:这是一个数据相关问题(关于数据相关问题,可以参考计算机体系结构中指令流水线的设计及数据相关的避免等资料),在单处理机中,这个问题可以通过检查处理机中的指令寄存器,来检查在流水线中的指令之间的相关性,如果出现数据相关的情况,可以通过延迟相关指令执行的方法来规避;而在对称多处理机中,由于CPU之间相互不知道对方的指令寄存器状态,那么这种流水线作业引起的数据竞跑就无法避免。

为了对原子操作提供支持,在x86 平台上,CPU提供了在指令执行期间对总线加锁的手段。CPU芯片上有一条引线#HLOCK pin,如果汇编语言的程序中在一条指令前面加上前缀"LOCK",经过汇编以后的机器代码就使CPU在执行这条指令的时候把#HLOCK pin的电位拉低,持续到这条指令结束时放开,从而把总线锁住,这样同一总线上别的CPU就暂时不能通过总线访问内存了,保证了这条指令在多处理器环境中的原子性。

可以看出,其实pthread_mutex_lock及pthread_mutex_unlock就是一个原子操作。它保证了两个线程不会同时对某个mutex变量加锁或者解锁,否则的话,互斥也就无从实现了。

i++和++i是原子操作吗?

有一个很多人也许都不是很清楚的问题:i++或++i是一个原子操作吗?在上一节,其实已经提到了,在SMP(对称多处理器)上,即使是单条递减汇编指令,其原子性也是不能保证的。那么在单处理机系统中呢?

在编译器对C/C++源代码进行编译时,往往会进行一些代码优化。例如,对i++这条指令,实际上编译器编译出的汇编代码是类似下面的汇编语句:

1.mov eax,[i]

2.add eax,1

3.mov [i],eax

语句1是将i所在的内存读取到寄存器中,而语句2是将寄存器的值加1,语句3是将寄存器值写回到内存中。之所以进行这样的操作,是为了CPU访问数据效率的高效。可以看出,i++是由一条语句被编译成了3条指令,因此,即使在单处理机系统上,i++这种操作也不是原子的。这是由于指令之间的乱序执行而造成的,注意和上节中,指令流水线之间的数据竞跑造成的数据不一致的区别。

GCC的内建原子操作

在GCC中,从版本4.1.2起,提供了__sync_*系列的built-in函数,用于提供加减和逻辑运算的原子操作。这些操作通过锁定总线,无论在单处理机和多处理机上都保证了其原子性。GCC提供的原子操作主要包括:

type __sync_fetch_and_add (type *ptr, type value, ...)

type __sync_fetch_and_sub (type *ptr, type value, ...)

type __sync_fetch_and_or (type *ptr, type value, ...)

type __sync_fetch_and_and (type *ptr, type value, ...)

type __sync_fetch_and_xor (type *ptr, type value, ...)

type __sync_fetch_and_nand (type *ptr, type value, ...)

这六个函数的作用是:取得ptr所指向的内存中的数据,同时对ptr中的数据进行修改操作(加,减,或,与,异或,与后取非)等;

type __sync_add_and_fetch (type *ptr, type value, ...)

type __sync_sub_and_fetch (type *ptr, type value, ...)

type __sync_or_and_fetch (type *ptr, type value, ...)

type __sync_and_and_fetch (type *ptr, type value, ...)

type __sync_xor_and_fetch (type *ptr, type value, ...)

type __sync_nand_and_fetch (type *ptr, type value, ...)

这六个函数与上六个函数基本相同,不同之处在于,上六个函数返回值为修改之前的数据,而这六个函数返回的值为修改之后的数据.

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)

type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

比较并交换指令.如果ptr所指向的内存中的数据等于oldval,则设置其为newval,同时返回true;否则返回false.

type __sync_lock_test_and_set (type *ptr, type value, ...)

测试并置位指令.

void __sync_lock_release (type *ptr, ...)

将ptr设置为0;

其中,这些操作的操作数(type)

可以是1,2,4或8字节长度的int类型,即:

int8_t / uint8_t

int16_t / uint16_t

int32_t / uint32_t

int64_t / uint64_t

无锁队列的实现

在酷壳的一篇文章(http://coolshell.cn/articles/8239.html )中,给出了一种链表无锁队列的实现。其中对ABA和double CAS等现象都进行了分析。在文章的结尾,给出了一种数组无锁队列的实现,不过这个数组受限于CAS、FAA等操作对操作类型的限制,只能存储一些较小的数据类型,如32位数据等。而对于链表无锁队列,每次进行出队和入队操作都伴随着内存的分配和释放,不可避免地要影响到效率。

而使用环形数组的队列则避免了频繁的内存操作,从实现上来说也更加简单。本节描述如何以环形数组为基础,实现一个无锁队列。

多线程之间的协调

多线程程序或者说并发程序之间协调的关键是,要考虑到多个线程同时访问某个资源的时候,保证它们访问的顺序能够准确地反映到程序执行的结果上。

先定义一下无锁队列的基本结构:

template

class LockFreeQueue {

private:

ElementT * ring_array_;

int size_;

int head_index_;

int tail_index_;

}

由于出队操作都是在队首进行,而入队操作则都是在队尾进行,因此,我们可以尝试用head_index_和tail_index_来实现多个线程之间的协调。这其中会用到CAS操作:

入队进程:

……

do {

获取当前的tail_index_的值cur_tail_index;

计算新的tail_index_的值:new_tail_index = (cur_tail_index + 1) % size;

} while(!CAS(tail_index_, cur_tail_index, new_tail_index));

插入元素到cur_tail_index;

其中的do-while循环实现的是一个忙式等待:线程试图获取当前的队列尾部空间的控制权;一旦获取成功,则向其中插入元素。

但是这样出队的时候就出现了问题:如何判断队首的位置里是否有相应元素呢?仅使用head_index_来判断是不行的,这只能保证出队进程不会对同一个索引位置进行出队操作,而不能保证head_index_的位置中一定有有效的元素。

因此,为了保证出队队列与入队队列之间的协调,需要在LockFreeQueue中添加一个标志数组:

char * flag_array_;

flag_array中的元素标记ring_array_中与之对应的元素位置是否有效。flag_array_中的元素有4个取值:

0表示对应的ring_array_中的槽位为空;1表示对应槽位已被申请,正在写入;2表示对应槽位中为有效的元素,可以对其进行出对操作;3则表示正在弹出操作。

修改后的无锁队列的代码如下:

template

class LockFreeQueue {

private:

ElementT * ring_array_;

char * flags_array_; // 标记位,标记某个位置的元素是否被占用

// flags: 0:空节点;1:已被申请,正在写入

// 2:已经写入,可以弹出;3,正在弹出操作;

int size_;  // 环形数组的大小

int element_num_; //队列中元素的个数

int head_index_;

int tail_index_;

public:

LockFreeQueue(int s = 0) {

size_ = s;

head_index_ = 0;

tail_index_ = 0;

element_num_ = 0;

}

~LockFreeQueue() {}

public:

// 初始化queue。分配内存,设定size

bool Init(void);

const int GetSize(void) const {

return size_;

}

const int GetElementNum(void) const {

return element_num_;

}

// 入队函数

bool EnQueue(const ElementT & ele);

// 出队函数

bool DeQueue(ElementT * ele);

};

// This function is NOT ThreadSafe!

// 应当在单线程环境中使用该函数

// OR should be called in the constructor...

template

bool LockFreeQueue::Init(void) {

flags_array_ = new(std::nothrow) char[size_];

if (flags_array_ == NULL)

return false;

memset(flags_array_, 0, size_);

ring_array_ = reinterpret_cast(

new(std::nothrow) char[size_ * sizeof(ElementT)]);

if (ring_array_ == NULL)

return false;

memset(ring_array_, 0, size_ * sizeof(ElementT));

return true;

}

// ThreadSafe

// 元素入队尾部

template

bool LockFreeQueue::EnQueue(const ElementT & ele) {

if (!(element_num_ < size_))

return false;

int cur_tail_index = tail_index_;

char * cur_tail_flag_index = flags_array_ + cur_tail_index;

// 忙式等待

// while中的原子操作:如果当前tail的标记为“”已占用(1)“,则更新cur_tail_flag_index,

// 继续循环;否则,将tail标记设为已经占用

while (!__sync_bool_compare_and_swap(cur_tail_flag_index, 0, 1)) {

cur_tail_index = tail_index_;

cur_tail_flag_index = flags_array_ +  cur_tail_index;

}

// 两个入队线程之间的同步

// 取模操作可以优化

int update_tail_index = (cur_tail_index + 1) % size_;

// 如果已经被其他的线程更新过,则不需要更新;

// 否则,更新为 (cur_tail_index+1) % size_;

__sync_bool_compare_and_swap(&tail_index_, cur_tail_index, update_tail_index);

// 申请到可用的存储空间

*(ring_array_ + cur_tail_index) = ele;

// 写入完毕

__sync_fetch_and_add(cur_tail_flag_index, 1);

// 更新size;入队线程与出队线程之间的协作

__sync_fetch_and_add(&element_num_, 1);

return true;

}

// ThreadSafe

// 元素出队头部

template

bool LockFreeQueue::DeQueue(ElementT * ele) {

if (!(element_num_ > 0))

return false;

int cur_head_index = head_index_;

char * cur_head_flag_index = flags_array_ + cur_head_index;

while (!__sync_bool_compare_and_swap(cur_head_flag_index, 2, 3)) {

cur_head_index = head_index_;

cur_head_flag_index = flags_array_ + cur_head_index;

}

// 取模操作可以优化

int update_head_index = (cur_head_index + 1) % size_;

__sync_bool_compare_and_swap(&head_index_, cur_head_index, update_head_index);

*ele = *(ring_array_ + cur_head_index);

// 弹出完毕

__sync_fetch_and_sub(cur_head_flag_index, 3);

// 更新size

__sync_fetch_and_sub(&element_num_, 1);

return true;

}

无锁队列的分析——死锁及饥饿

经过上节的分析,LockFreeQueue实现了基本的多线程之间的协调,不会存在多个线程同时对同一个资源进行操作的情况,也就不会产生数据竞跑,这保证了对于这个队列而言,基本的访问操作(出队、入队)的执行都是安全的,其结果是可预期的。

在多线程环境下,LockFreeQueue会不会出现死锁的情况呢?死锁有四个必要条件:1:对资源的访问是互斥的;2,请求和保持请求;3,资源不可剥夺;4,循环等待。在LockFreeQueue中,所有的线程都是对资源进行申请后再使用,一个线程若申请到了资源(这里的资源主要指环形队列中的内存槽位),就会立即使用,并且在使用完后释放掉该资源。不存在一个线程使用A资源的同时去申请B资源的情况,因此并不会出现死锁。

但LockFreeQueue可能出现饥饿状态。例如,对两个出队线程A、B,两者都循环进行出队操作。当队列中有元素时,A总能申请到这个元素并且执行到弹出操作,而B则只能在DeQueue函数的while循环中一直循环下去。

一些优化

对LockFreeQueue可以进行一些优化。比如:

1,对于环形数组大小,可以设定为2的整数倍,如1024。这样取模的操作即可以简化为与size_-1的按位与操作。

2,忙式等待的时候可能会出现某个线程一直占用cpu的情况。此时可以使用sleep(0),其功能类似于java中的yield系统调用,可以让该线程让出CPU时间片,从就绪态转为挂起态。

时间: 2024-10-19 08:12:04

无锁队列的环形数组实现的相关文章

使用无锁队列(环形缓冲区)注意事项

环形缓冲区是生产者和消费者模型中常用的数据结构.生产者将数据放入数组的尾端,而消费者从数组的另一端移走数据,当达到数组的尾部时,生产者绕回到数组的头部.如果只有一个生产者和一个消费者,那么就可以做到免锁访问环形缓冲区(Ring Buffer).写入索引只允许生产者访问并修改,只要写入者在更新索引之前将新的值保存到缓冲区中,则读者将始终看到一致的数据结构.同理,读取索引也只允许消费者访问并修改. 环形缓冲区实现原理图 如图所示,当读者和写者指针相等时,表明缓冲区是空的,而只要写入指针在读取指针后面

环形无锁队列

环形无锁队列 Table of Contents 1 环形无锁队列的实现 2 死锁及饥饿 3 一些优化 1 环形无锁队列的实现 数据结构定义: template class LockFreeQueue { private: ElementT *mArray; int mCapacity; int mFront; int mTail; } 由于出队操作是在队首进行,入队操作是在队尾进行,因此,我们可以尝试用mFront和mTail来实现多个线程之间的协调.这其中会用到CAS操作: 入队操作伪码:

基于循环数组的无锁队列

在之前的两篇博客(线程安全的无锁RingBuffer的实现,多个写线程一个读线程的无锁队列实现)中,分别写了在只有一个读线程.一个写线程的情况下,以及只有一个写线程.两个读线程的情况下,不采用加锁技术,甚至原子运算的循环队列的实现.但是,在其他的情况下,我们也需要尽可能高效的线程安全的队列的实现.本文实现了一种基于循环数组和原子运算的无锁队列.采用原子运算(compare and swap)而不是加锁同步,可以很大的提高运行效率.之所以用循环数组,是因为这样在使用过程中不需要反复开辟内存空间,可

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta

转载:无锁队列的实现(CAS同步)

转自:http://coolshell.cn/articles/8239.html 关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术.下面开始正文. 关于CAS等原子操作 在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令.

并发无锁队列

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

原子操作实现无锁队列

关于CAS等原子操作 在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令.有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构. 这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果

多线程编程之无锁队列

关于无锁队列的概念与实现,可以参考博文<无锁队列的实现>,主要涉及到的知识点包括CAS原子操作.无锁队列的链表实现.无锁队列的数组实现以及ABA问题. 下面借鉴了<多线程的那点儿事(之无锁队列)>的代码,说明两个线程(一个添加一个读取数据)之间的无锁队列,可以不借助线程互斥方法就能够达到并行效果.代码如下: #define MAX_NUMBER 1000L #define STATUS int #define OK 0 #define FALSE -1 typedef struct

多个写线程一个读线程的无锁队列实现

在之前的一篇博客中,写了一个在特殊情况下,也就是只有一个读线程和一个写线程的情况下,的无锁队列的实现.其中甚至都没有利用特殊的原子加减操作,只是普通的运算.这样做的原因是,即使是特殊的原子加减操作,也比普通的加减运算复杂度高很多.因此文中的实现方法可以达到很高的运行效率. 但是,有的情况下并不是只有一个读线程和一个写线程.越是一般化的实现,支持的情况越多,但是往往损失的性能也越多.作者看到过一个实现(http://www.oschina.net/code/snippet_732357_13465