条件变量
如果线程之间执行顺序上有依赖关系,可使用条件变量(Condition variables)。
可以到boost官网中参考条件变量(Condition variables)的使用。
条件变量必须和互斥量配合使用,等待另一个线程重某个事件的发生(满足某个条件),然后线程才能继续执行。
共有两种条件变量对象condition_variable, condition_variable_any,一般情况下使用condition_variable_any。
条件变量的使用方式:
拥有条件变量的线程先锁定互斥量,然后循环检查某个条件,如果条件不满足,那么就调用条件变量的成员函数wait()等待直到条件满足。其他线程处理条件变量要求的条件,当条件满足时调用它的成员函数notify_one()或者notify_all(),以通知一个或者所有正在等待条件的变量的线程停止等待继续执行。
例子:生产--消费模型。
缓冲区buffer使用了两个条件变量cond_put和cond_get,分别用于处理put动作和get动作,如果缓冲区满则cond_put持续等待,当cond_put得到通知 (缓冲区不满)时线程写入数据,然后通知cond_get条件变量可以获取数据。cond_get的处理流程与cond_put类似。
C++代码
- #include <boost/thread.hpp>
- #include <boost/thread/mutex.hpp>
- #include <iostream>
- #include <stack>
- using namespace std;
- boost::mutex io_mu;
- class buffer
- {
- private:
- boost::mutex mu; // 互斥量,配合条件变量使用
- boost::condition_variable_any cond_put; // 写入条件变量
- boost::condition_variable_any cond_get; // 读取条件变量
- stack<int> stk; // 缓冲区对象
- int un_read, capacity;
- bool is_full() // 缓冲区满判断
- {
- return un_read == capacity;
- }
- bool is_empty() // 缓冲区空判断
- {
- return un_read == 0;
- }
- public:
- buffer(size_t n) : un_read(0), capacity(n){} // 构造函数
- void put(int x) // 写入数据
- {
- { // 开始一个局部域
- boost::mutex::scoped_lock lock(mu); //锁定互斥量
- while ( is_full() ) // 检查缓冲区是否满
- {
- { // 局部域,锁定cout输出一条信息
- boost::mutex::scoped_lock lock(io_mu);
- cout << "full waiting..." << endl;
- }
- cond_put.wait(mu); // 条件变量等待
- } // 条件变脸满足,停止等待
- stk.push(x); // 压栈,写入数据
- ++un_read;
- } // 解锁互斥量,条件变量的通知不需要互斥量锁定
- cond_get.notify_one(); // 通知可以读取数据
- }
- void get(int *x) // 读取数据
- {
- { // 局部域开始
- boost::mutex::scoped_lock lock(mu); // 锁定互斥量
- while (is_empty()) // 检查缓冲区是否空
- {
- {
- boost::mutex::scoped_lock lock(io_mu);
- cout << "empty waiting..." << endl;
- }
- cond_get.wait(mu); // 条件变量等待
- }
- --un_read;
- *x = stk.top(); // 读取数据
- stk.pop(); // 弹栈
- }
- cond_put.notify_one(); // 通知可以写入数据
- }
- };
- buffer buf(5); // 一个缓冲区对象
- void producter(int n) // 生产者
- {
- for (int i = 0; i < n; ++i)
- {
- {
- boost::mutex::scoped_lock lock(io_mu);
- cout << "put " << i << endl;
- }
- buf.put(i); // 写入数据
- }
- }
- void consumer(int n) // 消费者
- {
- int x;
- for (int i = 0; i < n; ++i)
- {
- buf.get(&x); // 读取数据
- boost::mutex::scoped_lock lock(io_mu);
- cout << "get " << x << endl;
- }
- }
- int main()
- {
- boost::thread t1(producter, 20); // 一个生产者线程
- boost::thread t2(consumer, 10); // 两个消费者线程
- boost::thread t3(consumer, 10);
- t1.join();
- t2.join();
- t3.join();
- return 0;
- }
运行结果:
empty waiting...
put 0
empty waiting...
put 1
put 2
get 1
get 2
get 0
empty waiting...
empty waiting...
put 3
put 4
put 5
put 6
put 7
get 6
get 7
get 5
get 4
get 3
empty waiting...
put 8
empty waiting...
put 9
put 10
put 11
get 9
get 11
get 8
empty waiting...
put 12
put 13
put 14
put 15
put 16
put 17
full waiting...
get 10
get 16
put 18
full waiting...
get 17
get 15
get 14
get 13
get 12
get 18
empty waiting...
put 19
get 19