细粒度锁的极简单线程安全队列

template<typename T>
class ThreadsafeQueue
{
private:
    struct Node
    {
        std::shared_ptr<T>       data;
        std::unique_ptr<T>       next;
    };

    std::unique_ptr<Node>        head;
    Node*                        tail;
    mutable std::mutex           headMutex;
    mutable std::mutex           tailMutex;
    std::condition_variable      dataCond;

    Node* getTail () const
    {
        std::lock_guard<std::mutex> lock(tailMutex);
        return tail;
    }

    std::unique_lock<std::mutex> waitForData();

    std::unique_ptr<Node> popHead ()
    {
        std::lock_guard<std::mutex> lock(headMutex);
        auto oldHead = std::move(head);
        head = std::move(oldHead->next);
        return oldHead;
    }

    std::unique_ptr<Node> waitPopHead ()
    {
        std::unique_lock<std::mutex> lock(waitForData ());
        return popHead ();
    }

    std::unique_ptr<Node> waitPopHead (T& popedValue)
    {
        std::unique_lock<std::mutex> lock(waitForData ());
        popedValue = std::move(*head->data);
        return popHead ();
    }

    std::unique_ptr<Node> tryPopHead ()
    {
        std::lock_guard<std::mutex> lock(headMutex);
        if (head.get () == getTail ()){
            return std::unique_ptr<Node>();
        }
        return popHead ();
    }

    std::unique_ptr<Node> tryPopHead (T& popedValue)
    {
        std::lock_guard<std::mutex> lock(headMutex);
        if (head.get () == getTail ()){
            return std::unique_ptr<Node>();
        }
        popedValue = std::move(*head->data);
        return popHead ();
    }

public:
    ThreadsafeQueue():
        head(std::make_shared<T>()), tail(head.get ())
    {}

    ThreadsafeQueue(const ThreadsafeQueue&)            = delete;
    ThreadsafeQueue& operator=(const ThreadsafeQueue&) = delete;

    std::shared_ptr<T>           tryPop();
    bool                         tryPop(T&);
    std::shared_ptr<T>           waitAndPop();
    void                         waitAndPop(T& value);
    void                         push(T newValue);
    bool                         empty();
};

template<typename T>
void ThreadsafeQueue<T>::push (T newValue)
{
    auto const newData = std::make_shared<T>(std::move(newValue));
    auto       newNext = std::make_unique<Node>();
    auto       newTail = newNext.get();
    std::lock_guard<std::mutex> lock(tailMutex);
    tail->next = std::move(newNext);
    tail->data = newData;
    tail = newTail;
    dataCond.notify_one ();
}

template<typename T>
inline std::unique_lock<std::mutex> ThreadsafeQueue<T>::waitForData ()
{
    std::unique_lock<std::mutex> lock(headMutex);
    dataCond.wait(lock, [&]{return head != getTail ();});
    return std::move(lock);
}

template<typename T>
inline bool ThreadsafeQueue<T>::empty ()
{
    std::lock_guard<std::mutex> lock(headMutex);
    return (head.get () == getTail ());
}

template<typename T>
inline std::shared_ptr<T> ThreadsafeQueue<T>::waitAndPop ()
{
    auto const oldHead = waitPopHead ();
    return oldHead->data;
}

 template<typename T>
inline void ThreadsafeQueue<T>::waitAndPop (T& popedValue)
{
    waitPopHead (popedValue);
}

template<typename T>
inline std::shared_ptr<T> ThreadsafeQueue<T>::tryPop ()
{
    auto const oldHead = tryPopHead();
    return oldHead? oldHead->data : std::shared_ptr<T>();
}

template<typename T>
inline bool ThreadsafeQueue<T>::tryPop (T& popedValue)
{
    auto const oldHead = tryPopHead (popedValue);
    return oldHead;
}
时间: 2024-10-15 06:02:46

细粒度锁的极简单线程安全队列的相关文章

极简单的单线程队列

template<typename T> class Queue { private: struct Node { T data; std::unique_ptr<T> next = nullptr; Node(T _data): data(std::move(_data)) {} }; std::unique_ptr<Node> head = nullptr; Node* tail = nullptr; public: Queue() = default; Queue

利用JAVA线程安全队列简单实现读者写者问题。

常见的操作系统教科书中,会使用互斥锁来实现读者线程和写者线程的同步问题,但是在JDK5推出线程安全队列之后,将该问题变得异常简单. java.util.concurrent.ConcurrentLinkedQueue 是线程安全的非阻塞队列,其实很容易想到,非阻塞队列当线程需要等待的时候,则不会阻塞等待,而是直接根据情况返回. java.util.concurrent.LinkedBlockingQueue 是线程安全的阻塞队列,该队列能够在很多情况下对线程进行阻塞,比如队列为空时调用take(

简单的线程消息队列实现

1. 线程使用场景(1)流水线方式.根据业务特点,将一个流程的处理分割成多个线程,形成流水线的处理方式.产生的结果:延长单一流程的处理时间,提高系统整体的吞吐能力.(2)线程池方式.针对处理时间比较长且没有内蕴状态的线程,使用线程池方式分流消息,加快对线程消息的处理,避免其成为系统瓶颈.线程使用的关键是线程消息队列.线程锁.智能指针的使用.其中以线程消息队列最为重要. 2. 线程消息队列描述所谓线程消息队列,就是一个普通的循环队列加上“多生产者-单(多)消费者的存/取操作”.流水线方式中的线程是

使用Condition Variables 实现一个线程安全队列

使用Condition Variables实现一个线程安全队列 多线程代码需要面对的一个问题和是如何把数据从一个县城传到另一个县城. 举个栗子,一个常见的是把串行算法并行化方法是,把他们分成块并且做成一个管道.管道中任意一块都可以单独在一个线程里运行.每个阶段完成后添加数据到输入队列给下个阶段. Basic Thread Safety with a Mutex 使用mutex实现简单的线程安全 最简单的办法是封装一个非线程安全的队列,使用mutex保护它(实例使用boost中的方法和类型,需要1

Java细粒度锁实现的3种方式

最近在工作上碰见了一些高并发的场景需要加锁来保证业务逻辑的正确性,并且要求加锁后性能不能受到太大的影响.初步的想法是通过数据的时间戳,id等关键字来加锁,从而保证不同类型数据处理的并发性.而java自身api提供的锁粒度太大,很难同时满足这些需求,于是自己动手写了几个简单的扩展… 1. 分段锁 借鉴concurrentHashMap的分段思想,先生成一定数量的锁,具体使用的时候再根据key来返回对应的lock.这是几个实现里最简单,性能最高,也是最终被采用的锁策略,代码如下: /** * 分段锁

简单线程池的实现

1. 什么是线程池 线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务.如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可. 线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销.一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为 1.     计算密集型任务: 2.     I/O密集型任务. 计算密集型任务

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

C++版简单线程池

需求 之前写过一个C#版本的简单线程池http://blog.csdn.net/ylbs110/article/details/51224979 由于刚刚学习了C++11新特性中的future,于是想到用它来实现一个线程池. 实现 思路基本和C#版本的一样,主要区别是委托的实现,线程句柄的不同和线程锁: 本来C++有function模板,但是实现起来比较麻烦,这里主要是实现线程池,所以动态参数的委托就不实现了,直接使用typedef void(*Func)();来实现一个无参数无返回值的函数指针

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同