C++11 实现生产者消费者双缓冲

基础的生产者消费者模型,生产者向公共缓存区写入数据,消费者从公共缓存区读取数据进行处理,两个线程访问公共资源,加锁实现数据的一致性。

通过加锁来实现

 1 class Produce_1 {
 2 public:
 3     Produce_1(std::queue<int> * que_, std::mutex * mt_) {
 4         m_mt = mt_;
 5         m_que = que_;
 6         m_stop = false;
 7     }
 8     void runProduce() {
 9         while (!m_stop) {
10             std::this_thread::sleep_for(std::chrono::seconds(1));
11             std::lock_guard<std::mutex> lgd(*m_mt);
12             m_que->push(1);
13             std::cout << "Produce_1 produce 1" << std::endl;
14         }
15     }
16     void join() {
17         m_trd->join();
18         m_trd.reset();
19     }
20     void start() {
21         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this)));
22     }
23     void stop() {
24         m_stop = true;
25     }
26 private:
27     std::mutex * m_mt;
28     std::queue<int> * m_que;
29     volatile bool m_stop;
30     std::shared_ptr<std::thread> m_trd;
31 };
32
33
34 /*
35 *单缓冲一个同步队列 效率较低
36 */
37 class Consume_1 {
38 public:
39     Consume_1(std::queue<int> * que_, std::mutex * mt_) {
40         m_mt = mt_;
41         m_que = que_;
42         m_stop = false;
43     }
44
45     void runConsume() {
46         while (!m_stop) {
47             std::this_thread::sleep_for(std::chrono::seconds(1));
48             std::lock_guard<std::mutex> lgd(*m_mt);
49             if (!m_que->empty()) {
50                 m_que->pop();
51             }
52             std::cout << "Consume_1 consume" << std::endl;
53         }
54     }
55     void join() {
56         m_trd->join();
57         m_trd.reset();
58     }
59     void start() {
60         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this)));
61     }
62     void stop() {
63         m_stop = true;
64     }
65 private:
66     std::mutex * m_mt;
67     std::queue<int> * m_que;
68     volatile bool m_stop;
69     std::shared_ptr<std::thread> m_trd;
70 };

通过条件变量来实现

 1 typedef struct Mutex_Condition{
 2     std::mutex mt;
 3     std::condition_variable cv;
 4 }Mutex_Condition;
 5
 6 class Produce {
 7 public:
 8     Produce(std::queue<int> * que_, Mutex_Condition * mc_) {
 9         m_que = que_;
10         m_mc = mc_;
11         m_stop = false;
12     }
13     void join() {
14         m_trd->join();
15         m_trd.reset();
16     }
17     void produce(int enter) {
18         std::lock_guard<std::mutex> lgd(m_mc->mt);
19         m_que->push(enter);
20         m_mc->cv.notify_one();
21     }
22
23     void runProduce() {
24         while (!m_stop) {
25             std::this_thread::sleep_for(std::chrono::seconds(1));
26             produce(1);
27             std::cout << "Produce Thread produce 1 " << std::endl;
28         }
29     }
30
31     void start() {
32         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce::runProduce), this)));
33     }
34     void stop() {
35         m_stop = true;
36     }
37
38 private:
39     std::queue<int> * m_que;
40     Mutex_Condition * m_mc;
41     std::shared_ptr<std::thread> m_trd;
42     volatile bool m_stop;
43 };
44
45
46 class Consume {
47 public:
48     Consume(std::queue<int> * que_, Mutex_Condition * mc_) {
49         m_que = que_;
50         m_mc = mc_;
51         m_stop = false;
52     }
53     void join() {
54         m_trd->join();
55         m_trd.reset();
56     }
57     void consume() {
58         std::unique_lock<std::mutex> lgd(m_mc->mt);
59         while (m_que->empty()) {
60             int i = 0;
61             m_mc->cv.wait(lgd);
62         }
63         m_que->pop();
64         std::cout << "Consume Thread consume " << std::endl;
65     }
66     void runConsume() {
67         while (!m_stop) {
68             std::this_thread::sleep_for(std::chrono::seconds(1));
69             consume();
70         }
71     }
72     void start() {
73         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume::runConsume), this)));
74     }
75     void stop() {
76         m_stop = true;
77     }
78
79 private:
80     std::queue<int> * m_que;
81     Mutex_Condition * m_mc;
82     std::shared_ptr<std::thread> m_trd;
83     volatile bool m_stop;
84
85 };

二、生产者消费者-双缓冲

一个公共缓存区,由于多线程访问的锁冲突较大,可以采取双缓冲手段来解决锁的冲突

双缓冲的关键:双缓冲队列的数据交换

1)生产者线程不断的向生产者队列A写入数据,当队列中有数据时,进行数据的交换,交换开始启动时通过条件变量通知交换线程来处理最先的数据交换。

