目录
- C++11线程使用初探
- 采用条件变量等待某个事件或条件发生
- 线程安全的队列适配器
C++11线程使用初探
std::thread
#include <thread>
只读的共享数据在多个线程间不存在Race condition的危险,而可读可写共享数据在线程间共享时则需做好线程同步,即数据保护,主要包括lock-based和lock-free策略。
常见的以互斥锁保护多线程间的共享数据,保证某一时刻仅有一个线程访问共享数据,导致线程间数据保护是串行,因此在多线程环境中,锁保护的区域越小,并发程度越高。
采用条件变量等待某个事件或条件发生
C++11提供了两种条件变量:std::condition_variable和std::condition_variable_any,均需要和互斥量一起使用来保证操作的同步性。前者仅能与std::mutex一起使用,后者可与所有mutex-like的锁一起使用,更加通用,但以牺牲空间、性能或操作系统资源为代价,因此std::condition_variable是首选;它们均在头文件中定义。
生产者-消费者模式在并发编程中应用广泛,有助于系统的解耦。队列是一种常见的在生产者和消费者线程间传递数据的容器,队列先入先出的特性满足应用对顺序性的要求。
人脸分析组件采用队列传递数据,通过在生产者线程中调用InputData函数将待分析数据包送入队列,并调用notify_one通知消费者线程,从而消费者线程进行人脸数据分析。伪代码如下:
std::mutex mut;
std::condition_variable cond;
std::queue<data_chunk> data_queue;
// 将待分析数据送入队列
int InputData(const data_chunk& data)
{
if (invalid_data)
{
LOG_ERROR("invalid param!");
return ERROR_CODE;
}
// 将数据送入队列
// 并发出通知
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
cond.notify_one();
return SUCCESS_CODE;
}
// InputData在生产者线程中被调用,将待分析数据送入队列
// 消费线程函数Process从队列取数据以进行分析
// 线程在条件未发生时因wait函数阻塞进入睡眠状态
void Process()
{
while (not_exit_expression)
{
// 使用unique_lock而非lock_guard
std::unique_lock<std::mutex> lk(mut);
// wait函数在条件满足时返回,否则线程进入阻塞状态
//
// 若lambda表达式返回false(即不条件满足),则wait释放lk中锁资源,
// 且线程进入阻塞状态,以便生线程可以继续获取锁并送数据到队里;否则,
//
// 当notify_one通知条件变量时,消费消除从睡眠状态苏醒,重新获取锁,且对条件再次检查,lambda表达式返回true
// 此时,wait返回并继续持有锁资源,然后继续往下执行
cond.wait(lk, [](){ return !data_queue.empty(); });
data_chunk data = data_queue.front();
data_queue.pop();
// wait返回后持有锁,因此需要在此处解锁
lk.unlock();
// 处理数据
process_the_data;
}
}
?? 若将Process循环中阻塞逻辑(wait函数)换为非阻塞模式逻辑——即当队列为空时,循环continue,将如何影响Process线程?
以上代码,消费者线程使用std::unique_lock而非std::lock_guard是因为前者较后者灵活,std::lock_guard未实现lock/unlock成员函数。另外,队列在线程间传递数据应用广泛,将非线程安全的队列和条件变量封装到具体场景类,不仅编码重复,而且低效易出错。因此,实现线程安全的队列是十分必要的。
线程安全的队列适配器
上小节已介绍了“队列,条件变量和互斥量”的应用场景,现将它们封装为线程安全的队列适配器——CTSQueue。
线程安全的队列 - 需求分析:
- 仍具备队列的先入先出特性;
- 支持数据队尾入队列、队首出队列操作;
- 提供阻塞和非阻塞版本出队列操作;
- 泛型队列;
- 使用C++11容器及线程同步原语,当然也可使用其他如posix提供的同步原语;
完整代码如下:
#include <queue> // for queue
#include <memory> // for std::shared_ptr
#include <mutex> // for std::mutex
#include <condition_variable> // for std::condition_variable
// 线程安全的队列
template<typename T>
class CTSQueue
{
public:
CTSQueue() = default;
~CTSQueue()
{ }
// 不提供复制操作 - 禁用
CTSQueue(const CTSQueue&) = delete;
CTSQueue& operator=(const CTSQueue&) = delete;
// 入队列操作
void Push(T data)
{
std::lock_guard<std::mutex> lk(mut_);
data_queue_.push(data);
cond_.notify_one();
}
// 出队列操作 - 非阻塞版本
bool TryPop(T& data)
{
std::lock_guard<std::mutex> lk(mut_);
// 空队列立马返回出队列失败,否则
// 数据出队列(拷贝版本)
if (data_queue_.empty())
{
return false;
}
data = data_queue_.front();
data_queue_.pop();
return true;
}
std::shared_ptr<T> TryPop()
{
std::lock_guard<std::mutex> lk(mut_);
if (data_queue_.empty())
{
return std::shared_ptr<T>();
}
// 出队列(非拷贝版本)
std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
data_queue_.pop();
return res;
}
// 出队列操作 - 阻塞版本
void WaitPop(T& data)
{
std::lock_guard<std::mutex> lk(mut_);
// 条件未发生时,调用线程阻塞
// 否则,数据出队列(拷贝版本)
cond_.wait(lk, [this](){ return !data_queue_.empty() });
data = data_queue_.front();
data_queue_.pop();
}
std::shared_ptr<T> WaitPop()
{
std::lock_guard<std::mutex> lk(mut_);
cond_.wait(lk, [this](){ return !data_queue_.empty() });
// 出队列(非拷贝版本)
std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
data_queue_.pop();
return res;
}
// 查询队列状态操作
bool Empty() const
{
std::lock_guard<std::mutex> lk(mut_);
return data_queue_.empty();
}
private:
mutable std::mutex mut_;
std::queue<T> data_queue_;
std::condition_variable cond_;
};
参考文献:
[1] 《C++ Concurrency In Action》, https://www.bogotobogo.com/cplusplus/files/CplusplusConcurrencyInAction_PracticalMultithreading.pdf
原文地址:https://www.cnblogs.com/bigosprite/p/11332389.html