[C++11 并发编程] 12 使用条件变量创建线程间安全的队列

之前有一节中,我们使用mutex实现了一个线程间安全的堆栈。这一节,我们使用条件变量来实现一个线程间安全的队列。

标准库中的std::queue<>的接口定义如下:

template <class T, class Container = std::deque<T> >
class queue {
public:
    explicit queue(const Container&);
    explicit queue(Container&& = Container());
    queue(queue&& q);

    template <class Alloc> explicit queue(const Alloc&);
    template <class Alloc> queue(const Container&, const Alloc&);
    template <class Alloc> queue(Container&&, const Alloc&);
    template <class Alloc> queue(queue&&, const Alloc&);

    queue& operator=(queue&& q);
    void swap(queue&& q);

    bool empty() const;
    size_type size() const;

    T& front();
    const T& front() const;
    T& back();
    const T& back() const;

    void push(const T& x);
    void push(T&& x);
    void pop();
};

忽略构造,赋值和交换运算,还剩下以下三类操作:

  • 查询队列状态:empty()和size()
  • 查询队列中的元素:front()和back()
  • 修改队列中的元素:push()、pop()和emplace()

和stack类似,这些接口对存在竞争条件。我们需要合并front()和pop()操作。这里我们需要实现pop操作的两个变种:

  • try_pop():尝试从队列中pop数据并立即返回。
  • wait_and_pop():等待挂起直到队列中有数据被获取。
#include <memory>	// 为了使用std::shared_ptr
template<typename T>
class threadsafe_queue
{
public:
    threadsafe_queue();
    threadsafe_queue(const threadsafe_queue&);
	// 为了简化实现,禁用赋值运算操作
    threadsafe_queue& operator=(const threadsafe_queue&) = delete;

    void push(T new_value);

	// 返回被获取数据的引用,返回值作为pop成功和失败的标志
    bool try_pop(T& value);
	// 返回值为NULL则表示获取失败,否则返回指向被pop数据的指针
    std::shared_ptr<T> try_pop();

    void wait_and_pop(T& value);
    std::shared_ptr<T> wait_and_pop();

    bool empty() const;
};

修改前一节的程序,使用这个队列:

#include <iostream>

#include <mutex>
#include <condition_variable>
#include <thread>
#include <queue>

template<typename T>
class threadsafe_queue
{
private:
    std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=data_queue.front();
        data_queue.pop();
    }
};

static bool more = true;

bool more_data_to_prepare()
{
	return more;
}

struct data_chunk
{
	char m_data = 'q';
	data_chunk(char c) : m_data(c) {
	}

	data_chunk() : m_data('q') {
	}
};

data_chunk prepare_data()
{
	std::cout << "data_preparation_thread prepare_data"<< std::endl;
	char x = 'q';
	std::cin >> x;
	if (x == 'q')
	{
		more = false;
	}
    return data_chunk(x);
}

void process(data_chunk& data)
{
	std::cout << "process data: " << data.m_data << std::endl;
}

bool is_last_chunk(data_chunk& data)
{
	if (data.m_data == 'q')
    {
    	return true;
	}

	return false;
}

threadsafe_queue<data_chunk> data_queue;	// 用于线程间通信的队列
// mutex和条件变量都已放入到threadsafe_queue中,去掉相应的全局变量 

void data_preparation_thread()
{
    while(more_data_to_prepare())
    {
    	std::cout << "data_preparation_thread while" << std::endl;
        data_chunk const data=prepare_data();
        // 数据准备好后,将数据插入队列之中,不在需要额外的同步操作
        data_queue.push(data);
    }
}

void data_processing_thread()
{
    while(true)
    {
    	std::cout << "data_processing_thread while" << std::endl;
        data_chunk data;
        // wait_and_pop实现了相应的“等待”操作
        data_queue.wait_and_pop(data);
        std::cout << "data_processing_thread process data" << std::endl;
        process(data);
        if(is_last_chunk(data))
            break;
    }
}

int main()
{
	std::cout << "main" << std::endl;
    std::thread t1(data_preparation_thread);
    std::thread t2(data_processing_thread);

    t1.join();
    t2.join();
}

程序执行效果如下:

main
data_preparation_thread while
data_processing_thread while
data_preparation_thread prepare_data
a
data_preparation_thread while
data_preparation_thread prepare_data
data_processing_thread process data
process data: a
data_processing_thread while
q
data_processing_thread process data
process data: q

--------------------------------
Process exited after 2.937 seconds with return value 0
请按任意键继续. . .

线程间安全队列的完整实现如下:

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut; // 为了是mut可以在const函数中被修改,声明为mutable
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}
    threadsafe_queue(threadsafe_queue const& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue=other.data_queue;
    }

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=data_queue.front();
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty)
            return false;
        value=data_queue.front();
        data_queue.pop();
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

条件变量在多个线程等待同一个事件时,也是很有用的。当线程用来分解工作负载,并且只有一个线程可以对通知做出反应,与上述实例中使用的结构完全相同;运行多个数据实例——处理线程(processing thread)。当新的数据准备完成,调用notify_one()将会触发一个正在执行wait()的线程,去检查条件和wait()函数的返回状态(因为你仅是向data_queue添加了一个数据项)。 这里不保证哪一个线程会被通知到,即使只有一个等待线程被通知时,所有处线程也有可能都在处理数据。

