C/C++ 线程同步安全队列简单实现例子

#ifndef MUTEXLOCKER_H_INCLUDED
#define MUTEXLOCKER_H_INCLUDED

#include <pthread.h>
#include <stdexcept>

class MutexLocker{
public:
    explicit MutexLocker(pthread_mutex_t *mutex):m_mutex(mutex){
        int ret = pthread_mutex_lock(m_mutex);
        if(ret != 0){
            printf("Lock mutex failed");
            throw std::logic_error("Could not lock mutex");
        }
    }
    virtual ~MutexLocker(){
        pthread_mutex_unlock(m_mutex);
    }
private:
    pthread_mutex_t *m_mutex;
};

#endif // MUTEXLOCKER_H_INCLUDED
#ifndef SAFEQUEUE_H_INCLUDED
#define SAFEQUEUE_H_INCLUDED

#include "MutexLocker.h"
#include <pthread.h>
#include <list>

template <class T>

class SafeQueue{
public:
    SafeQueue(int size = 0){
        m_capacity = capacity;
        m_total_enqueue_count = 0;

        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
        pthread_mutex_init(&m_lock, &attr);
        pthread_mutexattr_destroy(&attr);
    }
    ~SafeQueue()
    {
        int frame_count = m_list.size();
        for(int i = 0; i < frame_count; i++) {
            m_list.pop_front();
        }
        pthread_mutex_destroy(&m_lock);
    }
    void SetCapacity(int capacity)
    {
        m_capacity = capacity;
    }

    // return 0 if succeed; -1 if the queue is full
    int Enqueue(T node)
    {
        CTMutexLocker locker(&m_lock);
        //pthread_mutex_lock(&m_lock);
        if (m_capacity > 0 && Size() >= m_capacity){
            //pthread_mutex_unlock(&m_lock);
            return -1; // overflow
        }
        m_list.push_back(node);
        m_total_enqueue_count++;
        //pthread_mutex_unlock(&m_lock);
        return 0;
    }

    // dequeue a item, and save the result to the @item pointer.
    // return 0 if succeed, -1 if the queue is empty;
    int Dequeue(T *item, int reserve_len = 0)
    {
        CTMutexLocker locker(&m_lock);
        //*item = NULL;
        memset(item, 0, sizeof(T));
        int total_count = m_list.size();
        if(total_count == 0 || total_count < reserve_len){
            return -1;
        }

        *item = m_list.front();
        m_list.pop_front();
        //pthread_mutex_unlock(&m_lock);
        return 0;
    }

    int DequeueAll(std::list<T> *out_queue, int reserve_len = 0)
    {
        CTMutexLocker locker(&m_lock);
        if(m_list.size() <= reserve_len){
            return 0;
        }
        T item;
        int remove_size = m_list.size() - reserve_len;
        while(remove_size > 0){
            item  = m_list.front();
            m_list.pop_front();
            if (out_queue){
               out_queue->push_back(item);
            }
            remove_size--;
        }

        return 0;
    }

    int Front(T *item)
    {
        CTMutexLocker locker(&m_lock);
        if(m_list.size() == 0){
            return -1;
        }

        *item = m_list.front();
        return 0;
    }

    int Back(T *item)
    {
        CTMutexLocker locker(&m_lock);
        if(Size() == 0){
            return -1;
        }

        *item = m_list.back();
        return 0;
    }

    int Size()
    {
        CTMutexLocker locker(&m_lock);
        return m_list.size();
    }
    int Lock()
    {
        return pthread_mutex_lock(&m_lock);
    }

    int Unlock()
    {
        return pthread_mutex_unlock(&m_lock);
    }

    int RePushFront(T node)
    {
        CTMutexLocker locker(&m_lock);

        if (m_capacity > 0 && Size() >= m_capacity)
            return -1; // overflow

        m_list.push_front(node);
        return 0;
    }

    int PopBack()
    {
        CTMutexLocker locker(&m_lock);
        m_list.pop_back();
        return 0;
    }

private:
    std::list<T>    m_list;
    int             m_capacity;
    pthread_mutex_t m_lock;
    uint64_t        m_total_enqueue_count;
};

#endif // SAFEQUEUE_H_INCLUDED
时间: 2024-12-18 00:25:46

C/C++ 线程同步安全队列简单实现例子的相关文章

linux线程同步(队列方式)

