Boost lockfree deque 生产者与消费者多对多线程应用

  boost库中有一个boost::lockfree::queue类型的 队列,对于一般的需要队列的程序,其效率都算不错的了,下面使用一个用例来说明。

  程序是一个典型的生产者与消费者的关系,都可以使用多线程,其效率要比使用上层的互斥锁要快很多,因为它直接使用底层的原子操作来进行同步数据的。

  freedeque.h

  1 #pragma once#ifndef INCLUDED_UTILS_LFRINGQUEUE
  2 #define INCLUDED_UTILS_LFRINGQUEUE
  3
  4 #define _ENABLE_ATOMIC_ALIGNMENT_FIX
  5 #define ATOMIC_FLAG_INIT 0
  6
  7
  8 #pragma once
  9
 10
 11 #include <vector>
 12 #include <mutex>
 13 #include <thread>
 14 #include <atomic>
 15 #include <chrono>
 16 #include <cstring>
 17 #include <iostream>
 18
 19 // Lock free ring queue
 20
 21 template < typename _TyData, long _uiCount = 100000 >
 22 class lfringqueue
 23 {
 24 public:
 25     lfringqueue(long uiCount = _uiCount) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount(uiCount)
 26     {
 27         m_queue = new _TyData*[m_uiCount];
 28         memset(m_queue, 0, sizeof(_TyData*) * m_uiCount);
 29     }
 30
 31     ~lfringqueue()
 32     {
 33         if (m_queue)
 34             delete[] m_queue;
 35     }
 36
 37     bool enqueue(_TyData *pdata, unsigned int uiRetries = 1000)
 38     {
 39         if (NULL == pdata)
 40         {
 41             // Null enqueues are not allowed
 42             return false;
 43         }
 44
 45         unsigned int uiCurrRetries = 0;
 46         while (uiCurrRetries < uiRetries)
 47         {
 48             // Release fence in order to prevent memory reordering
 49             // of any read or write with following write
 50             std::atomic_thread_fence(std::memory_order_release);
 51
 52             long lHeadIterator = m_lHeadIterator;
 53
 54             if (NULL == m_queue[lHeadIterator])
 55             {
 56                 long lHeadIteratorOrig = lHeadIterator;
 57
 58                 ++lHeadIterator;
 59                 if (lHeadIterator >= m_uiCount)
 60                     lHeadIterator = 0;
 61
 62                 // Don‘t worry if this CAS fails.  It only means some thread else has
 63                 // already inserted an item and set it.
 64                 if (std::atomic_compare_exchange_strong(&m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator))
 65                 {
 66                     // void* are always atomic (you wont set a partial pointer).
 67                     m_queue[lHeadIteratorOrig] = pdata;
 68
 69                     if (m_lEventSet.test_and_set())
 70                     {
 71                         m_bHasItem.test_and_set();
 72                     }
 73                     return true;
 74                 }
 75             }
 76             else
 77             {
 78                 // The queue is full.  Spin a few times to check to see if an item is popped off.
 79                 ++uiCurrRetries;
 80             }
 81         }
 82         return false;
 83     }
 84
 85     bool dequeue(_TyData **ppdata)
 86     {
 87         if (!ppdata)
 88         {
 89             // Null dequeues are not allowed!
 90             return false;
 91         }
 92
 93         bool bDone = false;
 94         bool bCheckQueue = true;
 95
 96         while (!bDone)
 97         {
 98             // Acquire fence in order to prevent memory reordering
 99             // of any read or write with following read
100             std::atomic_thread_fence(std::memory_order_acquire);
101             //MemoryBarrier();
102             long lTailIterator = m_lTailIterator;
103             _TyData *pdata = m_queue[lTailIterator];
104             //volatile _TyData *pdata = m_queue[lTailIterator];
105             if (NULL != pdata)
106             {
107                 bCheckQueue = true;
108                 long lTailIteratorOrig = lTailIterator;
109
110                 ++lTailIterator;
111                 if (lTailIterator >= m_uiCount)
112                     lTailIterator = 0;
113
114                 //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig ))
115                 if (std::atomic_compare_exchange_strong(&m_lTailIterator, &lTailIteratorOrig, lTailIterator))
116                 {
117                     // Sets of sizeof(void*) are always atomic (you wont set a partial pointer).
118                     m_queue[lTailIteratorOrig] = NULL;
119
120                     // Gets of sizeof(void*) are always atomic (you wont get a partial pointer).
121                     *ppdata = (_TyData*)pdata;
122
123                     return true;
124                 }
125             }
126             else
127             {
128                 bDone = true;
129                 m_lEventSet.clear();
130             }
131         }
132         *ppdata = NULL;
133         return false;
134     }
135
136
137     long countguess() const
138     {
139         long lCount = trycount();
140
141         if (0 != lCount)
142             return lCount;
143
144         // If the queue is full then the item right before the tail item will be valid.  If it
145         // is empty then the item should be set to NULL.
146         long lLastInsert = m_lTailIterator - 1;
147         if (lLastInsert < 0)
148             lLastInsert = m_uiCount - 1;
149
150         _TyData *pdata = m_queue[lLastInsert];
151         if (pdata != NULL)
152             return m_uiCount;
153
154         return 0;
155     }
156
157     long getmaxsize() const
158     {
159         return m_uiCount;
160     }
161
162     bool HasItem()
163     {
164         return m_bHasItem.test_and_set();
165     }
166
167     void SetItemFlagBack()
168     {
169         m_bHasItem.clear();
170     }
171
172 private:
173     long trycount() const
174     {
175         long lHeadIterator = m_lHeadIterator;
176         long lTailIterator = m_lTailIterator;
177
178         if (lTailIterator > lHeadIterator)
179             return m_uiCount - lTailIterator + lHeadIterator;
180
181         // This has a bug where it returns 0 if the queue is full.
182         return lHeadIterator - lTailIterator;
183     }
184
185 private:
186     std::atomic<long> m_lHeadIterator;  // enqueue index
187     std::atomic<long> m_lTailIterator;  // dequeue index
188     _TyData **m_queue;                  // array of pointers to the data
189     long m_uiCount;                     // size of the array
190     std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT;       // a flag to use whether we should change the item flag
191     std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT;        // a flag to indicate whether there is an item enqueued
192 };
193
194 #endif //INCLUDED_UTILS_LFRINGQUEUE  

  

