更为适合并发的极简易队列

上一篇文章里说到了一个极简易队列的实现,然而它对于并发存在一个问题,就是当多个或者说就是两个线程并发地访问队列,分别调用 push() 与 tryPop() 时,可能就会导致数据争用或者死锁。

以下是一种思路,通过分离数据允许并发。其大致思路是预先分配一个不储存任何数据的结点占位,当 push() 进一个新结点时,将 push 的值作为 data 存入当前数据为空的 tail,然后再在 tail 后面构建个新的用于占位的空结点。这样的设计使 push() 操作仅访问 tail,而 tryPop() 虽然还是两个都访问,但是对于 head 的访问极短暂。

template<typename T>
class Queue
{
private:
    struct Node{
        std::shared_ptr<T> data;
        std::unique_ptr<T> next;
    };

    std::unique_ptr<T> head;
    Node*              tail;
public:
    Queue():head(std::make_shared<Node>()), tail(head.get ())
    {}
    Queue(const Queue&)            = delete;
    Queue& operator=(const Queue&) = delete;

    std::shared_ptr<T> tryPop()
    {
        if (tail == head.get ()){
            return std::shared_ptr<T>();
        }

        auto const result  = head->data();
        auto const oldHead = std::move(head);
        head = std::move(oldHead->next);
        return result;
    }

    void push(T newValue)
    {
        auto const tailData = std::make_shared<T>(std::move(newValue));
        tail->data = tailData;
        auto const nextTail = std::make_unique<T>();
        auto const newTail  = nextTail.get();
        tail->next = std::move(nextTail);
        tail = newTail;
    }
};

当然目前还是单线程的,但是这个版本离线程安全的队列又进了一步。

而根据这个就能再进一步地写出相对更高效的并发队列:

template<typename T>
class ThreadsafeQueue
{
private:
    struct Node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<T> next;
    };

    std::unique_ptr<Node> head;
    Node*                 tail;
    mutable std::mutex    headMutex;
    mutable std::mutex    tailMutex;

    Node* getTail() const
    {
        std::lock_guard<std::mutex> lock(tailMutex);
        return tail;
    }

    std::unique_ptr<Node> popHead()
    {
        std::lock_guard<std::mutex> lock(headMutex);
        if (getTail () == head.get ()){
            return nullptr;
        }

        auto const oldHead = std::move(head);
        head = std::move(oldHead->next);
        return oldHead;
    }

public:
    ThreadsafeQueue():
        head(std::make_shared<Node>()), tail(head.get ())
    {}

    ThreadsafeQueue(const ThreadsafeQueue&)            = delete;
    ThreadsafeQueue& operator=(const ThreadsafeQueue&) = delete;

    std::shared_ptr<T> tryPop()
    {
        auto const popedHead = popHead ();
        return (popedHead == nullptr? std::shared_ptr<T>() : popedHead->data;
    }

    void push(T newValue)
    {
        auto newData = std::make_shared<T>(std::move(newValue));
        auto nextTail = std::make_unique<Node>();
        auto newTail  = nextTail.get();
        std::lock_guard<std::mutex> lock(tailMutex);
        tail->data = newData;
        tail->next = std::move(nextTail);
        tail = newTail;
    }
};

这里面最重要的是 tailMutex 因为若没有这个互斥元,则 push() 和 tryPop() 可能会在不同的线程中造成数据竞争或未定义行为。而因为在 push() 和 tryPop() 中都存在 tailMutex 所以即便是在不同的线程里调用这两个函数也能保证 getTail() 要不得到的是真正 tail 的本身或者它的前一个结点。

另外一个重点就是 headMutex,这个的重要性还是用反证法说明,假如我这么写:

std::unique_ptr<Node> popHead()
{
    auto const oldTail = getTail();              // 1
    std::lock_grand<std::mutex> lock(headMutex); // 2

    if (head.get() == oldTail)
    {
        return nullptr;
    }
    auto oldHead = std::move(head);
    head = std::move(oldHead->next);
    return oldHead;
}