2)数据交换完成后,通过条件变量通知消费者处理数据,此时交换线程阻塞到消费者数据处理完成时通知的条件变量上。

3)消费者收到数据交换后的通知后,进行数据的处理,数据处理完成后,通知交换线程进行下一轮的双缓冲区的数据交换。

要点:

生产者除了在数据交换时,其余时刻都在不停的生产数据。

数据交换队列需要等待消费者处理数据完成的通知,以进行下一轮交换。

消费者处理数据时,不进行数据交换,生产者同时会不断的生产数据,消费者需要等待数据交换完成的通知,并且发送消费完成的通知给交换线程

  1 typedef struct Mutex_Condition{
  2     std::mutex mt;
  3     std::condition_variable cv;
  4 }Mutex_Condition;
  5
  6 class Produce_1 {
  7 public:
  8     Produce_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1 , Mutex_Condition * mc_2) {
  9         m_read_que   = que_1;
 10         m_writer_que = que_2;
 11         m_read_mc    = mc_1;
 12         m_writer_mc  = mc_2;
 13         m_stop       = false;
 14
 15     }
 16     void runProduce() {
 17         while (!m_stop) {
 18             std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000));
 19             std::lock_guard<std::mutex> lgd(m_writer_mc->mt);
 20             m_writer_que->push(1);
 21             m_writer_mc->cv.notify_one();
 22             std::cout << "m_writer push" << std::endl;
 23         }
 24
 25     }
 26     void join() {
 27         m_trd->join();
 28         m_trd.reset();
 29     }
 30     void start() {
 31         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this)));
 32     }
 33     void stop() {
 34         m_stop = true;
 35     }
 36 private:
 37     Mutex_Condition * m_read_mc;
 38     Mutex_Condition * m_writer_mc;
 39     std::queue<int> * m_read_que;
 40     std::queue<int> * m_writer_que;
 41     volatile bool m_stop;
 42     std::shared_ptr<std::thread> m_trd;
 43 };
 44
 45
 46 class Consume_1 {
 47 public:
 48     Consume_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1,Mutex_Condition * mc_2,Mutex_Condition * switch_mc) {
 49         m_read_que    = que_1;
 50         m_writer_que  = que_2;
 51         m_read_mc     = mc_1;
 52         m_writer_mc   = mc_2;
 53         m_stop        = false;
 54         m_switch_mc = switch_mc;
 55     }
 56
 57     void runConsume() {
 58         while (!m_stop) {
 59             while (true) {
 60                 std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000));
 61                 std::unique_lock<std::mutex> ulg(m_read_mc->mt);
 62                 while (m_read_que->empty()) {
 63                     m_read_mc->cv.wait(ulg);
 64                 }
 65                 //deal data
 66                 //std::lock_guard<std::mutex> ulg(m_read_mc->mt);
 67                 while (!m_read_que->empty()) {
 68                     m_read_que->pop();
 69                     std::cout << "m_read_queue pop" << std::endl;
 70                 }
 71                 m_switch_mc->cv.notify_one();
 72             }
 73         }
 74     }
 75     void join() {
 76         m_trd->join();
 77         m_trd.reset();
 78     }
 79     void start() {
 80         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this)));
 81     }
 82     void stop() {
 83         m_stop = true;
 84     }
 85 private:
 86     Mutex_Condition * m_read_mc;
 87     Mutex_Condition * m_writer_mc;
 88     Mutex_Condition * m_switch_mc;
 89     std::queue<int> * m_read_que;
 90     std::queue<int> * m_writer_que;
 91     volatile bool m_stop;
 92     std::shared_ptr<std::thread> m_trd;
 93 };
 94 void que_switch_trd(std::queue<int> * read_que, std::queue<int> * writer_que, Mutex_Condition * read_mc, Mutex_Condition * writer_mc,Mutex_Condition * switch_mc) {
 95     while (true) {
 96         std::this_thread::sleep_for(std::chrono::microseconds(20*1000));
 97         {
 98             std::unique_lock<std::mutex> ulg(writer_mc->mt);
 99             while (writer_que->empty()) {
100                 writer_mc->cv.wait(ulg);
101             }
102             std::lock_guard<std::mutex> ulg_2(read_mc->mt);
103             std::swap(*read_que, *writer_que);
104             std::cout << "switch queue" << std::endl;
105             if (!read_que->empty()) {
106                 read_mc->cv.notify_one();
107             }
108         }
109         std::unique_lock<std::mutex> ulg_2(switch_mc->mt);
110         while (!read_que->empty()) {
111             switch_mc->cv.wait(ulg_2);
112         }
113     }
114 }
115 int main(){
116
117     Mutex_Condition mc_1;
118     Mutex_Condition mc_2;
119     Mutex_Condition mc_3;
120     std::queue<int> que_1;
121     std::queue<int> que_2;
122
123     Produce_1 produce_1(&que_1, &que_2, &mc_1, &mc_2);
124     Consume_1 consume_1(&que_1, &que_2, &mc_1, &mc_2,&mc_3);
125
126     std::thread trd(std::bind(&que_switch_trd, &que_1, &que_2, &mc_1, &mc_2,&mc_3));
127     produce_1.start();
128     consume_1.start();
129
130     produce_1.join();
131     consume_1.join();
132     trd.join();
133
134     return 0;
135 }
时间: 2024-11-29 09:16:55

