【并发编程】多线程程序同步策略

目录

  • C++11线程使用初探
  • 采用条件变量等待某个事件或条件发生
  • 线程安全的队列适配器

C++11线程使用初探

std::thread

#include <thread>

只读的共享数据在多个线程间不存在Race condition的危险,而可读可写共享数据在线程间共享时则需做好线程同步,即数据保护,主要包括lock-based和lock-free策略。

常见的以互斥锁保护多线程间的共享数据,保证某一时刻仅有一个线程访问共享数据,导致线程间数据保护是串行,因此在多线程环境中,锁保护的区域越小,并发程度越高。

采用条件变量等待某个事件或条件发生

C++11提供了两种条件变量:std::condition_variable和std::condition_variable_any,均需要和互斥量一起使用来保证操作的同步性。前者仅能与std::mutex一起使用,后者可与所有mutex-like的锁一起使用,更加通用,但以牺牲空间、性能或操作系统资源为代价,因此std::condition_variable是首选;它们均在头文件中定义。

生产者-消费者模式在并发编程中应用广泛,有助于系统的解耦。队列是一种常见的在生产者和消费者线程间传递数据的容器,队列先入先出的特性满足应用对顺序性的要求。

人脸分析组件采用队列传递数据,通过在生产者线程中调用InputData函数将待分析数据包送入队列,并调用notify_one通知消费者线程,从而消费者线程进行人脸数据分析。伪代码如下:

std::mutex mut;
std::condition_variable cond;
std::queue<data_chunk> data_queue;

// 将待分析数据送入队列
int InputData(const data_chunk& data)
{
  if (invalid_data)
  {
    LOG_ERROR("invalid param!");
    return ERROR_CODE;
  }
  // 将数据送入队列
  // 并发出通知
  std::lock_guard<std::mutex> lk(mut);
  data_queue.push(data);
  cond.notify_one();
  return SUCCESS_CODE;
}

// InputData在生产者线程中被调用,将待分析数据送入队列
// 消费线程函数Process从队列取数据以进行分析
// 线程在条件未发生时因wait函数阻塞进入睡眠状态
void Process()
{
  while (not_exit_expression)
  {
    // 使用unique_lock而非lock_guard
    std::unique_lock<std::mutex> lk(mut);

    // wait函数在条件满足时返回,否则线程进入阻塞状态
    //
    // 若lambda表达式返回false(即不条件满足),则wait释放lk中锁资源,
    // 且线程进入阻塞状态,以便生线程可以继续获取锁并送数据到队里;否则,
    //
    // 当notify_one通知条件变量时,消费消除从睡眠状态苏醒,重新获取锁,且对条件再次检查,lambda表达式返回true
    // 此时,wait返回并继续持有锁资源,然后继续往下执行
    cond.wait(lk, [](){ return !data_queue.empty(); });

    data_chunk data = data_queue.front();
    data_queue.pop();

    // wait返回后持有锁,因此需要在此处解锁
    lk.unlock();

    // 处理数据
    process_the_data;
  }
}

?? 若将Process循环中阻塞逻辑(wait函数)换为非阻塞模式逻辑——即当队列为空时,循环continue,将如何影响Process线程?

以上代码,消费者线程使用std::unique_lock而非std::lock_guard是因为前者较后者灵活,std::lock_guard未实现lock/unlock成员函数。另外,队列在线程间传递数据应用广泛,将非线程安全的队列和条件变量封装到具体场景类,不仅编码重复,而且低效易出错。因此,实现线程安全的队列是十分必要的。

线程安全的队列适配器

上小节已介绍了“队列,条件变量和互斥量”的应用场景,现将它们封装为线程安全的队列适配器——CTSQueue。

线程安全的队列 - 需求分析:

  • 仍具备队列的先入先出特性;
  • 支持数据队尾入队列、队首出队列操作;
  • 提供阻塞和非阻塞版本出队列操作;
  • 泛型队列;
  • 使用C++11容器及线程同步原语,当然也可使用其他如posix提供的同步原语;

完整代码如下:

#include <queue>              // for queue
#include <memory>             // for std::shared_ptr
#include <mutex>              // for std::mutex
#include <condition_variable> // for std::condition_variable

// 线程安全的队列
template<typename T>
class CTSQueue
{
public:
  CTSQueue() = default;
  ~CTSQueue()
  { }

  // 不提供复制操作 - 禁用
  CTSQueue(const CTSQueue&) = delete;
  CTSQueue& operator=(const CTSQueue&) = delete;

  // 入队列操作
  void Push(T data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    data_queue_.push(data);
    cond_.notify_one();
  }

  // 出队列操作 - 非阻塞版本
  bool TryPop(T& data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    // 空队列立马返回出队列失败,否则
    // 数据出队列(拷贝版本)
    if (data_queue_.empty())
    {
      return false;
    }
    data = data_queue_.front();
    data_queue_.pop();
    return true;
  }
  std::shared_ptr<T> TryPop()
  {
    std::lock_guard<std::mutex> lk(mut_);
    if (data_queue_.empty())
    {
      return std::shared_ptr<T>();
    }
    // 出队列(非拷贝版本)
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  }

