转载:无锁队列的实现(CAS同步)

转自:http://coolshell.cn/articles/8239.html

关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术。下面开始正文。

关于CAS等原子操作

在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。

这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。


1

2

3

4

5

6

7

int compare_and_swap (int* reg, int oldval, int newval)

{

  int old_reg_val = *reg;

  if (old_reg_val == oldval)

     *reg = newval;

  return old_reg_val;

}

这个操作可以变种为返回bool值的形式(返回 bool值的好处在于,可以调用者知道有没有更新成功):


1

2

3

4

5

6

7

8

bool compare_and_swap (int *accum, int *dest, int newval)

{

  if ( *accum == *dest ) {

      *dest = newval;

      return true;

  }

  return false;

}

与CAS相似的还有下面的原子操作:(这些东西大家自己看Wikipedia吧)

注:在实际的C/C++程序中,CAS的各种实现版本如下:

1)GCC的CAS

GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看GCC Atomic Builtins


1

2

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)

type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

2)Windows的CAS

在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions


1

2

3

InterlockedCompareExchange ( __inout LONG volatile *Target,

                                __in LONG Exchange,

                                __in LONG Comperand);

3) C++11中的CAS

C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library


1

2

3

4

5

6

template< class T >

bool atomic_compare_exchange_weak( std::atomic<T>* obj,

                                   T* expected, T desired );

template< class T >

bool atomic_compare_exchange_weak( volatile std::atomic<T>* obj,

                                   T* expected, T desired );

无锁队列的链表实现

下面的东西主要来自John D. Valois 1994年10月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。

我们先来看一下进队列用CAS实现的方式:


1

2

3

4

5

6

7

8

9

10

11

12

13

EnQueue(x) //进队列

{

    //准备新加入的结点数据

    q = new record();

    q->value = x;

    q->next = NULL;

    do {

        p = tail; //取链表尾指针的快照

    } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试

    CAS(tail, p, q); //置尾结点

}

我们可以看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了 false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。

你会看到,为什么我们的“置尾结点”的操作(第12行)不判断是否成功,因为:

  1. 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
  2. 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
  3. 直到T1线程更新完tail指针,于是其它的线程中的某个线程就可以得到新的tail指针,继续往下走了。

这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

EnQueue(x) //进队列改良版

{

    q = new record();

    q->value = x;

    q->next = NULL;

    p = tail;

    oldp = p

    do {

        while (p->next != NULL)

            p = p->next;

    } while( CAS(p.next, NULL, q) != TRUE); //如果没有把结点链在尾上,再试

    CAS(tail, oldp, q); //置尾结点

}

我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而通实际情况看下来,99.9%的情况不会有线程停转的情况,所以,更好的做法是,你可以接合上述的这两个 版本,如果retry的次数超了一个值的话(比如说3次),那么,就自己fetch指针。

好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)


1

2

3

4

5

6

7

8

9

10

DeQueue() //出队列

{

    do{

        p = head;

        if (p->next == NULL){

            return ERR_EMPTY_QUEUE;

        }

    while( CAS(head, p, p->next) != TRUE );

    return p->next->value;

}

我们可以看到,DeQueue的代码操作的是 head->next,而不是head本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head 和tail都指向同一个结点的问题,这样EnQueue和DeQueue要互相排斥了

注:上图的tail正处于更新之前的装态。

CAS的ABA问题

所谓ABA(见维基百科的ABA词条),问题基本是这个样子:

  1. 进程P1在共享变量中读到值为A
  2. P1被抢占了,进程P2执行
  3. P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
  4. P1回来看到共享变量里的值没有被改变,于是继续执行。

虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)

比如上述的DeQueue()函数,因为我们要让head和tail分开,所以我们引入了一个dummy指针给head,当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为

这个例子你可能没有看懂,维基百科上给了一个活生生的例子——

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

这就是ABA的问题。

解决ABA的问题

维基百科上给了一个解——使用double-CAS(双保险的CAS),例如,在32位系统上,我们要检查64位的内容

1)一次用CAS检查双倍长度的值,前半部是指针,后半部分是一个计数器。

2)只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。

这样一来,ABA发生时,虽然值一样,但是计数器就不一样(但是在32位的系统上,这个计数器会溢出回来又从1开始的,这还是会有ABA的问题)

当然,我们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决,论文《Implementing Lock-Free Queues》给出一这么一个方法——使用结点内存引用计数refcnt


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

SafeRead(q)

{

    loop:

        p = q->next;

        if (p == NULL){

            return p;

        }

        Fetch&Add(p->refcnt, 1);

        if (p == q->next){

            return p;

        }else{

            Release(p);

        }

    goto loop;

}

其中的 Fetch&Add和Release分是是加引用计数和减引用计数,都是原子操作,这样就可以阻止内存被回收了。

