无锁栈的实现

template<typename T>
class LockFreeStack
{
private:
    struct Node;

    struct CountedNode
    {
        int   externalCount = 0;
        Node* ptr           = nullptr;
    };

    struct Node
    {
        std::shared_ptr<T> data;
        std::atomic<int>   internalCount;
        CountedNode        next;

        Node(T const& data_):
            data(std::make_shared<T>(data_)),
            internalCount(0)
        {}
    };

    std::atomic<CountedNode> head;

    void increaseHeadCount(CountedNode& oldCounter)
    {
        CountedNode newCounter;
        do {
            newCounter = oldCounter;
            ++newCounter.externalCount;
        } while (!head.compare_exchange_strong(oldCounter, newCounter,
                                               std::memory_order_acquire,
                                               std::memory_order_relaxed));
        oldCounter.externalCount = newCounter.externalCount;
    }
public:
    ~LockFreeStack()
    {
        while(pop() != nullptr);
    }

    void push(T const& data)
    {
        CountedNode newNode;
        newNode.ptr = new Node(data);
        newNode.externalCount = 1;
        newNode.ptr->next = head.load(std::memory_order_relaxed);
        while(!head.compare_exchange_weak(newNode.ptr->next, newNode,
                                          std::memory_order_release,
                                          std::memory_order_relaxed));
    }

    std::shared_ptr<T> pop()
    {
        auto oldHead = head.load(std::memory_order_relaxed);
        for(;;){
            increaseHeadCount(oldHead);
            auto const nodePtr = oldHead.ptr;

            if (nodePtr == nullptr){
                return shared_ptr<T>();
            }

            if (head.compare_exchange_strong(oldHead, nodePtr->next,
                                             std::memory_order_relaxed)){
                std::shared_ptr<T> result;
                result.swap(nodePtr->data);
                int const increaseCount = oldHead.externalCount - 2;
                if (nodePtr->internalCount.fetch_add(increaseCount,
                                                     std::memory_order_release)
                    == -increaseCount){
                    delete nodePtr;
                }
                return result;
            }
            else if (nodePtr->internalCount.fetch_add(-1,
                                            std::memory_order_acquire) == 1){
                nodePtr->internalCount.load(std::memory_order_acquire);
                delete nodePtr;
            }
        }
    }
};

为了测试其正确性,我用了以下代码作为实验:

    LockFreeStack<int> stack;
    std::thread t1([&]
                   {
                       for (int i = 0; i < 100; ++i){
                           if(i % 2 == 0){
                               stack.push(i);
                           }
                           else{
                               auto const result = stack.pop();
                               if (result != nullptr){
                                   cout << *result << " ";
                               }
                           }
                       }
                   });
    std::thread t2([&]
                   {
                       for (int i = 100; i < 200; ++i){
                           stack.push(i);
                       }
                   });
    std::thread t3([&]
                   {
                       for (int i = 0; i < 199; ++i){
                           auto const result = stack.pop();
                           if (result != nullptr){
                               cout << *result << " ";
                           }
                       }
                   });
    t1.join();
    t2.join();
    t3.join();

结果输出很奇怪,很多好多上千的数,我一度认为是栈实现的问题,直到我用 printf 替换了 cout 之后……

时间: 2024-10-11 01:41:05

无锁栈的实现的相关文章

使用风险指针(hazard pointer) 处理无锁栈的 push 与 pop

constexpr size_t maxHazardPointers = 100; struct HazardPointer { std::atomic<std::thread::id> id; std::atomic<void*> pointer; }; array<HazardPointer, maxHazardPointers> hazardPointers; class HazardPointerOwner { HazardPointer* hazardPoin

使用引用计数方法管理内存的无锁栈 I

template<typename T> class LockFreeStack { private: struct Node { std::shared_ptr<T> data; Node* next; Node(T const& value): data(std::make_shared<T>(value)) {} }; std::atomic<Node*> head; std::atomic<size_t> threadsInPop

无锁同步-C++11之Atomic和CAS

1.概要 本文是无锁同步系列文章的第一篇,主要探讨C++11中的Atomic. 我们知道在C++11中引入了mutex和方便优雅的lock_guard.但是有时候我们想要的是性能更高的无锁实现,下面我们来讨论C++11中新增的原子操作类Atomic,我们可以利用它巧妙地实现无锁同步. 2.传统的线程同步 1 #include <thread> 2 #include <mutex> 3 4 #include <iostream> 5 6 using namespace s

Nah Lock: 一个无锁的内存分配器

概述 我实现了两个完全无锁的内存分配器:_nalloc 和 nalloc.  我用benchmark工具对它们进行了一组综合性测试,并比较了它们的指标值. 与libc(glibc malloc)相比,第一个分配器测试结果很差,但是我从中学到了很多东西,然后我实现了第二个无锁分配器,随着核数增加至30,测试结果线性提高.核数增加至60,测试结果次线性提高,但是仅比tcmalloc好一点. 想要安装,输入命令: git clone ~apodolsk/repo/nalloc,阅读 README文档.

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta

无锁编程与有锁编程的性能对比与分析

最近维护的一个网络服务器遇到性能问题,于是就对原有的程序进行了较大的框架改动.改动最多的是线程工作模式与数据传递方式,最终的结果是改变锁的使用模式.经过一番改进,基本上可以做到 GMb 网卡全速工作处理.在 性能达标之后,一度在想有没有什么办法使用更加轻量级锁,或者去掉锁的使用,为此搜索一些相关的研究成果,并做了一些实验来验证这些成果,因而就有这篇文章.希望有做类似工作的同行可以有所借鉴.如果有人也有相关的经验,欢迎和我交流. 1 无锁编程概述 本节主要对文献 [1] 进行概括,做一些基础知识的

无锁机制实现并发访问

对于并发控制而言, 锁是一种悲观的策略.它总是假设每一次的临界区操作会产生冲突,因此,必须对每次操作都小心翼翼.如果有多个线程同时需要访问临界区资源,就宁可牺牲性能让线程进行等待,所以说锁会阻塞线程执行. 而无锁是一种乐观的策略,它会假设对资源的访问是没有冲突的.既然没有冲突,自然不需要等待,所以所有的线程都可以在不停顿的状态下持续执行.那遇到冲突怎么办呢?无锁的策略使用一种叫做比较交换的技术(CAS Compare And Swap)来鉴别线程冲突,一旦检测到冲突产生,就重试当前操作直到没有冲

无锁编程

无锁编程,即不使用锁的情况下实现多线程之间的变量同步,也就是在没有线程被阻塞的情况下实现变量的同步,所以也叫非阻塞同步(Non-blocking Synchronization). 实现非阻塞同步的方案称为"无锁编程算法"( Non-blocking algorithm). 多线程编程条件下,多个线程需要对同一共享变量写操作时,一般使用互斥锁来解决竞争问题,如下: 1 extern int g_var; 2 3 mutex_lock; 4 g_var ++; 5 mutex_unloc

Compare And Swap(CAS)实现无锁多生产者

1.CAS 原理 compare and swap,解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数--内存位置(V).预期原值(A)和新值(B).如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值.否则,处理器不做任何操作.无论哪种情况,它都会在CAS指令之前返回该位置的值.CAS有效地说明了"我认为位置V应该包含值A:如果包含该值,则将B放到这个位置:否则,不要更改该位置,只告诉我这个位置现在的值即可. 当同时存在读写线程时,默认情况下是不保证线