  // 出队列操作 - 阻塞版本
  void WaitPop(T& data)
  {
    std::lock_guard<std::mutex> lk(mut_);
    // 条件未发生时,调用线程阻塞
    // 否则,数据出队列(拷贝版本)
    cond_.wait(lk, [this](){ return !data_queue_.empty() });
    data = data_queue_.front();
    data_queue_.pop();
  }
  std::shared_ptr<T> WaitPop()
  {
    std::lock_guard<std::mutex> lk(mut_);
    cond_.wait(lk, [this](){ return !data_queue_.empty() });
    // 出队列(非拷贝版本)
    std::shared_ptr<T> res(std::make_shared<T>(data_queue_.front()));
    data_queue_.pop();
    return res;
  }

  // 查询队列状态操作
  bool Empty() const
  {
    std::lock_guard<std::mutex> lk(mut_);
    return data_queue_.empty();
  }

private:
  mutable std::mutex      mut_;
  std::queue<T>           data_queue_;
  std::condition_variable cond_;
};

参考文献:
[1] 《C++ Concurrency In Action》, https://www.bogotobogo.com/cplusplus/files/CplusplusConcurrencyInAction_PracticalMultithreading.pdf

原文地址:https://www.cnblogs.com/bigosprite/p/11332389.html

时间: 2024-10-13 18:34:21

【并发编程】多线程程序同步策略的相关文章

python并发编程&amp;多线程(二)

前导理论知识见:python并发编程&多线程(一) 一 threading模块介绍 multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性 官网链接:https://docs.python.org/3/library/threading.html?highlight=threading#(装B模式加载中…………) 二 开启线程的两种方式  方式一  方式二 三 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别  1 谁的开启速度快  2 瞅

python并发编程--多线程2

并发编程--多线程2 实战部分: threading模块介绍 开启线程的两种方式 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别 练习 线程相关的其他方法 守护线程 python GIL(Global Interpreter Lock) 同步锁 死锁现象与递归锁 信号量Semaphore Evect 条件Condition 定时器 线程queue python标准模块-concurrent.futures 一.threading模块介绍 说明:threading用于提供线程相关的操作

JAVA并发编程4_线程同步之volatile关键字

上一篇博客JAVA并发编程3_线程同步之synchronized关键字中讲解了JAVA中保证线程同步的关键字synchronized,其实JAVA里面还有个较弱的同步机制volatile.volatile关键字是JAVA中的轻量级的同步机制,用来将变量的更新操作同步到其他线程.从内存可见性的角度来说,写入volatile变量相当于退出同步代码块,读取volatile变量相当于进入同步代码块. 旧的内存模型:保证读写volatile都直接发生在main memory中. 在新的内存模型下(1.5)

python并发编程&amp;多线程(一)

本篇理论居多,实际操作见:  python并发编程&多线程(二) 一 什么是线程 在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程 线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程 车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线 流水线的工作需要电源,电源就相当于cpu 所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位. 多线程(即多个控制线程)的概念

Java并发编程-非阻塞同步方式原子类(Atomic)的使用

非阻塞同步 在大多数情况下,我们为了实现线程安全都会使用Synchronized或lock来加锁进行线程的互斥同步,但互斥同步的最主要的问题就是进行线程的阻塞和唤醒所带来的性能问题,因此这种阻塞也称作阻塞同步.从处理问题的方式上说,互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题,无论共享数据是否真的会出现竞争,它都会进行加锁.用户态核心态转换.维护锁的计数器和检查是否有被阻塞的线程需要被唤醒等操作. 随着硬件指令集的发展,我们有了另一个选择:基于冲突检测的乐

【Java并发编程二】同步容器和并发容器

一.同步容器 在Java中,同步容器包括两个部分,一个是vector和HashTable,查看vector.HashTable的实现代码,可以看到这些容器实现线程安全的方式就是将它们的状态封装起来,并在需要同步的方法上加上关键字synchornized. 另一个是Collections类中提供的静态工厂方法创建的同步包装类. 同步容器都是线程安全的.但是对于复合操作(迭代.缺少即加入.导航:根据一定的顺序寻找下一个元素),有时可能需要使用额外的客户端加锁进行保护.在一个同步容器中,复合操作是安全

并发编程的技巧和策略

可变状态是至关重要的. 所有的并发问题都可以归结为如何协调对并发状态的访问.可变状态越少,就越容易确保线程安全性. 尽量将域声明为final 类型,除非需要他们是可变的 不可变对象一定是线程安全的 不可变对象能极大地降低并发编程的复杂性.他们更为简单而且安全,可以任意共享而无需使用加锁或保护性复制等机制. 用锁来保护每个可变变量 当保护同一个不变性条件中的所有变量时,要使用同一个锁 在执行复合操作期间,要持有锁 如果从多个线程中访问同一个可变变量没有同步机制,那么程序就会出现问题 在设计过程中考

JAVA并发编程3_线程同步之synchronized关键字

在上一篇博客里讲解了JAVA的线程的内存模型,见:JAVA并发编程2_线程安全&内存模型,接着上一篇提到的问题解决多线程共享资源的情况下的线程安全问题. 不安全线程分析 public class Test implements Runnable { private int i = 0; private int getNext() { return i++; } @Override public void run() { // synchronized while (true) { synchro

4.并发编程多线程

并发编程之多线程(理论) 一 threading模块介绍 multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍 官网链接:https://docs.python.org/3/library/threading.html?highlight=threading# 二 开启线程的两种方式 #方式一 from threading import Thread import time def sayhi(name): time.sleep(