用数组实现无锁队列

本实现来自论文《Implementing Lock-Free Queues

使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:

1)数组队列应该是一个ring buffer形式的数组(环形数组)

2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)

3)数组一开始全部初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这代表空队列。

4)EnQueue操作。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。

5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。

算法的一个关键是——如何定位HEAD或TAIL?

1)我们可以声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。

2)这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就好了。

3)累加后求个模什么的就可以知道TAIL和HEAD的位置了。

如下图所示:

小结

以上基本上就是所有的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上。

1)无锁队列主要是通过CAS、FAA这些原子操作,和Retry-Loop实现。

2)对于Retry-Loop,我个人感觉其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD和TAIL这两个关键资源。而不是整个数据结构。

还有一些和Lock Free的文章你可以去看看:

注:我配了一张look-free的自行车,寓意为——如果不用专门的车锁,那么自行得自己锁自己!

(全文完)

时间: 2024-10-03 14:46:29

转载:无锁队列的实现(CAS同步)的相关文章

锁、CAS操作和无锁队列的实现

https://blog.csdn.net/yishizuofei/article/details/78353722 锁的机制 锁和人很像,有的人乐观,总会想到好的一方面,所以只要越努力,就会越幸运:有的人悲观,总会想到不好的一方面,患得患失,所以经常会做不好事.我一直把前一个当作为我前进的动力和方向,快乐充实的过好每一天. 常用的锁机制也有两种: 1.乐观锁:假设不会发生并发冲突,每次不加锁而去完成某项操作,只在提交操作时,检查是否违反数据完整性.如果因为冲突失败就继续重试,直到成功为止.而乐

基于循环数组的无锁队列

在之前的两篇博客(线程安全的无锁RingBuffer的实现,多个写线程一个读线程的无锁队列实现)中,分别写了在只有一个读线程.一个写线程的情况下,以及只有一个写线程.两个读线程的情况下,不采用加锁技术,甚至原子运算的循环队列的实现.但是,在其他的情况下,我们也需要尽可能高效的线程安全的队列的实现.本文实现了一种基于循环数组和原子运算的无锁队列.采用原子运算(compare and swap)而不是加锁同步,可以很大的提高运行效率.之所以用循环数组,是因为这样在使用过程中不需要反复开辟内存空间,可

环形无锁队列

环形无锁队列 Table of Contents 1 环形无锁队列的实现 2 死锁及饥饿 3 一些优化 1 环形无锁队列的实现 数据结构定义: template class LockFreeQueue { private: ElementT *mArray; int mCapacity; int mFront; int mTail; } 由于出队操作是在队首进行,入队操作是在队尾进行,因此,我们可以尝试用mFront和mTail来实现多个线程之间的协调.这其中会用到CAS操作: 入队操作伪码:

并发无锁队列学习(概念介绍)

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

无锁队列的环形数组实现

对无锁队列的最初兴趣来自梁斌同志的一个英雄帖:http://coderpk.com/. 第一次看到这个题目的时候还不知道CAS,FAA等所谓的“原子操作”,但直觉上感觉,通过对读写操作的性能优化来达到大幅提高队列性能的方法是行不通的,就算读写操作全用汇编来写,也不会和正常的read及 write有数量级上的区别.后来搜索了一下lock free data structure,才知道了关于原子操作的一些东西,同时也纠正了自己由来已久的一个错误观点:C++中的++操作和--操作都不是原子操作.这篇笔

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta

lockFreeQueue 无锁队列实现与总结

无锁队列 介绍 在工程上,为了解决两个处理器交互速度不一致的问题,我们使用队列作为缓存,生产者将数据放入队列,消费者从队列中取出数据.这个时候就会出现四种情况,单生产者单消费者,多生产者单消费者,单生成者多消费者,多生产者多消费者.我们知道,多线程往往会带来数据不一致的情况,一般需要靠加锁解决问题.但是,加锁往往带来阻塞,阻塞会带来线程切换开销,在数据量大的情况下锁带来的开销是很大的,因此无锁队列实现势在必行.下面就详细讲一下每种情况的不同实现方法. 单生产者单消费者 从最简单的单生产者单消费者

多线程编程之无锁队列

关于无锁队列的概念与实现,可以参考博文<无锁队列的实现>,主要涉及到的知识点包括CAS原子操作.无锁队列的链表实现.无锁队列的数组实现以及ABA问题. 下面借鉴了<多线程的那点儿事(之无锁队列)>的代码,说明两个线程(一个添加一个读取数据)之间的无锁队列,可以不借助线程互斥方法就能够达到并行效果.代码如下: #define MAX_NUMBER 1000L #define STATUS int #define OK 0 #define FALSE -1 typedef struct

并发无锁队列

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (