基础的生产者消费者模型,生产者向公共缓存区写入数据,消费者从公共缓存区读取数据进行处理,两个线程访问公共资源,加锁实现数据的一致性。
通过加锁来实现
1 class Produce_1 { 2 public: 3 Produce_1(std::queue<int> * que_, std::mutex * mt_) { 4 m_mt = mt_; 5 m_que = que_; 6 m_stop = false; 7 } 8 void runProduce() { 9 while (!m_stop) { 10 std::this_thread::sleep_for(std::chrono::seconds(1)); 11 std::lock_guard<std::mutex> lgd(*m_mt); 12 m_que->push(1); 13 std::cout << "Produce_1 produce 1" << std::endl; 14 } 15 } 16 void join() { 17 m_trd->join(); 18 m_trd.reset(); 19 } 20 void start() { 21 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this))); 22 } 23 void stop() { 24 m_stop = true; 25 } 26 private: 27 std::mutex * m_mt; 28 std::queue<int> * m_que; 29 volatile bool m_stop; 30 std::shared_ptr<std::thread> m_trd; 31 }; 32 33 34 /* 35 *单缓冲一个同步队列 效率较低 36 */ 37 class Consume_1 { 38 public: 39 Consume_1(std::queue<int> * que_, std::mutex * mt_) { 40 m_mt = mt_; 41 m_que = que_; 42 m_stop = false; 43 } 44 45 void runConsume() { 46 while (!m_stop) { 47 std::this_thread::sleep_for(std::chrono::seconds(1)); 48 std::lock_guard<std::mutex> lgd(*m_mt); 49 if (!m_que->empty()) { 50 m_que->pop(); 51 } 52 std::cout << "Consume_1 consume" << std::endl; 53 } 54 } 55 void join() { 56 m_trd->join(); 57 m_trd.reset(); 58 } 59 void start() { 60 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this))); 61 } 62 void stop() { 63 m_stop = true; 64 } 65 private: 66 std::mutex * m_mt; 67 std::queue<int> * m_que; 68 volatile bool m_stop; 69 std::shared_ptr<std::thread> m_trd; 70 };
通过条件变量来实现
1 typedef struct Mutex_Condition{ 2 std::mutex mt; 3 std::condition_variable cv; 4 }Mutex_Condition; 5 6 class Produce { 7 public: 8 Produce(std::queue<int> * que_, Mutex_Condition * mc_) { 9 m_que = que_; 10 m_mc = mc_; 11 m_stop = false; 12 } 13 void join() { 14 m_trd->join(); 15 m_trd.reset(); 16 } 17 void produce(int enter) { 18 std::lock_guard<std::mutex> lgd(m_mc->mt); 19 m_que->push(enter); 20 m_mc->cv.notify_one(); 21 } 22 23 void runProduce() { 24 while (!m_stop) { 25 std::this_thread::sleep_for(std::chrono::seconds(1)); 26 produce(1); 27 std::cout << "Produce Thread produce 1 " << std::endl; 28 } 29 } 30 31 void start() { 32 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce::runProduce), this))); 33 } 34 void stop() { 35 m_stop = true; 36 } 37 38 private: 39 std::queue<int> * m_que; 40 Mutex_Condition * m_mc; 41 std::shared_ptr<std::thread> m_trd; 42 volatile bool m_stop; 43 }; 44 45 46 class Consume { 47 public: 48 Consume(std::queue<int> * que_, Mutex_Condition * mc_) { 49 m_que = que_; 50 m_mc = mc_; 51 m_stop = false; 52 } 53 void join() { 54 m_trd->join(); 55 m_trd.reset(); 56 } 57 void consume() { 58 std::unique_lock<std::mutex> lgd(m_mc->mt); 59 while (m_que->empty()) { 60 int i = 0; 61 m_mc->cv.wait(lgd); 62 } 63 m_que->pop(); 64 std::cout << "Consume Thread consume " << std::endl; 65 } 66 void runConsume() { 67 while (!m_stop) { 68 std::this_thread::sleep_for(std::chrono::seconds(1)); 69 consume(); 70 } 71 } 72 void start() { 73 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume::runConsume), this))); 74 } 75 void stop() { 76 m_stop = true; 77 } 78 79 private: 80 std::queue<int> * m_que; 81 Mutex_Condition * m_mc; 82 std::shared_ptr<std::thread> m_trd; 83 volatile bool m_stop; 84 85 };
二、生产者消费者-双缓冲
一个公共缓存区,由于多线程访问的锁冲突较大,可以采取双缓冲手段来解决锁的冲突
双缓冲的关键:双缓冲队列的数据交换
1)生产者线程不断的向生产者队列A写入数据,当队列中有数据时,进行数据的交换,交换开始启动时通过条件变量通知交换线程来处理最先的数据交换。
2)数据交换完成后,通过条件变量通知消费者处理数据,此时交换线程阻塞到消费者数据处理完成时通知的条件变量上。
3)消费者收到数据交换后的通知后,进行数据的处理,数据处理完成后,通知交换线程进行下一轮的双缓冲区的数据交换。
要点:
生产者除了在数据交换时,其余时刻都在不停的生产数据。
数据交换队列需要等待消费者处理数据完成的通知,以进行下一轮交换。
消费者处理数据时,不进行数据交换,生产者同时会不断的生产数据,消费者需要等待数据交换完成的通知,并且发送消费完成的通知给交换线程
1 typedef struct Mutex_Condition{ 2 std::mutex mt; 3 std::condition_variable cv; 4 }Mutex_Condition; 5 6 class Produce_1 { 7 public: 8 Produce_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1 , Mutex_Condition * mc_2) { 9 m_read_que = que_1; 10 m_writer_que = que_2; 11 m_read_mc = mc_1; 12 m_writer_mc = mc_2; 13 m_stop = false; 14 15 } 16 void runProduce() { 17 while (!m_stop) { 18 std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000)); 19 std::lock_guard<std::mutex> lgd(m_writer_mc->mt); 20 m_writer_que->push(1); 21 m_writer_mc->cv.notify_one(); 22 std::cout << "m_writer push" << std::endl; 23 } 24 25 } 26 void join() { 27 m_trd->join(); 28 m_trd.reset(); 29 } 30 void start() { 31 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Produce_1::runProduce), this))); 32 } 33 void stop() { 34 m_stop = true; 35 } 36 private: 37 Mutex_Condition * m_read_mc; 38 Mutex_Condition * m_writer_mc; 39 std::queue<int> * m_read_que; 40 std::queue<int> * m_writer_que; 41 volatile bool m_stop; 42 std::shared_ptr<std::thread> m_trd; 43 }; 44 45 46 class Consume_1 { 47 public: 48 Consume_1(std::queue<int> * que_1, std::queue<int> * que_2, Mutex_Condition * mc_1,Mutex_Condition * mc_2,Mutex_Condition * switch_mc) { 49 m_read_que = que_1; 50 m_writer_que = que_2; 51 m_read_mc = mc_1; 52 m_writer_mc = mc_2; 53 m_stop = false; 54 m_switch_mc = switch_mc; 55 } 56 57 void runConsume() { 58 while (!m_stop) { 59 while (true) { 60 std::this_thread::sleep_for(std::chrono::microseconds(20 * 1000)); 61 std::unique_lock<std::mutex> ulg(m_read_mc->mt); 62 while (m_read_que->empty()) { 63 m_read_mc->cv.wait(ulg); 64 } 65 //deal data 66 //std::lock_guard<std::mutex> ulg(m_read_mc->mt); 67 while (!m_read_que->empty()) { 68 m_read_que->pop(); 69 std::cout << "m_read_queue pop" << std::endl; 70 } 71 m_switch_mc->cv.notify_one(); 72 } 73 } 74 } 75 void join() { 76 m_trd->join(); 77 m_trd.reset(); 78 } 79 void start() { 80 m_trd.reset(new std::thread(std::bind(std::mem_fun(&Consume_1::runConsume), this))); 81 } 82 void stop() { 83 m_stop = true; 84 } 85 private: 86 Mutex_Condition * m_read_mc; 87 Mutex_Condition * m_writer_mc; 88 Mutex_Condition * m_switch_mc; 89 std::queue<int> * m_read_que; 90 std::queue<int> * m_writer_que; 91 volatile bool m_stop; 92 std::shared_ptr<std::thread> m_trd; 93 }; 94 void que_switch_trd(std::queue<int> * read_que, std::queue<int> * writer_que, Mutex_Condition * read_mc, Mutex_Condition * writer_mc,Mutex_Condition * switch_mc) { 95 while (true) { 96 std::this_thread::sleep_for(std::chrono::microseconds(20*1000)); 97 { 98 std::unique_lock<std::mutex> ulg(writer_mc->mt); 99 while (writer_que->empty()) { 100 writer_mc->cv.wait(ulg); 101 } 102 std::lock_guard<std::mutex> ulg_2(read_mc->mt); 103 std::swap(*read_que, *writer_que); 104 std::cout << "switch queue" << std::endl; 105 if (!read_que->empty()) { 106 read_mc->cv.notify_one(); 107 } 108 } 109 std::unique_lock<std::mutex> ulg_2(switch_mc->mt); 110 while (!read_que->empty()) { 111 switch_mc->cv.wait(ulg_2); 112 } 113 } 114 } 115 int main(){ 116 117 Mutex_Condition mc_1; 118 Mutex_Condition mc_2; 119 Mutex_Condition mc_3; 120 std::queue<int> que_1; 121 std::queue<int> que_2; 122 123 Produce_1 produce_1(&que_1, &que_2, &mc_1, &mc_2); 124 Consume_1 consume_1(&que_1, &que_2, &mc_1, &mc_2,&mc_3); 125 126 std::thread trd(std::bind(&que_switch_trd, &que_1, &que_2, &mc_1, &mc_2,&mc_3)); 127 produce_1.start(); 128 consume_1.start(); 129 130 produce_1.join(); 131 consume_1.join(); 132 trd.join(); 133 134 return 0; 135 }
时间: 2024-11-29 09:16:55