使用Condition Variables 实现一个线程安全队列

使用Condition Variables实现一个线程安全队列

多线程代码需要面对的一个问题和是如何把数据从一个县城传到另一个县城。 举个栗子,一个常见的是把串行算法并行化方法是,把他们分成块并且做成一个管道。管道中任意一块都可以单独在一个线程里运行。每个阶段完成后添加数据到输入队列给下个阶段。

Basic Thread Safety with a Mutex 使用mutex实现简单的线程安全

最简单的办法是封装一个非线程安全的队列,使用mutex保护它(实例使用boost中的方法和类型,需要1.35以上版)

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
public:
    void push(const Data& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    Data& front()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }

    Data const& front() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.front();
    }

    void pop()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.pop();
    }
};

如果一个以上县城从队列中取数据,当前的设计会受制于竞态条件,empty, front 和pop会互相竞争。 但是对于一个消费者的系统就不是问题了。  假如队列是空的话多个线程有可能无事可做进入一个等待循环:

    while(some_queue.empty())
    {
        boost::this_thread::sleep(boost::posix_time::milliseconds(50));
    }

尽管sleep相较于忙等待避免了大量cpu资源的浪费,这个设计还是有些不足。首先线程必须每隔50ms(或者其他间隔)唤醒一次用来锁定mutex、检查队列、解锁mutex、强制上下文切换。    其次,睡眠的间隔时间相当于强加了一个限制给响应时间:数据被加到队列后到线程响应的响应时间。— 0ms到50ms都有可能,平均是25ms。

使用Condition Variable等待

不停轮询方案的一个替代方案是使用Condition Variable等待。  当数据被加到空队列之后Condition Variable会被通知,  然后等待的线程被唤醒。这需要mutex来保护队列。

我们在 concurrent_queue里实现了个成员方法:

template<typename Data>
class concurrent_queue
{
private:
    boost::condition_variable the_condition_variable;
public:
    void wait_for_data()
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }
    }
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);
        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};

这有三件事需要注意

首先,lock变量被当作参数传到了信号灯的wait方法。 这使得信号灯的实现是可以自动的解锁mutex并把消费者线程加到等待队列。当第一个县城等待时另一个线程可以更新被保护的数据。

其次, condition variable 等待在一个循环里面,可能遭遇假冒唤醒。所以在wait返回时检查实际状态非常重要。

当你执行唤醒操作的时候要小心

第三,调用notify_one发生在数据被加入队列后。假如push操作抛出异常的话,这能避免等待的线程被唤醒却发现没数据。从代码来看,notify_one还是在被保护的区域内。

这还不是最佳方案:等待的线程可能在接到通知后立刻唤醒,并且是在mutex被解锁前,在这种条件下当退出wait重新获取mutex时,它将不得不阻塞。

通过修改这个方法,新的通知将在mutex解锁后发出,等待的线程可以立刻获得mutex不需等待:

template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        bool const was_empty=the_queue.empty();
        the_queue.push(data);

        lock.unlock(); // unlock the mutex

        if(was_empty)
        {
            the_condition_variable.notify_one();
        }
    }
    // rest as before
};

减少锁的开销

尽管信号灯改善了生产者消费者的性能,但是对于消费者来说执行锁的操作还是过多。wait_for_datafront 以及pop 全都要锁mutex,消费者还是会快速交替调用锁操作。 吧wait和pop整合为一个操作可以减少加锁解锁操作:

template<typename Data>
class concurrent_queue
{
public:
    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }

    // rest as before
};

使用引用参数而不是函数返回值来传递结果是为了避免抛出异常引发的安全问题,使用返回值的话如果拷贝构造函数有异常抛出,那么数据被移除了但是也丢失了。然而使用这种方式就不会(参见 Herb Sutter‘s Guru Of The Week #8 的讨论)有时需要 boost::optional来避免神马问题,NND没太看懂是啥。This does, of course, require that an instance Data can be created by the calling code in order to receive the result, which is not always the case. In those cases, it might be worth using something like boost::optional to avoid this requirement.

Handling multiple consumers处理多个消费者

wait_and_pop 不仅移掉了锁的间接开销还带来了额外的好处。  — 现在自动允许多个消费者了。

然而没有外部锁,把方法分成多个天生有细粒度的本质,这使他们会遭受竞态条件,现在吧方法结合起来就能安全的处理并发请求了。  (one reason why the authors of the SGI STL advocate against making things like std::vector thread-safe — 你需要外部锁去做许多共同的工作,让内部锁变得浪费资源), the combined function safely handles concurrent calls.

If multiple threads are popping entries from a full queue, then they just get serialized inside wait_and_pop, and everything works fine. If the queue is empty, then each thread in turn will block waiting on the condition variable. When a new entry is added to the queue, one of the threads will wake and take the value, whilst the others keep blocking. If more than one thread wakes (e.g. with a spurious wake-up), or a new thread calls wait_and_pop concurrently, thewhile loop ensures that only one thread will do the pop, and the others will wait.

Update: As commenter David notes below, using multiple consumers does have one problem: if there are several threads waiting when data is added, only one is woken. Though this is exactly what you want if only one item is pushed onto the queue, if multiple items are pushed then it would be desirable if more than one thread could wake. There are two solutions to this: use notify_all() instead of notify_one() when waking threads, or to callnotify_one() whenever any data is added to the queue, even if the queue is not currently empty. If all threads are notified then the extra threads will see it as a spurious wake and resume waiting if there isn‘t enough data for them. If we notify with every push() then only the right number of threads are woken. This is my preferred option: condition variable notify calls are pretty light-weight when there are no threads waiting. The revised code looks like this:

template<typename Data>
class concurrent_queue
{
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }
    // rest as before
};