/*
* File:   main.cpp
* Author: Peng
*
* Created on February 22, 2014, 9:55 PM
*/
#include <iostream>
#include <string>
#include "freedeque.h"
#include <sstream>
#include <boost/thread/thread.hpp>
#include <boost/lockfree/queue.hpp>
#include <boost/atomic.hpp>
#include<boost/thread/lock_guard.hpp>
#include<boost/thread/mutex.hpp>
#include<boost/date_time/posix_time/posix_time.hpp>

const int NUM_ENQUEUE_THREAD = 5;
const int NUM_DEQUEUE_THREAD = 10;
const long NUM_ITEM = 50000;
const long NUM_DATA = NUM_ENQUEUE_THREAD * NUM_ITEM;

class Data {
public:
	Data(int i = 0) : m_iData(i)
	{
		std::stringstream ss;
		ss << i;
		m_szDataString = ss.str();
	}

	bool operator< (const Data & aData) const
	{
		if (m_iData < aData.m_iData)
			return true;
		else
			return false;
	}

	int& GetData()
	{
		return m_iData;
	}
private:
	int m_iData;
	std::string m_szDataString;
};

Data* g_arrData = new Data[NUM_DATA];
boost::mutex mtx;

constexpr long size = 0.5 * NUM_DATA;
lfringqueue < Data, 10000> LockFreeQueue;
boost::lockfree::queue<Data*> BoostQueue(10000);

bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue(int n)
{
	for (long i = 0; i < NUM_ITEM; i++)
	{
		int x = i + NUM_ITEM * n;
		Data* pData = g_arrData + x;
		LockFreeQueue.enqueue(pData);
	}
	return true;
}

void print(Data* pData) {
	if (!pData)
		return;

	boost::lock_guard<boost::mutex> lock(mtx);

	std::cout << pData->GetData() << std::endl;

}

bool Dequeue()
{
	Data *pData = NULL;

	while (true)
	{
		if (LockFreeQueue.dequeue(&pData) && pData)
		{
			print(pData);
		}
		else {
			boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(5));
		}
	}

	return true;
}

int main(int argc, char** argv)
{
	for (int i = 0; i < NUM_DATA; ++i)
	{
		Data data(i);
		//DataArray[i] = data;
		*(g_arrData + i) = data;
	}

	std::thread PublishThread[NUM_ENQUEUE_THREAD];
	std::thread ConsumerThread[NUM_DEQUEUE_THREAD];
	std::chrono::duration<double> elapsed_seconds;

	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
	{
		PublishThread[i] = std::thread(GenerateRandomNumber_FindPointerToTheNumber_EnQueue, i);
	}

	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
	{
		ConsumerThread[i] = std::thread{ Dequeue };
	}

	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
	{
		ConsumerThread[i].join();
	}

	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
	{
		PublishThread[i].join();
	}

	delete[] g_arrData;
	return 0;
}

  说明:模板文件是原作者写的,为了验证其正确性,后面的测试程序我改写了一下,最后测试程序是无法退出来的,这里只是测试,没有进一步完善了。

  在测试中发现deque应该是大小限制的,再增大data的数据程序会阻塞在某个地方没有进一步再查找原因了,以后有时候再做修改,对于一般的工程都够用了。

原文地址:https://www.cnblogs.com/hbright/p/9508032.html

时间: 2024-11-02 11:58:42

Boost lockfree deque 生产者与消费者多对多线程应用的相关文章

java-Runnable加锁实现生产者和消费者的多线程问题

