双缓冲队列-减少生产者消费者锁的调用

在生产者-消费者模式中,我们常常会使用到队列,这个队列在多个线程共享访问时存在互斥和竞争操作, 意味着每次访问都要加锁。如何更好的如何减少锁竞争次数呢 ?今天要介绍的双缓冲队列就是个不错的选择。

双缓冲队列就是冲着同步/互斥的开销来的。我们知道,在多个线程并发访问同一个资源的时候,需要特别注意线程的同步问题。稍稍不注意,噢货,程序结果不正确了。

原理

直接上图:

这样为什么会减少锁的调用呢?

举个例子,

两个List,一个用来存,一个用来取。有点迷糊?就是有一个listP从工厂那里得到玩具对象,另外一个listT就专门把它得到的玩具对象送去给Kid类处理。当listT变成空的了以后,再将listP中在这段时间内取到的所有玩具对象放到listT中,好,这完了之后,他们两个就又各自干各自的去了:listP再去取,listT再去送。这样是不是就减少了很多次的线程同步呢?至少,在它们交换之前,listP是完全被工厂类线程占有,listT是完全被Kid类线程占有的,不用处理同步。只有在listT放完了,没得给了,再去跟ListP换过来,这个时候就要处理同步了。(对比一下,原先是工厂生产一个加把锁,往队列里塞一个。消费者加一把锁,从队列里取一个。双队列可以一下取很多,是不是节约了资源呢)

阻塞队列相较于原始的生产者消费者也可以提高效率,下一篇再进行分析

转一个别人的实现

二、生产者消费者-双缓冲

一个公共缓存区,由于多线程访问的锁冲突较大,可以采取双缓冲手段来解决锁的冲突

双缓冲的关键:双缓冲队列的数据交换

1)生产者线程不断的向生产者队列A写入数据,当队列中有数据时,进行数据的交换,交换开始启动时通过条件变量通知交换线程来处理最先的数据交换。

2)数据交换完成后,通过条件变量通知消费者处理数据,此时交换线程阻塞到消费者数据处理完成时通知的条件变量上。

3)消费者收到数据交换后的通知后,进行数据的处理,数据处理完成后,通知交换线程进行下一轮的双缓冲区的数据交换。

要点:

生产者除了在数据交换时,其余时刻都在不停的生产数据。

数据交换队列需要等待消费者处理数据完成的通知,以进行下一轮交换。

消费者处理数据时,不进行数据交换,生产者同时会不断的生产数据,消费者需要等待数据交换完成的通知,并且发送消费完成的通知给交换线程

 使用条件变量的版本实现

  1 typedef struct Mutex_Condition{
  2     std::mutex mt;
  3     std::condition_variable cv;
  4 }Mutex_Condition;
  5
  6 class Produce_1 {
  7 public:
  8     Produce_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1 , Mutex_Condition * mc_2) {
  9         m_read_que   = que_1;
 10         m_writer_que = que_2;
 11         m_read_mc    = mc_1;
 12         m_writer_mc  = mc_2;
 13         m_stop       = false;
 14
 15     }
 16     void runProduce() {
 17         while (!m_stop) {
 18             std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000));
 19             std::lock_guard<std::mutex> lgd(m_writer_mc->mt);
 20             m_writer_que->push(1);
 21             m_writer_mc->cv.notify_one();
 22             std::cout << "m_writer push" << std::endl;
 23         }
 24
 25     }
 26     void join() {
 27         m_trd->join();
 28         m_trd.reset();
 29     }
 30     void start() {
 31         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this)));
 32     }
 33     void stop() {
 34         m_stop = true;
 35     }
 36 private:
 37     Mutex_Condition * m_read_mc;
 38     Mutex_Condition * m_writer_mc;
 39     std::queue<int> * m_read_que;
 40     std::queue<int> * m_writer_que;
 41     volatile bool m_stop;
 42     std::shared_ptr<std::thread> m_trd;
 43 };
 44
 45
 46 class Consume_1 {
 47 public:
 48     Consume_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1,Mutex_Condition * mc_2,Mutex_Condition * switch_mc) {
 49         m_read_que    = que_1;
 50         m_writer_que  = que_2;
 51         m_read_mc     = mc_1;
 52         m_writer_mc   = mc_2;
 53         m_stop        = false;
 54         m_switch_mc = switch_mc;
 55     }
 56
 57     void runConsume() {
 58         while (!m_stop) {
 59             while (true) {
 60                 std::unique_lock<std::mutex> ulg(m_read_mc->mt);
 61                 while (m_read_que->empty()) {
 62                     m_read_mc->cv.wait(ulg);
 63                 }
 64                 //deal data
 65                 //std::lock_guard<std::mutex> ulg(m_read_mc->mt);
 66                 while (!m_read_que->empty()) {
 67                     m_read_que->pop();
 68                     std::cout << "m_read_queue pop" << std::endl;
 69                 }
 70                 m_switch_mc->cv.notify_one();
 71             }
 72         }
 73     }
 74     void join() {
 75         m_trd->join();
 76         m_trd.reset();
 77     }
 78     void start() {
 79         m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this)));
 80     }
 81     void stop() {
 82         m_stop = true;
 83     }
 84 private:
 85     Mutex_Condition * m_read_mc;
 86     Mutex_Condition * m_writer_mc;
 87     Mutex_Condition * m_switch_mc;
 88     std::queue<int> * m_read_que;
 89     std::queue<int> * m_writer_que;
 90     volatile bool m_stop;
 91     std::shared_ptr<std::thread> m_trd;
 92 };
 93 void que_switch_trd(std::queue<int> * read_que, std::queue<int> * writer_que, Mutex_Condition * read_mc, Mutex_Condition * writer_mc,Mutex_Condition * switch_mc) {
 94     while (true) {
 95         {
 96             std::unique_lock<std::mutex> ulg(writer_mc->mt);
 97             while (writer_que->empty()) {
 98                 writer_mc->cv.wait(ulg);
 99             }
100             std::lock_guard<std::mutex> ulg_2(read_mc->mt);
101             std::swap(*read_que, *writer_que);
102             std::cout << "switch queue" << std::endl;
103             if (!read_que->empty()) {
104                 read_mc->cv.notify_one();
105             }
106         }
107         std::unique_lock<std::mutex> ulg_2(switch_mc->mt);
108         while (!read_que->empty()) {
109             switch_mc->cv.wait(ulg_2);
110         }
111     }
112 }
113 int main(){
114
115     Mutex_Condition mc_1;
116     Mutex_Condition mc_2;
117     Mutex_Condition mc_3;
118     std::queue<int> que_1;
119     std::queue<int> que_2;
120
121     Produce_1 produce_1(&que_1, &que_2, &mc_1, &mc_2);
122     Consume_1 consume_1(&que_1, &que_2, &mc_1, &mc_2,&mc_3);
123
124     std::thread trd(std::bind(&que_switch_trd, &que_1, &que_2, &mc_1, &mc_2,&mc_3));
125     produce_1.start();
126     consume_1.start();
127
128     produce_1.join();
129     consume_1.join();
130     trd.join();
131
132     return 0;
133 }