另一种可能是,很多线程等待同一事件,对于通知他们都需要做出回应。这会发生在共享数据正在初始化的时候,当处理线程可以使用同一数据时,就要等待数据被初始化(有不错的机制可用来应对;或等待共享数据的更新,比如,定期重新初始化(periodic reinitialization)。在这些情况下,准备线程准备数据数据时,就会通过条件变量调用notify_all()成员函数,而非直接调用notify_one()函数。顾名思义,这就是全部线程在都去执行wait()(检查他们等待的条件是否满足)的原因。

当等待线程只等待一次,当条件为true时,它就不会再等待条件变量了,这种情况下,使用一个条件变量并非同步机制的最好选择。尤其是,要等待的是一小块数据被准备好块时。如果是这样,期望(future)可能是一个更适合的方法。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-04 00:56:15

[C++11 并发编程] 12 使用条件变量创建线程间安全的队列的相关文章

conditon_variable(条件变量)用于线程间同步

conditon_variable(条件变量)用于线程间同步 condition_variable有5个函数,函数名及对应的功能如下: wait阻塞自己,等待唤醒 wait_for阻塞自己,等待唤醒,最多等待一段时间 wait_until阻塞自己,等待唤醒,最多等待到某个时间点 notify_one 唤醒一个等待在这个条件变量上的线程 notify_all 唤醒所有等待在这个条件变量上的线程 3个wait函数都要求输入一个已经上锁的unique_lock<mutex>变量,并且都有两个版本,一

并发编程(壹):创建线程的三种方式及其对比

创建线程的三种方式及其对比 1. 继承 Thread类 (1). 继承Thread类.并重写run()方法,该方法无参数,无返回值: (2). 创建子类实例,并实例化对象: (3). 通过start()方法启动,注意:不是通过run()方法启动. public class ThreadDemo extends Thread{ public void run(){ System.out.println("继承Thread创建线程的."); } } public class ThreadA

【Java并发编程】之十二:线程间通信中notifyAll造成的早期通知问题(含代码)

如果线程在等待时接到通知,但线程等待的条件还不满足,此时,线程接到的就是早期通知,如果条件满足的时间很短,但很快又改变了,而变得不再满足,这时也将发生早期通知.这种现象听起来很奇怪,下面通过一个示例程序来说明问题. 很简单,两个线程等待删除List中的元素,同时另外一个线程正要向其中添加项目.代码如下: [java] view plaincopy import java.util.*; public class EarlyNotify extends Object { private List 

转:【Java并发编程】之十二:线程间通信中notifyAll造成的早期通知问题(含代码)

转载请注明出处:http://blog.csdn.net/ns_code/article/details/17229601 如果线程在等待时接到通知,但线程等待的条件还不满足,此时,线程接到的就是早期通知,如果条件满足的时间很短,但很快又改变了,而变得不再满足,这时也将发生早期通知.这种现象听起来很奇怪,下面通过一个示例程序来说明问题. 很简单,两个线程等待删除List中的元素,同时另外一个线程正要向其中添加项目.代码如下: [java] view plain copy import java.

使用互斥量和条件变量实现线程同步控制

管程(monitor)说明 在并发编程中,管程(monitor)是一个同步构件,管程实现了同一时间点,最多只有一个线程可以执行管程的某个子程序.与那些通过修改数据结构实现互斥访问的并发程序设计相比,管程的实现很大程度上简化了程序设计. 管程可以确保一次只有一个进程执行管程中的程序,因此程序员不需要显式地编写同步代码,但是如果需要就某些特定条件上的同步,则需要定义一些条件结构(condition variable)来实现,并且对条件变量的操作仅有wait()和signal(),如下: condit

并发编程—— 任务取消 之 停止基于线程的服务

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 并发编程—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程—— Callable和Future 并发编程—— CompletionService : Executor 和 BlockingQueue 并发编程—— 任务取消 并发编程—— 任务取消 之 中断 并发编程—— 任务取消 之 停止基于线程的服务 概述 第1 部分 问题描述 第2 部分

Java并发编程学习笔记(一)线程安全性 1

什么是线程安全性: 要编写线程安全的代码,其核心在于要对状态访问操作进行管理,特别是对共享的和可变的状态的访问."共享"意味着变量可以由多个线程同时访问,而"可变"则意味着变量的值在其生命周期内可以发生变化. 一个对象是否需要线程安全的,取决于他是否被多个线程访问.这指的是在程序中访问对象的方式,而不是对象要实现的功能.要使得对象时线程安全的,需要采用同步机制来协同对对象可变状态的访问.如果无法实现协同,那么可能导致数据破坏以及其他不该出现的结果. 如果当多个线程访

static变量的线程间共享,进程间不共享

JAVA中通常我们会使用static域变量来在内存中缓存数据或长驻内存数据,众所周知,static是类的所有实例所共享. 考虑一个问题,假如在多线程情况下,共享数据肯定会有危险的, 例如使用SimpleDateFormat工具的一个变量时,为方便作为util写为了static,后来 在几W条数据中会出现一个奇怪的日期,这就是在多线程下会出现问题导致的数据冲突, 一般这种能私有不共享的数据最好为一个实例一份拷贝,不要做为static若非要做,就对象同步锁,使之单线程. 现在考虑的问题是,在进程间,

C++11并发编程-条件变量(condition_variable)详解

<condition_variable >头文件主要包含了与条件变量相关的类和函数.相关的类包括 std::condition_variable和 std::condition_variable_any,还有枚举类型std::cv_status.另外还包括函数 std::notify_all_at_thread_exit(),下面分别介绍一下以上几种类型. std::condition_variable 类介绍 std::condition_variable是条件变量,更多有关条件变量的定义参