这里就可能出现一个问题,就是如果线程 A 在 1 这个地方取得 tail 之后暂停,接着其他的线程 BCDEFG 中把这个 ThreadsafeQueue pop 得连 A 中取得的 tail 都出去了,接下来 A 再执行的时候直接就傻逼了,或者说就算 A 中取得的 tail 没出去,head 也被改变过了,后面的操作也成 UB 了。

时间: 2024-10-20 10:27:58

更为适合并发的极简易队列的相关文章

并发理解(阻塞队列)

一.java中阻塞队列如何设计 关于java中的阻塞队列 队列非常适合于生产者/消费者这种业务场景,像rabbitMq,activeMq都是对queue的一个封装的中间件. java中提供了阻塞队列的实现. Queue是接口,定义了存取这些基本接口: public interface Queue<E> extends Collection<E> { //放入到队列,如果放不进则抛错,成功返回true boolean add(E e); //放入queue,返回成功或失败 boole

Python 3 并发编程多进程之队列(推荐使用)

Python 3 并发编程多进程之队列(推荐使用) 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的. 可以往队列里放任意类型的数据 创建队列的类(底层就是以管道和锁定的方式实现): 1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递. 参数介绍: 1 maxsize是队列中允许最大项数,省略则无大小限制. 方法介绍: 1.主要

12、Java并发编程:阻塞队列

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

环形缓冲区的设计及其在生产者消费者模式下的使用(并发有锁环形队列)

1.环形缓冲区 缓冲区的好处,就是空间换时间和协调快慢线程.缓冲区可以用很多设计法,这里说一下环形缓冲区的几种设计方案,可以看成是几种环形缓冲区的模式.设计环形缓冲区涉及到几个点,一是超出缓冲区大小的的索引如何处理,二是如何表示缓冲区满和缓冲区空,三是如何入队.出队,四是缓冲区中数据长度如何计算. ps.规定以下所有方案,在缓冲区满时不可再写入数据,缓冲区空时不能读数据 1.1.常规数组环形缓冲区 设缓冲区大小为N,队头out,队尾in,out.in均是下标表示: 初始时,in=out=0 队头

Java并发编程:阻塞队列(转载)

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

眉目传情之并发无锁环形队列的实现

眉目传情之并发无锁环形队列的实现 Author:Echo Chen(陈斌) Email:[email protected] Blog:Blog.csdn.net/chen19870707 Date:October 10th, 2014 前面在<眉目传情之匠心独运的kfifo>一文中详细解析了 linux  内核并发无锁环形队列kfifo的原理和实现,kfifo鬼斧神工,博大精深,让人叹为观止,但遗憾的是kfifo为内核提供服务,并未开放出来.剑不试则利钝暗,弓不试则劲挠诬,鹰不试则巧拙惑,马不

(转)Java并发编程:阻塞队列

Java并发编程:阻塞队列 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产

【转】Java并发编程:阻塞队列

在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 使用了非阻塞队列的时候有一个很大的问题就是:它不会对当前线程产生阻塞,那么在面对类似消

[Java并发编程实战] 阻塞队列 BlockingQueue(含代码,生产者-消费者模型)

见贤思齐焉,见不贤而内自省也.-<论语> PS: 如果觉得本文有用的话,请帮忙点赞,留言评论支持一下哦,您的支持是我最大的动力!谢谢啦~ Java5.0 增加了两种新的容器类型,它们是指:Queue 和 BlockingQueue.Queue 用来临时保存一组等待处理的元素.BlockingQueue 扩张了 Queue 接口,增加了可阻塞的插入和获取等操作. BlockingQueue 通常运用于一个线程生产对象放入队列,另一个线程从队列获取对象并消费,这是典型的生产者消费者模型. 这里写图