Michael-Scott非阻塞队列(lock-free)算法的C实现

Michael-Scott非阻塞队列算法,即MS-queue算法,是1 9 9 6 年由Maged . M .Michael and M. L. Scott提出的,是最为经典的并发FIFO队列上的算法,目前很多对并发FIFO队列的研究都是基于这个算法来加以改进的。在共享内存的多核处理器上,这种基于Compare-and-swap(CAS)的算法在性能上要远远优于以前基于锁的算法,并且已经被Java并发包所采用。它的主要特点在于允许多线程并发的、无干扰的访问队列的头和尾。

MS-queue算法依赖于CAS原子操作,CAS操作是与处理器体系结构有关的,GCC中已经提供了内建的CAS相关的API,具体参见这里

bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...);
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...);
/* 对应的伪代码 */
{ if (*ptr == oldval) { *ptr = newval; return true; } else { return false; } }
{ if (*ptr == oldval) { *ptr = newval; } return oldval; }

与CAS API一起的,还包括另一组自增、自减、与、或、非、异或原子操作的API。

type __sync_fetch_and_add(type *ptr, type value, ...); // m+n
type __sync_fetch_and_sub(type *ptr, type value, ...); // m-n
type __sync_fetch_and_or(type *ptr, type value, ...);  // m|n
type __sync_fetch_and_and(type *ptr, type value, ...); // m&n
type __sync_fetch_and_xor(type *ptr, type value, ...); // m^n
type __sync_fetch_and_nand(type *ptr, type value, ...); // (~m)&n
/* 对应的伪代码 */
{ tmp = *ptr; *ptr op= value; return tmp; }
{ tmp = *ptr; *ptr = (~tmp) & value; return tmp; }   // nand

使用这组API有很多好处,比如C/C++中自增自减及赋值操作都不是原子操作,如果是多线程程序需要使用全局计数器,程序就需要使用锁或者互斥量,对于较高并发的程序,会造成一定的性能瓶颈。而通过使用这组API,GCC通过在汇编级别的代码来保证赋值类操作的原子性,相对于涉及到操作系统系统调用和应用层同步的锁和互斥量,这组api的效率要高很多。

言归正传,回到MS-queue无锁(lock-free)队列上来。虽说MS-queue已经是大名鼎鼎了,不过找一个现成的C实现貌似还真不容易,C++的实现这里已经有了,是基于Boost的。另一个是复旦大学一个研究组的实现(这里),不过主要是针对64位机,CAS原语直接用汇编指令搞定的,觉得直接在32位下用或arm的GCC编译下会有问题。由于平时的项目开发用的基本是GCC编译器或arm的GCC,因此,自己实现了一个适用于32位机的、采用GCC内置CAS API的MS-queue。
ms_queue.h:

/*
 ** This file defines necessary data structures to implement a lock-free FIFO
 ** queue.
 **
 ** Which is described in Michael and Scott‘s excellent paper appeared in PODC
 ** ‘96: "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 ** Algorithms"
 **
 ** Author: Jingcheng Li <[email protected]>
 **
 **/

#define __GNU_SOURCE
#include <stdlib.h>
#include <stdint.h>

#define CAS __sync_bool_compare_and_swap

typedef int data_type;
typedef struct queue_t queue_t;
typedef struct pointer_t pointer_t;
typedef struct node_t node_t; 

struct node_t;

struct pointer_t {
    node_t* ptr;
    uint32_t count;
};

struct node_t {
    data_type value;
    pointer_t next;
};

struct queue_t {
    pointer_t Head;
    pointer_t Tail;
};

void initialize(queue_t *q)
{
    node_t *node = NULL;

    node = malloc(sizeof(node_t));
    node->next.ptr = NULL;
    q->Head.ptr = q->Tail.ptr = node;
}

void enqueue(queue_t* q, data_type value){
    node_t *node = NULL;
    pointer_t old_tail, tail, next, tmp;

    node = malloc(sizeof(node_t));
    node->value = value;
    node->next.ptr = NULL;

    while(1)
    {
        tail = q->Tail;
        old_tail = tail;
        next = tail.ptr->next;
        /* tail may be changed in CAS after compare but before assign to q->Tail,
         * so this is incorrect:
            if (CAS((uint64_t*)&q->Tail, *(uint64_t*)&tail, *(uint64_t*)&old_tail))
           this is correct:
            if (CAS((uint64_t*)&q->Tail, *(uint64_t*)&tail, *(uint64_t*)&tail))
         */
        if (CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tail))
        {
            if (next.ptr == NULL)
            {
                tmp.ptr = node;
                tmp.count = next.count+1;
                if (CAS((uint64_t*)&tail.ptr->next, *(const uint64_t*)&next, *(const uint64_t*)&tmp))
                {
                    break;
                }
            }
            else
            {
                tmp.ptr = next.ptr;
                tmp.count = tail.count+1;
                CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp);
            }
        }
    }

    tmp.ptr = node;
    tmp.count = tail.count+1;
    CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp);
}