看了一些关于信号量的线程同步方式,今天用了一下. 我对于线程同步一直有疑问,在主线程和子线程处理时间不相同的时候,用这种信号量,如何保证同步. 假如主线程比较快,信号量连加了n个,但是子线程就不断减这个n,减到0.但是如果主线程太快太快,需要停一停,比如缓冲区快溢出了,主线程需要挂起. 由什么来唤醒主线程呢?子线程?不过这样的话,容易造成主线程死锁,或者主和子都卡死. 下面的程序,没有用到信号量同步,信号量只是负责开启子线程而已.主要是队列的实现而已.等我把上面的问题解决完会写上更新的程序. 队

3、传统线程同步与通信--生产消费例子

核心点: 1.锁对象必须是同一个. 2.wait()和notify()方法必须是调用锁对象的方法,而非this(线程)的. 3.在多生产多消费的时候注意使用notifyAll而不是notifyAll,否则会造成死锁 测试代码: 1 import java.util.LinkedList; 2 import java.util.Queue; 3 import java.util.Random; 4 5 /** 6 * 多个生产 - 消费 线程同步通信 7 * 核心点: 8 * 1.锁对象必须是同一

【编写高质量代码C#】建议72:在线程同步中使用信号量

1.使用信号机制提供线程同步的一个简单例子 AutoResetEvent autoResetEvent = new AutoResetEvent(false); private void button1_Click(object sender, EventArgs e) { Control.CheckForIllegalCrossThreadCalls = false; Thread tWork = new Thread(() => { label1.Text = "线程启动...&quo

C#中的几个线程同步对象方法

在编写多线程程序时无可避免会遇到线程的同步问题.什么是线程的同步呢? 举个例子:如果在一个公司里面有一个变量记录某人T的工资count=100,有两个主管A和B(即工作线程)在早一些时候拿了这个变量的值回去 ,过了一段时间A主管将T的工资加了5块,并存回count变量,而B主管将T的工资减去3块,并存回count变量.好了,本来T君可以得到102块的工资的,现在就变成98块了.这就是线程同步要解决的问题. 在.Net的某些对象里面,在读取里面的数据的同时还可以修改数据,这类的对象就是“线程安全”

C#基础:线程同步

一.前言 我们先来看下面一个例子: using System; using System.Threading; namespace ThreadSynchDemo { class Program { private static int Counter = 0; static void Main(string[] args) { Thread t1 = new Thread(() => { for (int i = 0; i < 1000; i++) { Counter++; Thread.S

一起talk C栗子吧(第一百一十六回:C语言实例--线程同步之互斥量二)

各位看官们,大家好,上一回中咱们说的是线程同步之信号量的例子,这一回咱们继续说该例子.闲话休提,言归正转.让我们一起talk C栗子吧! 我们在上一回中详细介绍了互斥量相关函数的用法,这一回中,我们介绍如何使用这些函数来操作互斥量. 下面是详细的操作步骤: 1.定义一个互斥量A,用来同步线程: 2.在创建线程的进程中使用pthread_mutex_init函数初始化互斥量,互斥量的属性使用默认值: 3.在读取数据的线程中读取数据,首先使用pthread_mutex_lock函数对互斥量A进行加锁

利用JAVA线程安全队列简单实现读者写者问题。

常见的操作系统教科书中,会使用互斥锁来实现读者线程和写者线程的同步问题,但是在JDK5推出线程安全队列之后,将该问题变得异常简单. java.util.concurrent.ConcurrentLinkedQueue 是线程安全的非阻塞队列,其实很容易想到,非阻塞队列当线程需要等待的时候,则不会阻塞等待,而是直接根据情况返回. java.util.concurrent.LinkedBlockingQueue 是线程安全的阻塞队列,该队列能够在很多情况下对线程进行阻塞,比如队列为空时调用take(

Java多线程之简单的线程同步实例

数据类: package Thread.MyCommon; public class Data { public int num = 0; public synchronized int getEven() { ++num; Thread.yield();//让另外线程先执行,加大测试效果几率 ++num; return num; } } 线程类: package Thread.MyCommon; public class myThread implements Runnable { priva

一个简单例子了解使用互斥量线程同步

在刚开始学习学习线程同步时总是认为两个线程或是多个线程共同运行,但是那样是做的. 同步就是协同步调,按预定的先后次序进行运行.如:你说完,我再说. "同"字从字面上容易理解为一起动作. 其实不是,"同"字应是指协同.协助.互相配合. 如进程.线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行:B依言执行,再将结果给A:A再继续操作. 所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回,同时其它