上一篇文章里说到了一个极简易队列的实现,然而它对于并发存在一个问题,就是当多个或者说就是两个线程并发地访问队列,分别调用 push() 与 tryPop() 时,可能就会导致数据争用或者死锁。
以下是一种思路,通过分离数据允许并发。其大致思路是预先分配一个不储存任何数据的结点占位,当 push() 进一个新结点时,将 push 的值作为 data 存入当前数据为空的 tail,然后再在 tail 后面构建个新的用于占位的空结点。这样的设计使 push() 操作仅访问 tail,而 tryPop() 虽然还是两个都访问,但是对于 head 的访问极短暂。
template<typename T> class Queue { private: struct Node{ std::shared_ptr<T> data; std::unique_ptr<T> next; }; std::unique_ptr<T> head; Node* tail; public: Queue():head(std::make_shared<Node>()), tail(head.get ()) {} Queue(const Queue&) = delete; Queue& operator=(const Queue&) = delete; std::shared_ptr<T> tryPop() { if (tail == head.get ()){ return std::shared_ptr<T>(); } auto const result = head->data(); auto const oldHead = std::move(head); head = std::move(oldHead->next); return result; } void push(T newValue) { auto const tailData = std::make_shared<T>(std::move(newValue)); tail->data = tailData; auto const nextTail = std::make_unique<T>(); auto const newTail = nextTail.get(); tail->next = std::move(nextTail); tail = newTail; } };
当然目前还是单线程的,但是这个版本离线程安全的队列又进了一步。
而根据这个就能再进一步地写出相对更高效的并发队列:
template<typename T> class ThreadsafeQueue { private: struct Node { std::shared_ptr<T> data; std::unique_ptr<T> next; }; std::unique_ptr<Node> head; Node* tail; mutable std::mutex headMutex; mutable std::mutex tailMutex; Node* getTail() const { std::lock_guard<std::mutex> lock(tailMutex); return tail; } std::unique_ptr<Node> popHead() { std::lock_guard<std::mutex> lock(headMutex); if (getTail () == head.get ()){ return nullptr; } auto const oldHead = std::move(head); head = std::move(oldHead->next); return oldHead; } public: ThreadsafeQueue(): head(std::make_shared<Node>()), tail(head.get ()) {} ThreadsafeQueue(const ThreadsafeQueue&) = delete; ThreadsafeQueue& operator=(const ThreadsafeQueue&) = delete; std::shared_ptr<T> tryPop() { auto const popedHead = popHead (); return (popedHead == nullptr? std::shared_ptr<T>() : popedHead->data; } void push(T newValue) { auto newData = std::make_shared<T>(std::move(newValue)); auto nextTail = std::make_unique<Node>(); auto newTail = nextTail.get(); std::lock_guard<std::mutex> lock(tailMutex); tail->data = newData; tail->next = std::move(nextTail); tail = newTail; } };
这里面最重要的是 tailMutex 因为若没有这个互斥元,则 push() 和 tryPop() 可能会在不同的线程中造成数据竞争或未定义行为。而因为在 push() 和 tryPop() 中都存在 tailMutex 所以即便是在不同的线程里调用这两个函数也能保证 getTail() 要不得到的是真正 tail 的本身或者它的前一个结点。
另外一个重点就是 headMutex,这个的重要性还是用反证法说明,假如我这么写:
std::unique_ptr<Node> popHead() { auto const oldTail = getTail(); // 1 std::lock_grand<std::mutex> lock(headMutex); // 2 if (head.get() == oldTail) { return nullptr; } auto oldHead = std::move(head); head = std::move(oldHead->next); return oldHead; }
这里就可能出现一个问题,就是如果线程 A 在 1 这个地方取得 tail 之后暂停,接着其他的线程 BCDEFG 中把这个 ThreadsafeQueue pop 得连 A 中取得的 tail 都出去了,接下来 A 再执行的时候直接就傻逼了,或者说就算 A 中取得的 tail 没出去,head 也被改变过了,后面的操作也成 UB 了。
时间: 2024-10-20 10:27:58