int dequeue(queue_t *q, data_type* pvalue)
{
    pointer_t old_head, head, tail, next, tmp;
    while(1)
    {
        head = q->Head;
        old_head = head;
        tail = q->Tail;
        next = head.ptr->next;

        /* head may be changed in CAS after compare but before assign to q->Head,
         * so this is incorrect:
            if (CAS((uint64_t*)&q->Head, *(uint64_t*)&head, *(uint64_t*)&old_head))
           this is correct:
            if (CAS((uint64_t*)&q->Head, *(uint64_t*)&head, *(uint64_t*)&head))
         */
        if (CAS((uint64_t*)&q->Head, *(const uint64_t*)&head, *(const uint64_t*)&head))
        {
            if (head.ptr == tail.ptr)
            {
                if (next.ptr == NULL)
                {
                    return 0;
                }
                tmp.ptr = next.ptr;
                tmp.count = tail.count+1;
                CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp);
            }
            else
            {
                if (pvalue)
                {
                    *pvalue = next.ptr->value;
                }
                tmp.ptr = next.ptr;
                tmp.count = head.count+1;
                if (CAS((uint64_t*)&q->Head, *(const uint64_t*)&head, *(const uint64_t*)&tmp))
                {
                    break;
                }
            }
        }
    }

    free(head.ptr);
    return 1;
}

test_queue.c:

#include <stdio.h>
#include <assert.h>
#include "ms_queue.h"

pthread_t a_id[10];
pthread_t b_id[10];

queue_t queue;
void* put(void* a)
{
    int i = 0, j;
    int n = (int)a;

    for(j = n*10000000; j<(n+1)*10000000; j++)
    {
        enqueue(&queue, j);
    }
    printf("put thread: %d exit\n", n);
}

void* get(void* a)
{
    int v;
    int n = (int)a;
    int cnt = 10000000;
    while(cnt--)
    {
        while(0 == dequeue(&queue, &v))
        {
            usleep(100);
        }
    }
    printf("get thread: %d exit\n", n);
}

int main()
{
    int i, j;
    initialize(&queue);
    assert(NULL != queue.Head.ptr);
    assert(NULL != queue.Tail.ptr);
    for ( i = 0; i < 10; i++ )
    {
        pthread_create(&a_id[i], NULL, put, i);
        pthread_create(&b_id[i], NULL, get, i);
    }

    for ( i = 0; i < 10; i++ )
    {
        pthread_join(a_id[i], NULL);
        pthread_join(b_id[i], NULL);
    }

    assert(0 == dequeue(&queue, &j));
}
时间: 2024-10-12 19:12:54

Michael-Scott非阻塞队列(lock-free)算法的C实现的相关文章

9.并发包非阻塞队列ConcurrentLinkedQueue

jdk1.7.0_79  队列是一种非常常用的数据结构,一进一出,先进先出. 在Java并发包中提供了两种类型的队列,非阻塞队列与阻塞队列,当然它们都是线程安全的,无需担心在多线程并发环境所带来的不可预知的问题.为什么会有非阻塞和阻塞之分呢?这里的非阻塞与阻塞在于有界与否,也就是在初始化时有没有给它一个默认的容量大小,对于阻塞有界队列来讲,如果队列满了的话,则任何线程都会阻塞不能进行入队操作,反之队列为空的话,则任何线程都不能进行出队操作.而对于非阻塞无界队列来讲则不会出现队列满或者队列空的情况

并发容器(三)非阻塞队列的并发容器

??本文将介绍除了阻塞队列外的并发容器: ConcurrentHashMap.CopyOnWriteArrayList.CopyOnWriteArraySet.ConcurrentSkipListMap.ConcurrentSkipListSet.ConcurrentLinkedQueue: 1. CopyOnWriteArrayList 是 ArrayList 的线程安全的实现,同时也可用于代替 Vector .底层实现是一个数组,其中所有可变操作(add.set 等等)都是通过对底层数组进行

阻塞队列与非阻塞队列

阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put(e) offer(e,time,unit)

非阻塞队列 普通队列 阻塞队列 学习笔记

参考 http://www.cnblogs.com/dolphin0520/p/3932906.html package blockthread; import java.util.ArrayList; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQue

Java 理论与实践: 非阻塞算法简介--转载

在不只一个线程访问一个互斥的变量时,所有线程都必须使用同步,否则就可能会发生一些非常糟糕的事情.Java 语言中主要的同步手段就是synchronized 关键字(也称为内在锁),它强制实行互斥,确保执行 synchronized 块的线程的动作,能够被后来执行受相同锁保护的synchronized 块的其他线程看到.在使用得当的时候,内在锁可以让程序做到线程安全,但是在使用锁定保护短的代码路径,而且线程频繁地争用锁的时候,锁定可能成为相当繁重的操作. 在 “流行的原子” 一文中,我们研究了原子

java 多线程阻塞队列 与 阻塞方法与和非阻塞方法

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾.队列都是线程安全的,内部已经实现安全措施,不用我们担心 Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如Link

阻塞队列和生产者-消费者模式、DelayQueue

1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue,           (基于数组的并发阻塞队列) 6.LinkedBlockingQueue,        (基

阻塞队列

在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦.但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动被唤醒

Java并发编程:阻塞队列

在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者