无锁有序链表的实现

无锁有序链表可以保证元素的唯一性,使其可用于哈希表的桶,甚至直接作为一个效率不那么高的map。普通链表的无锁实现相对简单点,因为插入元素可以在表头插,而有序链表的插入则是任意位置。

本文主要基于论文High Performance Dynamic Lock-Free Hash Tables实现。

主要问题

链表的主要操作包含insertremove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例:

struct node_t {
        key_t key;
        value_t val;
        node_t *next;
    };

    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        while (item) {
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        }
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }

    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 如果pred本身被移除了
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
    }

    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return TRUE;
            }
            // B. 如果pred被移除;如果item也被移除
            if (CAS(&pred->next, item, item->next)) {
                haz_free(item);
                return TRUE;
            }
        }
    }

l_find函数返回查找到的前序元素和元素本身,代码A和B虽然拿到了preditem,但在CAS的时候,其可能被其他线程移除。甚至,在l_find过程中,其每一个元素都可能被移除。问题在于,任何时候拿到一个元素时,都不确定其是否还有效。元素的有效性包括其是否还在链表中,其指向的内存是否还有效。

解决方案

通过为元素指针增加一个有效性标志位,配合CAS操作的互斥性,就可以解决元素有效性判定问题。

因为node_t放在内存中是会对齐的,所以指向node_t的指针值低几位是不会用到的,从而可以在低几位里设置标志,这样在做CAS的时候,就实现了DCAS的效果,相当于将两个逻辑上的操作变成了一个原子操作。想象下引用计数对象的线程安全性,其内包装的指针是线程安全的,但对象本身不是。

CAS的互斥性,在若干个线程CAS相同的对象时,只有一个线程会成功,失败的线程就可以以此判定目标对象发生了变更。改进后的代码(代码仅做示例用,不保证正确):

typedef size_t markable_t;
    // 最低位置1,表示元素被删除
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define MARK(p) ((markable_t)p | 0x01)
    #define STRIP_MARK(p) ((markable_t)p & ~0x01)

    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 虽然find拿到了合法的pred,但是在以下代码之前pred可能被删除,此时pred->next被标记
            //    pred->next != item,该CAS会失败,失败后重试
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
        return FALSE;
    }

    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            node_t *inext = item->next;
            // B. 删除item前先标记item->next,如果CAS失败,那么情况同insert一样,有其他线程在find之后
            //    删除了item,失败后重试
            if (!CAS(&item->next, inext, MARK(inext))) {
                continue;
            }
            // C. 对同一个元素item删除时,只会有一个线程成功走到这里
            if (CAS(&pred->next, item, STRIP_MARK(item->next))) {
                haz_defer_free(item);
                return TRUE;
            }
        }
        return FALSE;
    }

    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        hazard_t *hp1 = haz_get(0);
        hazard_t *hp2 = haz_get(1);
        while (item) {
            haz_set_ptr(hp1, pred);
            haz_set_ptr(hp2, item);
            /*
             如果已被标记,那么紧接着item可能被移除链表甚至释放,所以需要重头查找
            */
            if (HAS_MARK(item->next)) {
                return l_find(pred_ptr, item_ptr, head, key);
            }
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        }
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }

haz_gethaz_set_ptr之类的函数是一个hazard pointer实现,用于支持多线程下内存的GC。上面的代码中,要删除一个元素item时,会标记item->next,从而使得insert时中那个CAS不需要做任何调整。总结下这里的线程竞争情况:

  • insertfind到正常的preditempred->next == item,然后在CAS前有线程删除了pred,此时pred->next == MARK(item)CAS失败,重试;删除分为2种情况:a) 从链表移除,得到标记,pred可继续访问;b)
    pred可能被释放内存,此时再使用pred会错误。为了处理情况b,所以引入了类似hazard pointer的机制,可以有效保障任意一个指针p只要还有线程在使用它,它的内存就不会被真正释放
  • insert中有多个线程在pred后插入元素,此时同样由insert中的CAS保证,这个不多说
  • remove中情况同insertfind拿到了有效的prednext,但在CAS的时候pred被其他线程删除,此时情况同insertCAS失败,重试
  • 任何时候改变链表结构时,无论是remove还是insert,都需要重试该操作
  • find中遍历时,可能会遇到被标记删除的item,此时item根据remove的实现很可能被删除,所以需要重头开始遍历

ABA问题

ABA问题还是存在的,insert中:

if (CAS(&pred->next, item, new_item)) {
        return TRUE;
    }

如果CAS之前,pred后的item被移除,又以相同的地址值加进来,但其value变了,此时CAS会成功,但链表可能就不是有序的了。pred->val < new_item->val > item->val

为了解决这个问题,可以利用指针值地址对齐的其他位来存储一个计数,用于表示pred->next的改变次数。当insert拿到pred时,pred->next中存储的计数假设是0,CAS之前其他线程移除了pred->next又新增回了item,此时pred->next中的计数增加,从而导致insertCAS失败。

// 最低位留作删除标志
    #define MASK ((sizeof(node_t) - 1) & ~0x01)

    #define GET_TAG(p) ((markable_t)p & MASK)
    #define TAG(p, tag) ((markable_t)p | (tag))
    #define MARK(p) ((markable_t)p | 0x01)
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define STRIP_MARK(p) ((node_t*)((markable_t)p & ~(MASK | 0x01)))

remove的实现:

/* 先标记再删除 */
    if (!CAS(&sitem->next, inext, MARK(inext))) {
        continue;
    }
    int tag = GET_TAG(pred->next) + 1;
    if (CAS(&pred->next, item, TAG(STRIP_MARK(sitem->next), tag))) {
        haz_defer_free(sitem);
        return TRUE;
    }

insert中也可以更新pred->next的计数。

总结

无锁的实现,本质上都会依赖于CAS的互斥性。从头实现一个lock free的数据结构,可以深刻感受到lock free实现的tricky。最终代码可以从这里github获取。代码中为了简单,实现了一个不是很强大的hazard pointer,可以参考之前的博文

原文地址: http://codemacro.com/2015/05/05/lock_free_list/

written by Kevin Lynx  posted at
http://codemacro.com

时间: 2024-10-06 00:16:08

无锁有序链表的实现的相关文章

使用CAS实现无锁列队-链表

#include <stdlib.h> #include <stdio.h> #include <pthread.h> #include <iostream> #include <sys/time.h> #include <pthread.h> using namespace std; #define MAXLEN 200000 #define NUM_THREADS 8 #define CAS __sync_bool_compare

聊聊高并发(三十二)实现一个基于链表的无锁Set集合

Set表示一种没有反复元素的集合类,在JDK里面有HashSet的实现,底层是基于HashMap来实现的.这里实现一个简化版本号的Set,有下面约束: 1. 基于链表实现.链表节点依照对象的hashCode()顺序由小到大从Head到Tail排列. 2. 如果对象的hashCode()是唯一的.这个如果实际上是不成立的,这里为了简化实现做这个如果.实际情况是HashCode是基于对象地址进行的一次Hash操作.目的是把对象依据Hash散开.所以可能有多个对象地址相应到一个HashCode.也就是

链表的无锁操作 (JAVA)

看了下网上关于链表的无锁操作,写的不清楚,遂自己整理一部分,主要使用concurrent并发包的CAS操作. 1. 链表尾部插入 待插入的节点为:cur 尾节点:pred 基本插入方法: do{ pred = find_tail(); //重新找尾节点 }(! pred.next.compareAndSet(NULL, cur)) //pred.next 是否为NULL,是则将其指向cur,不是则有新的节点插入 这种插入方法是不带标记的,如果不涉及链表删除这个方法是可行的. 但是如果有删除操作,

无锁HASHMAP的原理与实现

在疫苗:Java HashMap的死循环疫苗:Java HashMap的死循环中,我们看到,java.util.HashMap并不能直接应用于多线程环境.对于多线程环境中应用HashMap,主要有以下几种选择: 使用线程安全的java.util.Hashtable作为替代. 使用java.util.Collections.synchronizedMap方法,将已有的HashMap对象包装为线程安全的. 使用java.util.concurrent.ConcurrentHashMap类作为替代,它

生产者消费者模式下的并发无锁环形缓冲区

上一篇记录了几种环形缓冲区的设计方法和环形缓冲区在生产者消费者模式下的使用(并发有锁),这一篇主要看看怎么实现并发无锁. 0.简单的说明 首先对环形缓冲区做下说明: 环形缓冲区使用改进的数组版本,缓冲区容量为2的幂 缓冲区满阻塞生产者,消费者进行消费后,缓冲区又有可用资源,由消费者唤醒生产者 缓冲区空阻塞消费者,生产者进程生产后,缓冲区又有可用资源,由生产者唤醒消费者 然后对涉及到的几个技术做下说明: ⑴CAS,Compare & Set,X86下对应的是CMPXCHG 汇编指令,原子操作,基本

一个无锁消息队列引发的血案(六)——RingQueue(中) 休眠的艺术 [续]

目录 (一)起因 (二)混合自旋锁 (三)q3.h 与 RingBuffer (四)RingQueue(上) 自旋锁 (五)RingQueue(中) 休眠的艺术 (六)RingQueue(中) 休眠的艺术 [续] 开篇 这是第五篇的后续,这部分的内容同时会更新和添加在 第五篇:RingQueue(中) 休眠的艺术 一文的末尾. 归纳 紧接上一篇的末尾,我们把 Windows 和 Linux 下的休眠策略归纳总结一下,如下图: 我们可以看到,Linux 下的 sched_yield() 虽然包括了

无锁模式的Vector

这两天学习无锁的并发模式,发现相比于传统的 同步加锁相比,有两点好处1.无锁 模式 相比于 传统的 同步加锁  提高了性能 2.无锁模式 是天然的死锁免疫 下来介绍无锁的Vector--- LockFreeVector 它的结构是: private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets; 从这里我们可以看到,它的内部是采用的是 无锁的引用数组, 数组嵌套数组 相当于一个二维数组,它的大小可以动态的进行

有序链表和顺序表

通过捕捉键盘动作来进行交互,头文件为conio.h,是console input output的缩写,捕捉用户键盘按键的函数为getch(),通过键值码可以找到对应的按键. #include<stdio.h> #include<string.h> #include<stdlib.h> #include<conio.h> #include<windows.h> #define MAX 5000 struct List { int num; List

IOKING真正无锁服务器引擎之消息引擎模块Demo(no-lock)

 关键词: no-lock interlocked lock-free tcp/ip socket server engine epoll iocp server out-of-orderexecution无锁 原子锁 原子操作 原子指令 锁无关 开放锁 通讯服务器 引擎 高并发 大数据 搜索引擎 完成端口服务器 cpu乱序并行执行 内存栅栏 IOKING 真正无锁服务器引擎之消息引擎模块Demo(no-lock) 这是继无锁iocp通讯模块以后,又一个无锁模块.下一步有时间将会把两个整合在