#ifndef MUTEXLOCKER_H_INCLUDED
#define MUTEXLOCKER_H_INCLUDED

#include <pthread.h>

#include <cstdio>
#include <stdexcept>

/**
 * Scoped lock guard for a pthread mutex.
 *
 * The constructor locks the given mutex and the destructor unlocks it, so the
 * mutex is released on every exit path of the enclosing scope (including
 * early returns and exceptions).
 *
 * The guard does not own the mutex: the caller must keep the mutex alive for
 * at least as long as the guard exists.
 */
class MutexLocker {
public:
    /**
     * Locks @p mutex.
     *
     * @param mutex  Mutex to lock; must be initialized and must not be null.
     * @throws std::logic_error if @p mutex is null or pthread_mutex_lock()
     *         reports a failure.
     */
    explicit MutexLocker(pthread_mutex_t *mutex) : m_mutex(mutex) {
        // Passing a null pointer to pthread_mutex_lock() is undefined, so
        // reject it up front with the same exception type callers already see.
        if (!m_mutex) {
            throw std::logic_error("Could not lock mutex: null mutex pointer");
        }

        const int ret = pthread_mutex_lock(m_mutex);
        if (ret != 0) {
            std::printf("Lock mutex failed");
            throw std::logic_error("Could not lock mutex");
        }
    }

    /**
     * Unlocks the mutex locked by the constructor.
     *
     * The return value of pthread_mutex_unlock() is intentionally ignored:
     * a destructor must not throw.
     */
    virtual ~MutexLocker() {
        pthread_mutex_unlock(m_mutex);
    }

private:
    // Non-copyable: a copy would unlock the same mutex a second time when it
    // is destroyed. Declared but intentionally left undefined.
    MutexLocker(const MutexLocker &);
    MutexLocker &operator=(const MutexLocker &);

    pthread_mutex_t *m_mutex;  // Borrowed, never null once constructed.
};

#endif // MUTEXLOCKER_H_INCLUDED
#ifndef SAFEQUEUE_H_INCLUDED #define SAFEQUEUE_H_INCLUDED #include "MutexLocker.h" #include <pthread.h> #include <list> template <class T> class SafeQueue{ public: SafeQueue(int size = 0){ m_capacity = capacity; m_total_enqueue_count = 0; pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&m_lock, &attr); pthread_mutexattr_destroy(&attr); } ~SafeQueue() { int frame_count = m_list.size(); for(int i = 0; i < frame_count; i++) { m_list.pop_front(); } pthread_mutex_destroy(&m_lock); } void SetCapacity(int capacity) { m_capacity = capacity; } // return 0 if succeed; -1 if the queue is full int Enqueue(T node) { CTMutexLocker locker(&m_lock); //pthread_mutex_lock(&m_lock); if (m_capacity > 0 && Size() >= m_capacity){ //pthread_mutex_unlock(&m_lock); return -1; // overflow } m_list.push_back(node); m_total_enqueue_count++; //pthread_mutex_unlock(&m_lock); return 0; } // dequeue a item, and save the result to the @item pointer. 
// return 0 if succeed, -1 if the queue is empty; int Dequeue(T *item, int reserve_len = 0) { CTMutexLocker locker(&m_lock); //*item = NULL; memset(item, 0, sizeof(T)); int total_count = m_list.size(); if(total_count == 0 || total_count < reserve_len){ return -1; } *item = m_list.front(); m_list.pop_front(); //pthread_mutex_unlock(&m_lock); return 0; } int DequeueAll(std::list<T> *out_queue, int reserve_len = 0) { CTMutexLocker locker(&m_lock); if(m_list.size() <= reserve_len){ return 0; } T item; int remove_size = m_list.size() - reserve_len; while(remove_size > 0){ item = m_list.front(); m_list.pop_front(); if (out_queue){ out_queue->push_back(item); } remove_size--; } return 0; } int Front(T *item) { CTMutexLocker locker(&m_lock); if(m_list.size() == 0){ return -1; } *item = m_list.front(); return 0; } int Back(T *item) { CTMutexLocker locker(&m_lock); if(Size() == 0){ return -1; } *item = m_list.back(); return 0; } int Size() { CTMutexLocker locker(&m_lock); return m_list.size(); } int Lock() { return pthread_mutex_lock(&m_lock); } int Unlock() { return pthread_mutex_unlock(&m_lock); } int RePushFront(T node) { CTMutexLocker locker(&m_lock); if (m_capacity > 0 && Size() >= m_capacity) return -1; // overflow m_list.push_front(node); return 0; } int PopBack() { CTMutexLocker locker(&m_lock); m_list.pop_back(); return 0; } private: std::list<T> m_list; int m_capacity; pthread_mutex_t m_lock; uint64_t m_total_enqueue_count; }; #endif // SAFEQUEUE_H_INCLUDED
// Time: 2024-12-18 00:25:46