Michael-Scott非阻塞队列算法,即MS-queue算法,是1 9 9 6 年由Maged . M .Michael and M. L. Scott提出的,是最为经典的并发FIFO队列上的算法,目前很多对并发FIFO队列的研究都是基于这个算法来加以改进的。在共享内存的多核处理器上,这种基于Compare-and-swap(CAS)的算法在性能上要远远优于以前基于锁的算法,并且已经被Java并发包所采用。它的主要特点在于允许多线程并发的、无干扰的访问队列的头和尾。
MS-queue算法依赖于CAS原子操作,CAS操作是与处理器体系结构有关的,GCC中已经提供了内建的CAS相关的API,具体参见这里。
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...); type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...); /* 对应的伪代码 */ { if (*ptr == oldval) { *ptr = newval; return true; } else { return false; } } { if (*ptr == oldval) { *ptr = newval; } return oldval; }
与CAS API一起的,还包括另一组自增、自减、与、或、非、异或原子操作的API。
type __sync_fetch_and_add(type *ptr, type value, ...); // m+n type __sync_fetch_and_sub(type *ptr, type value, ...); // m-n type __sync_fetch_and_or(type *ptr, type value, ...); // m|n type __sync_fetch_and_and(type *ptr, type value, ...); // m&n type __sync_fetch_and_xor(type *ptr, type value, ...); // m^n type __sync_fetch_and_nand(type *ptr, type value, ...); // (~m)&n /* 对应的伪代码 */ { tmp = *ptr; *ptr op= value; return tmp; } { tmp = *ptr; *ptr = (~tmp) & value; return tmp; } // nand
使用这组API有很多好处,比如C/C++中自增自减及赋值操作都不是原子操作,如果是多线程程序需要使用全局计数器,程序就需要使用锁或者互斥量,对于较高并发的程序,会造成一定的性能瓶颈。而通过使用这组API,GCC通过在汇编级别的代码来保证赋值类操作的原子性,相对于涉及到操作系统系统调用和应用层同步的锁和互斥量,这组api的效率要高很多。
言归正传,回到MS-queue无锁(lock-free)队列上来。虽说MS-queue已经是大名鼎鼎了,不过找一个现成的C实现貌似还真不容易,C++的实现这里已经有了,是基于Boost的。另一个是复旦大学一个研究组的实现(这里),不过主要是针对64位机,CAS原语直接用汇编指令搞定的,觉得直接在32位下用或arm的GCC编译下会有问题。由于平时的项目开发用的基本是GCC编译器或arm的GCC,因此,自己实现了一个适用于32位机的、采用GCC内置CAS API的MS-queue。
ms_queue.h:
/* ** This file defines necessary data structures to implement a lock-free FIFO ** queue. ** ** Which is described in Michael and Scott‘s excellent paper appeared in PODC ** ‘96: "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue ** Algorithms" ** ** Author: Jingcheng Li <[email protected]> ** **/ #define __GNU_SOURCE #include <stdlib.h> #include <stdint.h> #define CAS __sync_bool_compare_and_swap typedef int data_type; typedef struct queue_t queue_t; typedef struct pointer_t pointer_t; typedef struct node_t node_t; struct node_t; struct pointer_t { node_t* ptr; uint32_t count; }; struct node_t { data_type value; pointer_t next; }; struct queue_t { pointer_t Head; pointer_t Tail; }; void initialize(queue_t *q) { node_t *node = NULL; node = malloc(sizeof(node_t)); node->next.ptr = NULL; q->Head.ptr = q->Tail.ptr = node; } void enqueue(queue_t* q, data_type value){ node_t *node = NULL; pointer_t old_tail, tail, next, tmp; node = malloc(sizeof(node_t)); node->value = value; node->next.ptr = NULL; while(1) { tail = q->Tail; old_tail = tail; next = tail.ptr->next; /* tail may be changed in CAS after compare but before assign to q->Tail, * so this is incorrect: if (CAS((uint64_t*)&q->Tail, *(uint64_t*)&tail, *(uint64_t*)&old_tail)) this is correct: if (CAS((uint64_t*)&q->Tail, *(uint64_t*)&tail, *(uint64_t*)&tail)) */ if (CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tail)) { if (next.ptr == NULL) { tmp.ptr = node; tmp.count = next.count+1; if (CAS((uint64_t*)&tail.ptr->next, *(const uint64_t*)&next, *(const uint64_t*)&tmp)) { break; } } else { tmp.ptr = next.ptr; tmp.count = tail.count+1; CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp); } } } tmp.ptr = node; tmp.count = tail.count+1; CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp); } int dequeue(queue_t *q, data_type* pvalue) { pointer_t old_head, head, tail, next, tmp; while(1) { head = q->Head; old_head = head; tail = q->Tail; next = head.ptr->next; /* head may be changed in CAS after compare but before assign to q->Head, * so this is incorrect: if (CAS((uint64_t*)&q->Head, *(uint64_t*)&head, *(uint64_t*)&old_head)) this is correct: if (CAS((uint64_t*)&q->Head, *(uint64_t*)&head, *(uint64_t*)&head)) */ if (CAS((uint64_t*)&q->Head, *(const uint64_t*)&head, *(const uint64_t*)&head)) { if (head.ptr == tail.ptr) { if (next.ptr == NULL) { return 0; } tmp.ptr = next.ptr; tmp.count = tail.count+1; CAS((uint64_t*)&q->Tail, *(const uint64_t*)&tail, *(const uint64_t*)&tmp); } else { if (pvalue) { *pvalue = next.ptr->value; } tmp.ptr = next.ptr; tmp.count = head.count+1; if (CAS((uint64_t*)&q->Head, *(const uint64_t*)&head, *(const uint64_t*)&tmp)) { break; } } } } free(head.ptr); return 1; }
test_queue.c:
#include <stdio.h> #include <assert.h> #include "ms_queue.h" pthread_t a_id[10]; pthread_t b_id[10]; queue_t queue; void* put(void* a) { int i = 0, j; int n = (int)a; for(j = n*10000000; j<(n+1)*10000000; j++) { enqueue(&queue, j); } printf("put thread: %d exit\n", n); } void* get(void* a) { int v; int n = (int)a; int cnt = 10000000; while(cnt--) { while(0 == dequeue(&queue, &v)) { usleep(100); } } printf("get thread: %d exit\n", n); } int main() { int i, j; initialize(&queue); assert(NULL != queue.Head.ptr); assert(NULL != queue.Tail.ptr); for ( i = 0; i < 10; i++ ) { pthread_create(&a_id[i], NULL, put, i); pthread_create(&b_id[i], NULL, get, i); } for ( i = 0; i < 10; i++ ) { pthread_join(a_id[i], NULL); pthread_join(b_id[i], NULL); } assert(0 == dequeue(&queue, &j)); }