案例: 有一家商品售卖机构,只有一名生产者,两名消费者,请采用多线程的方式对这个案例进行实现. //库存函数,保存着库存的信息Storage.java public class Storage { //模拟库存 public Integer num=1; } //生产者函数 product.java /************************************************************    Copyright (C), 1988-1999, Huawei T

练习生产者与消费者-PYTHON多线程中的条件变量同步

以前练习过,但好久不用,手生,概念也生了, 重温一下.. URL: http://www.cnblogs.com/holbrook/tag/%E5%A4%9A%E7%BA%BF%E7%A8%8B/ ~~~~~~~ 互斥锁是最简单的线程同步机制,Python提供的Condition对象提供了对复杂线程同步问题的支持.Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法.线程首先acquire一个条件变量,然后判断一些条件

boost::lockfree::queue多线程读写实例

最近的任务是写一个多线程的东西,就得接触多线程队列了,我反正是没学过分布式的,代码全凭感觉写出来的,不过运气好,代码能够work= = 话不多说,直接给代码吧,一个多消费者,多生产者的模式.假设我的任务是求队列的中位数是啥,每消费10000次的时候,我要知道中位数是什么. 至于加不加锁,这个看你了,我反正是加了,代码里面没写--我反正是把写的代码单独封装了一个函数,然后加了个锁 欢迎交流,这个代码已经在实际任务上面上线了,希望不会有bug. 用的是boost::lockfree::queue,官

多线程操作实例——生产者与消费者

面对多线程学习生产者与消费者是最基本的实例 对于java后端开发的人员必须要掌握,还有考研考试计算机操作系统的同鞋. 下面是三个实例对于生产者与消费者的的例子,层层递进,逐步解决问题. 问题:生产者——设置信息名字name,和内容content 消费者——负责取出设置的信息. 一.基本实现 由于线程的不确定性可能出现以下问题: (1)消费者取出的信息不匹配,即不是由同一个生产者设置的信息 (2)生产者生产了多个信息,消费者才开始取出信息,或消费者取出的重复的信息. 上面的问题下面会逐一解决,下面

多线程生产者、消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者.消费者模式是多线程中的经典问题.通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节. 对于比较常见的单生产者.多消费者的情况,主要有以下两种策略: 通过volatile boolean producerDone =false 来标示是否完成.生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出. 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑. 比较经典的"毒丸"策略,生产者结束后,把一个特别的对象:"毒丸&quo

多线程-线程间通信-多生产者多消费者示例

1.多线程-线程间通信-多生产者多消费者问题 多生产者和多消费者.等待唤醒机制. 产生了两个问题: 1.出现了多次连续生产,未消费,或者一个商品被消费多次. 解决:必须要--------每一个被唤醒的线程判断一次标记,所以将if判断改为while判断. 2.出现了死锁. 本方唤醒了本方,导致了所有的线程都等待了. 解决方式就是:唤醒所有等待的线程.这样既唤醒了本方也唤醒对方. 虽然解决了多生产消费的问题,但是有些低效. 解决方法一: 唤醒所有等待的线程 class Resource{     p

java多线程中的生产者与消费者之等待唤醒机制@Version1.0

一.生产者消费者模式的学生类成员变量生产与消费demo,第一版1.等待唤醒:    Object类中提供了三个方法:    wait():等待    notify():唤醒单个线程    notifyAll():唤醒所有线程2.为什么这些方法不定义在Thread类中呢?  这些方法的调用必须通过锁对象调用,而我们刚才使用的锁对象是任意锁对象.  所以,这些方法必须定义在Object类中.3.当我们在使用多线程的时候有的时候需要,一条线程产生一个数据,另一条线程接着消费一个数据,一边生产一边消费,

Linux多线程机制(生产者和消费者实例 )

    使用多线程的理由之一是和进程相比,它是一种非常"节俭"的多任务操作方式.我们知道,在Linux系统下,启动一个新的进程必须分配给它独立 的地址空间,建立众多的数据表来维护它的代码段.堆栈段和数据段,这是一种"昂贵"的多任务工作方式.而运行于一个进程中的多个线程,它们彼此之间使用相 同的地址空间,共享大部分数据,启动一个线程所花费的空间远远小于启动一个进程所花费的空间,而且,线程间彼此切换所需的时间也远远小于进程间切换所需要 的时间. 使用多线程的理由之二是线

JAVA基础再回首(二十五)——Lock锁的使用、死锁问题、多线程生产者和消费者、线程池、匿名内部类使用多线程、定时器、面试题

JAVA基础再回首(二十五)--Lock锁的使用.死锁问题.多线程生产者和消费者.线程池.匿名内部类使用多线程.定时器.面试题 版权声明:转载必须注明本文转自程序员杜鹏程的博客:http://blog.csdn.net/m366917 我们来继续学习多线程 Lock锁的使用 虽然我们可以理解同步代码块和同步方法的锁对象问题,但是我们并没有直接看到在哪里加上了锁,在哪里释放了锁,为了更清晰的表达如何加锁和释放锁,JDK5以后提供了一个新的锁对象Lock Lock void lock():获取锁 v