There is one benefit that the separate functions give over the combined one — the ability to check for an empty queue, and do something else if the queue is empty. empty itself still works in the presence of multiple consumers, but the value that it returns is transitory — there is no guarantee that it will still apply by the time a thread calls wait_and_pop, whether it was true or false. For this reason it is worth adding an additional function: try_pop, which returnstrue if there was a value to retrieve (in which case it retrieves it), or false to indicate that the queue was empty.

template<typename Data>
class concurrent_queue
{
public:
    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    // rest as before
};

通过移除front and pop 方法,我们这个简单而又单纯的实现,现在已经变成了一个可用的多生产者多消费者队列。

最终方案

多生产者多消费者队列的最终方案:

template<typename Data>
class concurrent_queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }

};

Posted by Anthony Williams

时间: 2024-10-21 19:59:11

使用Condition Variables 实现一个线程安全队列的相关文章

多线程十大经典案例之一 双线程读写队列数据

本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035 转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/8646902 欢迎关注微博:http://weibo.com/MoreWindows 在<秒杀多线程系列>的前十五篇中介绍多线程的相关概念,多线程同步互斥问题<秒杀多线程第四篇一个经典的多线程同步问题>及解决多线程同步互斥的常用方法

秒杀多线程第十六篇 多线程十大经典案例之一 双线程读写队列数据

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035 转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/8646902 欢迎关注微博:http://weibo.com/MoreWindows 在<秒杀多线程系列>的前十五篇中介绍多线程的相关概念,多线程同步互斥问题<秒杀多

多线程面试题系列(16):多线程十大经典案例之一 双线程读写队列数据

前十五篇中介绍多线程的相关概念,多线程同步互斥问题(第四篇)及解决多线程同步互斥的常用方法--关键段.事件.互斥量.信号量.读写锁.为了让大家更加熟练运用多线程,将会有十篇文章来讲解十个多线程使用案例,相信看完这十篇后会让你能更加游刃有余的使用多线程. 首先来看第一篇--第十六篇 多线程十大经典案例之一 双线程读写队列数据 <多线程十大经典案例之一双线程读写队列数据>案例描述: MFC对话框中一个按钮的响应函数实现两个功能:显示数据同时处理数据,因此开两个线程,一个线程显示数据(开了一个定时器

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

Condition Variables

Condition variables are synchronization primitives that enable threads to wait until a particular condition occurs. Condition variables are user-mode objects that cannot be shared across processes. Condition variables enable threads to atomically rel

并行编程之条件变量(posix condition variables)

在整理Java LockSupport.park()的东东,看到了个"Spurious wakeup",重新梳理下. 首先来个<UNIX环境高级编程>里的例子: [cpp] view plaincopy #include <pthread.h> struct msg { struct msg *m_next; /* ... more stuff here ... */ }; struct msg *workq; pthread_cond_t qready = P

深入解析条件变量(condition variables)

深入解析条件变量 什么是条件变量(condition variables) 引用APUE中的一句话: Condition variables are another synchronization mechanism available to threads. These synchronization objects provide a place for threads to rendezvous. When used with mutexes, condition variables al

Java多线程总结之线程安全队列Queue

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列.Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列. 注:什么叫线程安全?这个首先要明确.线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的.如果由于多线程的访问(比如修改.遍历.查看)而使这些变量结构被破坏

Java多线程之线程安全队列Queue

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列.Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列. 注:什么叫线程安全?这个首先要明确.线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的.如果由于多线程的访问(比如修改.遍历.查看)而使这些变量结构被破坏