c++ 阻塞队列

阻塞队列是后台开发中多线程异步架构的基本数据结构,像python, java 都提供线程安全的阻塞队列,c++ 可能需要自己实现一个模板。

从性能考虑,自己没有使用STL的queue作为基本数据结构,而是使用循环数组作为基本数据结构,性能应该比queue高,省去了动态内存分配和回收。

确定就是,队列大小不可动态扩展,当时实际开发中,可以通过压力测试,配置合适的队列大小。

/********************************************
function: thread safe blocking queue.
author: liuyi
date: 2014.11.13
version: 2.0
********************************************/

#ifndef BLOCK_QUEUE_H
#define BLOCK_QUEUE_H

#include <iostream>
#include <pthread.h>
#include <sys/time.h>
using namespace std;

template<class T>
class block_queue
{
	public:
		block_queue(int max_size = 1000)
		{
			if(max_size <= 0)
			{
				exit(-1);
			}

			m_max_size = max_size;
			m_array = new T[max_size];
			m_size = 0;
			m_front = -1;
			m_back = -1;

			m_mutex = new pthread_mutex_t;
			m_cond = new pthread_cond_t;
			pthread_mutex_init(m_mutex, NULL);
			pthread_cond_init(m_cond, NULL);
		}

		~block_queue()
		{
			pthread_mutex_lock(m_mutex);
			if(m_array != NULL)
				delete  m_array;
			pthread_mutex_unlock(m_mutex);

			pthread_mutex_destroy(m_mutex);
			pthread_cond_destroy(m_cond);

			delete m_mutex;
			delete m_cond;
		}

		bool full()const
		{
			pthread_mutex_lock(m_mutex);
			if(m_size >= m_max_size)
			{
				pthread_mutex_unlock(m_mutex);
				return true;
			}
			pthread_mutex_unlock(m_mutex);
			return false;
		}

		bool empty()const
		{
			pthread_mutex_lock(m_mutex);
			if(0 == m_size)
			{
				pthread_mutex_unlock(m_mutex);
				return true;
			}
			pthread_mutex_unlock(m_mutex);
			return false;
		}

		T front()const
		{
			T tmp;
			pthread_mutex_lock(m_mutex);
			tmp = m_array[m_front];
			pthread_mutex_unlock(m_mutex);
			return tmp;
		}

		T back()const
		{
			T tmp;
			pthread_mutex_lock(m_mutex);
			tmp = m_array[m_back];
			pthread_mutex_unlock(m_mutex);
			return tmp;
		}

		int size()const
		{
			int tmp = 0;
			pthread_mutex_lock(m_mutex);
			tmp = m_size;
			pthread_mutex_unlock(m_mutex);
			return tmp;
		}

		int max_size()const
		{
			int tmp = 0;
			pthread_mutex_lock(m_mutex);
			tmp = m_max_size;
			pthread_mutex_unlock(m_mutex);
			return tmp;
		}

		bool push(const T& item)
		{
			pthread_mutex_lock(m_mutex);
			if(m_size >= m_max_size)
			{
				pthread_cond_broadcast(m_cond);
				pthread_mutex_unlock(m_mutex);
				return false;
			}

			m_back = (m_back + 1) % m_max_size;
			m_array[m_back] = item;

			m_size++;
			pthread_cond_broadcast(m_cond);
			pthread_mutex_unlock(m_mutex);

			return true;
		}

		bool pop(T& item)
		{
			pthread_mutex_lock(m_mutex);
			while(m_size <= 0)
			{
				if(0 != pthread_cond_wait(m_cond, m_mutex))
				{
					pthread_mutex_unlock(m_mutex);
					return false;
				}
			}

			m_front = (m_front + 1) % m_max_size;
			item = m_array[m_front];
			m_size--;
			pthread_mutex_unlock(m_mutex);
			return true;
		}

		bool pop(T& item, int ms_timeout)
		{
			struct timespec t = {0,0};
			struct timeval now = {0,0};
			gettimeofday(&now, NULL);
			pthread_mutex_lock(m_mutex);
			if(m_size <= 0)
			{
				t.tv_sec = now.tv_sec + ms_timeout/1000;
				t.tv_nsec = (ms_timeout % 1000)*1000;
				if(0 != pthread_cond_timedwait(m_cond, m_mutex, &t))
				{
					pthread_mutex_unlock(m_mutex);
					return false;
				}
			}

			m_front = (m_front + 1) % m_max_size;
			item = m_array[m_front];
			m_size--;
			pthread_mutex_unlock(m_mutex);
			return true;
		}

	private:
		pthread_mutex_t *m_mutex;
		pthread_cond_t *m_cond;
		T *m_array;
		int m_size;
		int m_max_size;
		int m_front;
		int m_back;
};

#endif

//测试程序