原文地址:https://www.cnblogs.com/wangshaowei/p/10744894.html

时间: 2024-07-29 14:25:40

双缓冲队列-减少生产者消费者锁的调用的相关文章

双缓冲队列来减少锁的竞争

双缓冲队列来减少锁的竞争 在日常的开发中,日志的记录是必不可少的.但是我们也清楚对同一个文本进行写日志只能单线程的去写,那么我们也经常会使用简单lock锁来保证只有一个线程来写入日志信息.但是在多线程的去写日志信息的时候,由于记录日志信息是需要进行I/O交互的,导致我们占用锁的时间会加长,从而导致大量线程的阻塞与等待. 这种场景下我们就会去思考,我们该怎么做才能保证当有多个线程来写日志的时候我们能够在不利用锁的情况下让他们依次排队去写呢?这个时候我们就可以考虑下使用双缓冲队列来完成. 所谓双缓冲

利用双缓冲队列来减少锁的竞争

在日常的开发中,日志的记录是必不可少的.但是我们也清楚对同一个文本进行写日志只能单线程的去写,那么我们也经常会使用简单lock锁来保证只有一个线程来写入日志信息.但是在多线程的去写日志信息的时候,由于记录日志信息是需要进行I/O交互的,导致我们占用锁的时间会加长,从而导致大量线程的阻塞与等待. 这种场景下我们就会去思考,我们该怎么做才能保证当有多个线程来写日志的时候我们能够在不利用锁的情况下让他们依次排队去写呢?这个时候我们就可以考虑下使用双缓冲队列来完成. 所谓双缓冲队列就是有两个队列,一个是

Python之路(第三十八篇) 并发编程:进程同步锁/互斥锁、信号量、事件、队列、生产者消费者模型

一.进程锁(同步锁/互斥锁) 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理. 例子 #并发运行,效率高,但竞争同一打印终端,带来了打印错乱 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('

多线程操作C++ STL vector出现概率coredump问题及尽量避免锁的双缓冲队列

多线程操作全局变量,必须考虑同步问题,否则可能出现数据不一致, 甚至触发coredump. 前段时间, 遇到一个多线程操作了全局的vector的问题,  程序崩了.场景是这样的:某全局配置参数保存在一个vector中,需要定时更新(更新线程), 另外的工作线程去读取配置. 这种场景是非常普遍的. 在该场景中,程序没有枷锁,概率coredump, 实际情况是,服务跑了一段时间后,必然coredump.   很显然, 更新线程执行clear,然后在push_back操作时, 会导致工作线程的vect

服务器应用--双缓冲队列

在服务器开发中 通常的做法是 把 逻辑处理线程和I/O处理线程分离. 逻辑处理线程:对接收的包进行逻辑处理. I/0处理线程:网络数据的发送和接收,连接的建立和维护. 通常 逻辑处理线程和I/O处理线程是通过数据队列来交换数据,就是生产者--消费者模型. 这个数据队列是多个线程在共享,每次访问都需要加锁,因此如何减少 互斥/同步的开销就显得尤为重要.                                                                         解

阻塞队列和生产者-消费者模式、DelayQueue

1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue,           (基于数组的并发阻塞队列) 6.LinkedBlockingQueue,        (基

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

队列、生产者消费者模型

目录 队列.生产者消费者模型.初识线程 一.用进程锁来优化抢票小程序 1.1 进程锁 1.2 优化抢票小程序 二.队列 2.1 队列的介绍 2.2 创建队列的类 2.3 使用队列的案例 三.生产者消费者模型 3.1 用队列Queue实现生产者消费者模型 3.2 用队列JoinableQueue实现生产者消费者模型 队列.生产者消费者模型.初识线程 一.用进程锁来优化抢票小程序 1.1 进程锁 进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端是没有问题的.而共享带来

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据,相对的,如果消费者处理数据的速度大于生产者,那么消费者就必须等待生产者. 为了解决这种问题,就有了生产者消费者模型. 生产者与消费者模型,是通过一个容器,来解决生产者和消费者之间的耦合性问题,生产者和消费者之间并不会直接通信,这样生产者就无需等待消费者处理完数据,生产者可以直接把数据扔给队列,这个