使用Condition Variables实现一个线程安全队列
多线程代码需要面对的一个问题和是如何把数据从一个县城传到另一个县城。 举个栗子,一个常见的是把串行算法并行化方法是,把他们分成块并且做成一个管道。管道中任意一块都可以单独在一个线程里运行。每个阶段完成后添加数据到输入队列给下个阶段。
Basic Thread Safety with a Mutex 使用mutex实现简单的线程安全
最简单的办法是封装一个非线程安全的队列,使用mutex保护它(实例使用boost中的方法和类型,需要1.35以上版)
template<typename Data> class concurrent_queue { private: std::queue<Data> the_queue; mutable boost::mutex the_mutex; public: void push(const Data& data) { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(data); } bool empty() const { boost::mutex::scoped_lock lock(the_mutex); return the_queue.empty(); } Data& front() { boost::mutex::scoped_lock lock(the_mutex); return the_queue.front(); } Data const& front() const { boost::mutex::scoped_lock lock(the_mutex); return the_queue.front(); } void pop() { boost::mutex::scoped_lock lock(the_mutex); the_queue.pop(); } };
如果一个以上县城从队列中取数据,当前的设计会受制于竞态条件,empty, front
和pop会互相竞争。
但是对于一个消费者的系统就不是问题了。 假如队列是空的话多个线程有可能无事可做进入一个等待循环:
while(some_queue.empty()) { boost::this_thread::sleep(boost::posix_time::milliseconds(50)); }
尽管sleep相较于忙等待避免了大量cpu资源的浪费,这个设计还是有些不足。首先线程必须每隔50ms(或者其他间隔)唤醒一次用来锁定mutex、检查队列、解锁mutex、强制上下文切换。 其次,睡眠的间隔时间相当于强加了一个限制给响应时间:数据被加到队列后到线程响应的响应时间。— 0ms到50ms都有可能,平均是25ms。
使用Condition Variable等待
不停轮询方案的一个替代方案是使用Condition Variable等待。 当数据被加到空队列之后Condition Variable会被通知, 然后等待的线程被唤醒。这需要mutex来保护队列。
我们在 concurrent_queue里实现了个成员方法
:
template<typename Data> class concurrent_queue { private: boost::condition_variable the_condition_variable; public: void wait_for_data() { boost::mutex::scoped_lock lock(the_mutex); while(the_queue.empty()) { the_condition_variable.wait(lock); } } void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); bool const was_empty=the_queue.empty(); the_queue.push(data); if(was_empty) { the_condition_variable.notify_one(); } } // rest as before };
这有三件事需要注意
首先,lock变量被当作参数传到了信号灯的wait方法。 这使得信号灯的实现是可以自动的解锁mutex并把消费者线程加到等待队列。当第一个县城等待时另一个线程可以更新被保护的数据。
其次, condition variable 等待在一个循环里面,可能遭遇假冒唤醒。所以在wait返回时检查实际状态非常重要。
当你执行唤醒操作的时候要小心
第三,调用notify_one发生在数据被加入队列后。假如push操作抛出异常的话,这能避免等待的线程被唤醒却发现没数据。从代码来看,notify_one还是在被保护的区域内。
这还不是最佳方案:等待的线程可能在接到通知后立刻唤醒,并且是在mutex被解锁前,在这种条件下当退出wait重新获取mutex时,它将不得不阻塞。
通过修改这个方法,新的通知将在mutex解锁后发出,等待的线程可以立刻获得mutex不需等待:
template<typename Data> class concurrent_queue { public: void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); bool const was_empty=the_queue.empty(); the_queue.push(data); lock.unlock(); // unlock the mutex if(was_empty) { the_condition_variable.notify_one(); } } // rest as before };
减少锁的开销
尽管信号灯改善了生产者消费者的性能,但是对于消费者来说执行锁的操作还是过多。wait_for_data
, front
以及pop
全都要锁mutex,消费者还是会快速交替调用锁操作。 吧wait和pop整合为一个操作可以减少加锁解锁操作:
template<typename Data> class concurrent_queue { public: void wait_and_pop(Data& popped_value) { boost::mutex::scoped_lock lock(the_mutex); while(the_queue.empty()) { the_condition_variable.wait(lock); } popped_value=the_queue.front(); the_queue.pop(); } // rest as before };
使用引用参数而不是函数返回值来传递结果是为了避免抛出异常引发的安全问题,使用返回值的话如果拷贝构造函数有异常抛出,那么数据被移除了但是也丢失了。然而使用这种方式就不会(参见 Herb Sutter‘s Guru Of The Week #8 的讨论)有时需要 boost::optional来避免神马问题,NND没太看懂是啥。
This does, of course, require that an instance Data
can be created by the calling code in order to receive the result, which is not always the case. In those cases, it might be worth using something like boost::optional
to avoid this requirement.
Handling multiple consumers处理多个消费者
wait_and_pop
不仅移掉了锁的间接开销还带来了额外的好处。 — 现在自动允许多个消费者了。
然而没有外部锁,把方法分成多个天生有细粒度的本质,这使他们会遭受竞态条件,现在吧方法结合起来就能安全的处理并发请求了。 (one reason why the authors of the SGI STL advocate against making things like std::vector
thread-safe — 你需要外部锁去做许多共同的工作,让内部锁变得浪费资源), the combined function safely handles concurrent calls.
If multiple threads are popping entries from a full queue, then they just get serialized inside wait_and_pop
, and everything works fine. If the queue is empty, then each thread in turn will block waiting on the condition variable. When a new entry is added to the queue, one of the threads will wake and take the value, whilst the others keep blocking. If more than one thread wakes (e.g. with a spurious wake-up), or a new thread calls wait_and_pop
concurrently, thewhile
loop ensures that only one thread will do the pop
, and the others will wait.
Update: As commenter David notes below, using multiple consumers does have one problem: if there are several threads waiting when data is added, only one is woken. Though this is exactly what you want if only one item is pushed onto the queue, if multiple items are pushed then it would be desirable if more than one thread could wake. There are two solutions to this: use notify_all()
instead of notify_one()
when waking threads, or to callnotify_one()
whenever any data is added to the queue, even if the queue is not currently empty. If all threads are notified then the extra threads will see it as a spurious wake and resume waiting if there isn‘t enough data for them. If we notify with every push()
then only the right number of threads are woken. This is my preferred option: condition variable notify calls are pretty light-weight when there are no threads waiting. The revised code looks like this:
template<typename Data> class concurrent_queue { public: void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(data); lock.unlock(); the_condition_variable.notify_one(); } // rest as before };
There is one benefit that the separate functions give over the combined one — the ability to check for an empty queue, and do something else if the queue is empty. empty
itself still works in the presence of multiple consumers, but the value that it returns is transitory — there is no guarantee that it will still apply by the time a thread calls wait_and_pop
, whether it was true
or false
. For this reason it is worth adding an additional function: try_pop
, which returnstrue
if there was a value to retrieve (in which case it retrieves it), or false
to indicate that the queue was empty.
template<typename Data> class concurrent_queue { public: bool try_pop(Data& popped_value) { boost::mutex::scoped_lock lock(the_mutex); if(the_queue.empty()) { return false; } popped_value=the_queue.front(); the_queue.pop(); return true; } // rest as before };
通过移除front
and pop
方法,我们这个简单而又单纯的实现,现在已经变成了一个可用的多生产者多消费者队列。
最终方案
多生产者多消费者队列的最终方案:
template<typename Data> class concurrent_queue { private: std::queue<Data> the_queue; mutable boost::mutex the_mutex; boost::condition_variable the_condition_variable; public: void push(Data const& data) { boost::mutex::scoped_lock lock(the_mutex); the_queue.push(data); lock.unlock(); the_condition_variable.notify_one(); } bool empty() const { boost::mutex::scoped_lock lock(the_mutex); return the_queue.empty(); } bool try_pop(Data& popped_value) { boost::mutex::scoped_lock lock(the_mutex); if(the_queue.empty()) { return false; } popped_value=the_queue.front(); the_queue.pop(); return true; } void wait_and_pop(Data& popped_value) { boost::mutex::scoped_lock lock(the_mutex); while(the_queue.empty()) { the_condition_variable.wait(lock); } popped_value=the_queue.front(); the_queue.pop(); } };
Posted by Anthony Williams