#include<iostream>
#include"block_queue.h"
using namespace std;

block_queue<int> g_queue(100);
void *p(void *args)
{
	sleep(1);
	int data = 0;
	for(int i = 0; i < 100; i++)
	{
		g_queue.push(data++);
	}
	return NULL;
}

void *c(void* args)
{
	while(true)
	{
		int t = 0;
		if(!g_queue.pop(t,1000))
		{
			cout<<"timeout"<<endl;
			continue;
		}
		else
		{
			cout<<t<<endl;
		}
		g_queue.pop(t);
		cout<<"block="<<t<<endl;

	}

	return NULL;
}

int main()
{
	pthread_t id;
	pthread_create(&id, NULL, p, NULL);
	//pthread_create(&id, NULL, p, NULL);
	//pthread_create(&id, NULL, c, NULL);
	pthread_create(&id, NULL, c, NULL);
	for(;;)sleep(1);
	return 0;
}
时间: 2024-11-05 10:38:42

c++ 阻塞队列的相关文章

caffe数据读取的双阻塞队列说明

caffe的datareader类中 class QueuePair { public: explicit QueuePair(int size); ~QueuePair(); BlockingQueue<T*> free_; BlockingQueue<T*> full_; DISABLE_COPY_AND_ASSIGN(QueuePair); }; 这个就是双阻塞队列,先将free队列填充到最大长度,然后按照如下规则: 1,每当生产者push时,先将full队列pop,如果fu

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

BlockingQueue(阻塞队列)详解

一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 二. 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由

Java多线程-新特征-阻塞队列ArrayBlockingQueue

阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度的队列,如果队列满了,添加新元素的操作会被阻塞等待,直到有空位为止.同样,当队列为空时候,请求队列元素的操作同样会阻塞等待,直到有可用元素为止. 有了这样的功能,就为多线程的排队等候的模型实现开辟了便捷通道,非常有用. java.util.concurrent.BlockingQueue继承了java.util.Queue接口,可

spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue

一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段: private ThreadPoolExecutor threadPoolExecutor; 可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现, 直接看代码: @Override protected ExecutorService initializeExe

深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

关联文章: 深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) 深入理解Java注解类型(@Annotation) 深入理解Java类加载器(ClassLoader) 深入理解Java并发之synchronized实现原理 Java并发编程-无锁CAS与Unsafe类及其并发包Atomic 深入理解Java内存模型(JMM)及volatile关键字 剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理 剖析基于并发AQS的共

Java里的阻塞队列

JDK7提供了7个阻塞队列,如下: ArrayBlockingQueue  : 一个数组结构组成的有界阻塞队列. LinkedBlockingQueue : 一个由链表结构组成的有界阻塞队列 . PriorityBlockingQueue : 一个支持优先级排序的无界阻塞队列 . DelayQueue : 一个使用优先级队列实现的无界阻塞队列 . SynchronousQueue : 一个不存储元素的阻塞队列 . LinkedTransferQueue : 一个由链表结构组成的无界阻塞队列 .

9.并发包非阻塞队列ConcurrentLinkedQueue

jdk1.7.0_79  队列是一种非常常用的数据结构,一进一出,先进先出. 在Java并发包中提供了两种类型的队列,非阻塞队列与阻塞队列,当然它们都是线程安全的,无需担心在多线程并发环境所带来的不可预知的问题.为什么会有非阻塞和阻塞之分呢?这里的非阻塞与阻塞在于有界与否,也就是在初始化时有没有给它一个默认的容量大小,对于阻塞有界队列来讲,如果队列满了的话,则任何线程都会阻塞不能进行入队操作,反之队列为空的话,则任何线程都不能进行出队操作.而对于非阻塞无界队列来讲则不会出现队列满或者队列空的情况

LinkedBlockingQueue(lbq)阻塞队列

最近开发中,经常使用这个类LinkedBlockingQueue,它是BlockingQueue这个子类. 并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列.该类主要提供了两个方法put(),offer()和take(),poll(),前者将一个对象放 到队列尾部,如果队列已经满了,就等待直到有空闲节点:后者从head取一个对象,如果没有对象,就等待直到有可取的对象. 反正都是开发中常用的.记哈

Michael-Scott非阻塞队列(lock-free)算法的C实现

Michael-Scott非阻塞队列算法,即MS-queue算法,是1 9 9 6 年由Maged . M .Michael and M. L. Scott提出的,是最为经典的并发FIFO队列上的算法,目前很多对并发FIFO队列的研究都是基于这个算法来加以改进的.在共享内存的多核处理器上,这种基于Compare-and-swap(CAS)的算法在性能上要远远优于以前基于锁的算法,并且已经被Java并发包所采用.它的主要特点在于允许多线程并发的.无干扰的访问队列的头和尾. MS-queue算法依赖