C++11 实现生产者消费者双缓冲的相关文章

C++11实现生产者消费者问题

生产者消费者问题是多线程并发中一个非常经典的问题.我在这里实现了一个基于C++11的,单生产者单消费者的版本,供大家参考. #include <windows.h> #include <iostream> #include <cstdlib> #include <mutex> #include <thread> #include <condition_variable> const int bufferSize=10; struct

双缓冲队列-减少生产者消费者锁的调用

在生产者-消费者模式中,我们常常会使用到队列,这个队列在多个线程共享访问时存在互斥和竞争操作, 意味着每次访问都要加锁.如何更好的如何减少锁竞争次数呢 ?今天要介绍的双缓冲队列就是个不错的选择. 双缓冲队列就是冲着同步/互斥的开销来的.我们知道,在多个线程并发访问同一个资源的时候,需要特别注意线程的同步问题.稍稍不注意,噢货,程序结果不正确了. 原理 直接上图: 这样为什么会减少锁的调用呢? 举个例子, 两个List,一个用来存,一个用来取.有点迷糊?就是有一个listP从工厂那里得到玩具对象,

c++11 条件变量 生产者-消费者 并发线程

http://baptiste-wicht.com/posts/2012/04/c11-concurrency-tutorial-advanced-locking-and-condition-variables.html ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 struc

C++11 生产者消费者

下面是一个生产者消费者问题,来介绍condition_variable的用法.当线程间的共享数据发生变化的时候,可以通过condition_variable来通知其他的线程.消费者wait 直到生产者通知其状态发生改变,Condition_variable是使用方法如下: ·当持有锁之后,线程调用wait ·wait解开持有的互斥锁(mutex),阻塞本线程,并将自己加入到唤醒队列中 ·当收到通知(notification),该线程从阻塞中恢复,并加入互斥锁队列(mutex queue) 线程被

综合运用: C++11 多线程下生产者消费者模型详解(转)

生产者消费者问题是多线程并发中一个非常经典的问题,相信学过操作系统课程的同学都清楚这个问题的根源.本文将就四种情况分析并介绍生产者和消费者问题,它们分别是:单生产者-单消费者模型,单生产者-多消费者模型,多生产者-单消费者模型,多生产者-多消费者模型,我会给出四种情况下的 C++11 并发解决方案,如果文中出现了错误或者你对代码有异议,欢迎交流 ;-). 单生产者-单消费者模型 顾名思义,单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据,相对的,如果消费者处理数据的速度大于生产者,那么消费者就必须等待生产者. 为了解决这种问题,就有了生产者消费者模型. 生产者与消费者模型,是通过一个容器,来解决生产者和消费者之间的耦合性问题,生产者和消费者之间并不会直接通信,这样生产者就无需等待消费者处理完数据,生产者可以直接把数据扔给队列,这个

11.9-全栈Java笔记: 线程并发协作(生产者/消费者模式)

多线程环境下,我们经常需要多个线程的并发和协作.这个时候,就需要了解一个重要的多线程并发协作模型"生产者消费者模式". 什么是生产者? 生产者指的是负责生产数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是消费者? 消费者指的是负责处理数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是缓冲区? 消费者不能直接使用生产者的数据,它们之间有个"缓冲区".生产者将生产好的数据放入"缓冲区",消费者从"缓冲区"

生产者消费者模式(转)

本文转载自博文系列架构设计:生产者/消费者模式.文中对原文格式进行了稍加整理. 概述 今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场.由于该模式很重要,打算分几个帖子来介绍.今天这个帖子先来扫盲一把.如果你对这个模式已经比较了解,请跳过本扫盲帖,直接看下一个帖子(关于该模式的具体应用) . 看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名<Design Patterns: E

python-实现生产者消费者模型

生产者消费者:包子铺不停的做包子,行人不停的买 ---> 这样就达到了目的--->包子的销售 两个不同的角色 包子铺,行人 只负责单一操作 让包子变成连接的介质. 1 #_*_coding:utf-8_*_ 2 from threading import Thread 3 from Queue import Queue 4 import time 5 class Procuder(Thread): 6 def __init__(self,name,queue): 7 self.__Name =