单生产者/单消费者 的 FIFO 无锁队列

??发现 zeromq 的 yqueue_t 模板类,其数据存储理念设计得非常妙。借这一理念,按照 STL 的泛型类 queue 的接口标准,我设计了一个线程安全的 单生产者/单消费者(单线程push/单线程pop) FIFO 队列,以此满足更为广泛的应用。

1. 数据存储理念的结构图

  • 队列的整体结构上,使用链表的方式,将多个固定长度的 chunk 串联起来;
  • 每个 chunk 则可用于存储队列所需要的元素;
  • 增加一个可交换的 chunk 单元,利于内存复用;
  • 队列使用时,支持 单个线程的 push(生产) 和 单个线程 pop(消费)的并发操作(内部并未加锁)。

2. 源码 (xspsc_queue.h)

/**
 * @file    xspsc_queue.h
 * <pre>
 * Copyright (c) 2018, Gaaagaa All rights reserved.
 *
 * 文件名称:xspsc_queue.h
 * 创建日期:2018年12月17日
 * 文件标识:
 * 文件摘要:实现线程安全的 单生产者/单消费者(single producer/single consumer) FIFO 队列。
 *
 * 特别声明:x_spsc_queue_t 的设计,主要参考了 zeromq 的 yqueue_t 模板类的数据存储理念。
 * 特别鸣谢:zeromq 开源项目,Lee哥 。
 *
 * 当前版本:1.0.0.0
 * 作    者:
 * 完成日期:2018年12月17日
 * 版本摘要:
 *
 * 历史版本:
 * 原作者  :
 * 完成日期:
 * 版本摘要:
 * </pre>
 */

#ifndef __XSPSC_QUEUE_H__
#define __XSPSC_QUEUE_H__

#include <memory>
#include <atomic>

////////////////////////////////////////////////////////////////////////////////

#ifndef ENABLE_XASSERT
#if ((defined _DEBUG) || (defined DEBUG))
#define ENABLE_XASSERT 1
#else // !((defined _DEBUG) || (defined DEBUG))
#define ENABLE_XASSERT 0
#endif // ((defined _DEBUG) || (defined DEBUG))
#endif // ENABLE_XASSERT

#ifndef XASSERT
#if ENABLE_XASSERT
#include <cassert>
#define XASSERT(xptr)    assert(xptr)
#else // !ENABLE_XASSERT
#define XASSERT(xptr)
#endif // ENABLE_XASSERT
#endif // XASSERT

////////////////////////////////////////////////////////////////////////////////
// x_spsc_queue_t

/**
 * @class x_spsc_queue_t
 * @brief 实现线程安全的 单生产者/单消费者(single producer/single consumer) FIFO 队列。
 *
 * @param [in ] _Ty    : 队列存储的元素类型。
 * @param [in ] _En    : 队列中的存储块可容纳元素的数量。
 * @param [in ] _Alloc : 元素分配器。
 */
template< typename _Ty, size_t _En, typename _Alloc = std::allocator< _Ty > >
class x_spsc_queue_t : protected _Alloc
{
    static_assert(_En >= 4, "_En size value must be greater than or equal to 4!");

    // common data types
public:
    using x_element_t = _Ty;

private:
    /**
     * @struct x_chunk_t
     * @brief  存储元素节点的连续内存块结构体。
     */
    typedef struct x_chunk_t
    {
        x_element_t * xet_array;   ///< 当前内存块中的元素节点数组
        x_chunk_t   * xprev_ptr;   ///< 指向前一内存块节点
        x_chunk_t   * xnext_ptr;   ///< 指向后一内存块节点
    } x_chunk_t;

#ifdef _MSC_VER
    using ssize_t = std::intptr_t;
#endif // _MSC_VER

    using x_chunk_ptr_t   = x_chunk_t *;
    using x_atomic_ptr_t  = std::atomic< x_chunk_ptr_t >;
    using x_atomic_size_t = std::atomic< size_t >;
    using x_allocator_t   = _Alloc;
    using x_chunk_alloc_t = typename std::allocator_traits< x_allocator_t >::template rebind_alloc< x_chunk_t >;

    // constructor/destructor
public:
    explicit x_spsc_queue_t(void)
        : m_chk_begin_ptr(nullptr)
        , m_xst_begin_pos(0)
        , m_chk_end_ptr(nullptr)
        , m_xst_end_pos(0)
        , m_chk_back_ptr(nullptr)
        , m_xst_back_pos(0)
        , m_xst_queue_size(0)
        , m_chk_swap_ptr(nullptr)
    {
        m_chk_begin_ptr = m_chk_end_ptr = alloc_chunk();
    }

    ~x_spsc_queue_t(void)
    {
        x_chunk_t * xchunk_ptr = nullptr;

        while (size() > 0)
            pop();

        while (true)
        {
            if (m_chk_begin_ptr == m_chk_end_ptr)
            {
                free_chunk(m_chk_begin_ptr);
                break;
            }

            xchunk_ptr = m_chk_begin_ptr;
            m_chk_begin_ptr = m_chk_begin_ptr->xnext_ptr;
            if (nullptr != xchunk_ptr)
                free_chunk(xchunk_ptr);
        }

        xchunk_ptr = m_chk_swap_ptr.exchange(nullptr);
        if (nullptr != xchunk_ptr)
            free_chunk(xchunk_ptr);
    }

    x_spsc_queue_t(x_spsc_queue_t && xobject) = delete;
    x_spsc_queue_t(const x_spsc_queue_t & xobject) = delete;
    x_spsc_queue_t & operator=(const x_spsc_queue_t & xobject) = delete;

    // public interfaces
public:
    /**********************************************************/
    /**
     * @brief 当前队列中的元素数量。
     */
    inline size_t size(void) const
    {
        return m_xst_queue_size;
    }

    /**********************************************************/
    /**
     * @brief 判断队列是否为空。
     */
    inline bool empty(void) const
    {
        return (0 == size());
    }

    /**********************************************************/
    /**
     * @brief 向队列尾端压入一个元素。
     */
    void push(const x_element_t & xemt_value)
    {
        x_allocator_t::construct(&m_chk_end_ptr->xet_array[m_xst_end_pos], xemt_value);

        m_chk_back_ptr = m_chk_end_ptr;
        m_xst_back_pos = m_xst_end_pos;

        m_xst_queue_size.fetch_add(1);
        move_end_pos();
    }

    /**********************************************************/
    /**
     * @brief 向队列尾端压入一个元素。
     */
    void push(x_element_t && xemt_value)
    {
        x_allocator_t::construct(&m_chk_end_ptr->xet_array[m_xst_end_pos],
                                 std::forward< x_element_t >(xemt_value));

        m_chk_back_ptr = m_chk_end_ptr;
        m_xst_back_pos = m_xst_end_pos;

        m_xst_queue_size.fetch_add(1);
        move_end_pos();
    }

    /**********************************************************/
    /**
     * @brief 从队列前端弹出一个元素。
     */
    void pop(void)
    {
        if (empty())
            return;
        m_xst_queue_size.fetch_sub(1);
        x_allocator_t::destroy(&m_chk_begin_ptr->xet_array[m_xst_begin_pos]);
        move_begin_pos();
    }

    /**********************************************************/
    /**
     * @brief 返回队列首个元素。
     */
    inline x_element_t & front(void)
    {
        XASSERT(!empty());
        return m_chk_begin_ptr->xet_array[m_xst_begin_pos];
    }

    /**********************************************************/
    /**
     * @brief 返回队列首个元素。
     */
    inline const x_element_t & front(void) const
    {
        XASSERT(!empty());
        return m_chk_begin_ptr->xet_array[m_xst_begin_pos];
    }

    /**********************************************************/
    /**
     * @brief 返回队列末端元素。
     */
    inline x_element_t & back(void)
    {
        XASSERT(!empty());
        return m_chk_back_ptr->xet_array[m_xst_back_pos];
    }

    /**********************************************************/
    /**
     * @brief 返回队列末端元素。
     */
    inline const x_element_t & back(void) const
    {
        XASSERT(!empty());
        return m_chk_back_ptr->xet_array[m_xst_back_pos];
    }

    // internal invoking
private:
    /**********************************************************/
    /**
     * @brief 申请一个存储元素节点的内存块。
     */
    x_chunk_ptr_t alloc_chunk(void)
    {
        x_chunk_alloc_t xchunk_allocator(*(x_allocator_t *)this);

        x_chunk_ptr_t xchunk_ptr = xchunk_allocator.allocate(1);
        XASSERT(nullptr != xchunk_ptr);

        if (nullptr != xchunk_ptr)
        {
            xchunk_ptr->xet_array = x_allocator_t::allocate(_En);
            XASSERT(nullptr != xchunk_ptr->xet_array);

            if (nullptr != xchunk_ptr->xet_array)
            {
                xchunk_ptr->xprev_ptr = nullptr;
                xchunk_ptr->xnext_ptr = nullptr;
            }
            else
            {
                xchunk_allocator.deallocate(xchunk_ptr, 1);
                xchunk_ptr = nullptr;
            }
        }

        return xchunk_ptr;
    }

    /**********************************************************/
    /**
     * @brief 释放一个存储元素节点的内存块。
     */
    void free_chunk(x_chunk_ptr_t xchunk_ptr)
    {
        if (nullptr != xchunk_ptr)
        {
            if (nullptr != xchunk_ptr->xet_array)
                x_allocator_t::deallocate(xchunk_ptr->xet_array, _En);

            x_chunk_alloc_t xchunk_allocator(*(x_allocator_t *)this);
            xchunk_allocator.deallocate(xchunk_ptr, 1);
        }
    }

    /**********************************************************/
    /**
     * @brief 将起始端位置向后移(该接口仅由 pop() 接口调用)。
     */
    void move_begin_pos(void)
    {
        if (++m_xst_begin_pos == _En)
        {
            x_chunk_ptr_t xchunk_ptr = m_chk_begin_ptr;
            m_chk_begin_ptr = m_chk_begin_ptr->xnext_ptr;
            XASSERT(nullptr != m_chk_begin_ptr);
            m_chk_begin_ptr->xprev_ptr = nullptr;
            m_xst_begin_pos = 0;

            xchunk_ptr = m_chk_swap_ptr.exchange(xchunk_ptr);
            if (nullptr != xchunk_ptr)
                free_chunk(xchunk_ptr);
        }
    }

    /**********************************************************/
    /**
     * @brief 将结束端位置向后移(该接口仅由 push() 接口调用)。
     */
    void move_end_pos(void)
    {
        if (++m_xst_end_pos == _En)
        {
            x_chunk_ptr_t xchunk_ptr = m_chk_swap_ptr.exchange(nullptr);
            if (nullptr != xchunk_ptr)
            {
                m_chk_end_ptr->xnext_ptr = xchunk_ptr;
                xchunk_ptr->xprev_ptr = m_chk_end_ptr;
            }
            else
            {
                m_chk_end_ptr->xnext_ptr = alloc_chunk();
                m_chk_end_ptr->xnext_ptr->xprev_ptr = m_chk_end_ptr;
            }

            m_chk_end_ptr = m_chk_end_ptr->xnext_ptr;
            m_xst_end_pos = 0;
        }
    }

    // data members
protected:
    x_chunk_ptr_t    m_chk_begin_ptr;  ///< 内存块链表的起始块
    ssize_t          m_xst_begin_pos;  ///< 队列中的首个元素位置
    x_chunk_ptr_t    m_chk_end_ptr;    ///< 内存块链表的结束块
    ssize_t          m_xst_end_pos;    ///< 队列中的元素结束位置
    x_chunk_ptr_t    m_chk_back_ptr;   ///< 内存块链表的结尾块
    ssize_t          m_xst_back_pos;   ///< 队列中的结尾元素位置
    x_atomic_size_t  m_xst_queue_size; ///< 队列中的有效元素数量
    x_atomic_ptr_t   m_chk_swap_ptr;   ///< 用于保存临时内存块(备用缓存块)
};

////////////////////////////////////////////////////////////////////////////////

#endif // __XSPSC_QUEUE_H__

3. 使用示例

/**
 * @file    main.cpp
 * <pre>
 * Copyright (c) 2019, Gaaagaa All rights reserved.
 *
 * 文件名称:main.cpp
 * 创建日期:2019年02月07日
 * 文件标识:
 * 文件摘要:单生产者/单消费者(single producer/single consumer)FIFO 队列 的测试程序。
 *
 * 当前版本:1.0.0.0
 * 作    者:
 * 完成日期:2019年02月07日
 * 版本摘要:
 *
 * 取代版本:
 * 原作者  :
 * 完成日期:
 * 版本摘要:
 * </pre>
 */

#include "xspsc_queue.h"
#include <iostream>
#include <thread>
#include <chrono>

#include <list>

////////////////////////////////////////////////////////////////////////////////

int main(int argc, char * argv[])
{
    using x_int_queue_t = x_spsc_queue_t< int, 8 >;

    x_int_queue_t spsc;

    std::cout << "sizeof(x_int_queue_t) : " << sizeof(x_int_queue_t) << std::endl;

    bool b_push_finished = false;
    std::thread xthread_in([&spsc, &b_push_finished](void) -> void
    {
        for (int i = 1; i < 10000; ++i)
        {
            spsc.push(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }

        b_push_finished = true;
    });

    std::thread xthread_out([&spsc, &b_push_finished](void) -> void
    {
        while (true)
        {
            if (!spsc.empty())
            {
                std::cout << spsc.size() << " : " << spsc.front() << std::endl;
                spsc.pop();
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            else if (b_push_finished)
            {
                break;
            }
        }
    });

    if (xthread_in.joinable())
    {
        xthread_in.join();
    }

    if (xthread_out.joinable())
    {
        xthread_out.join();
    }

    return 0;
}

原文地址:https://www.cnblogs.com/VxGaaagaa/p/11110492.html

时间: 2024-10-08 10:14:03

单生产者/单消费者 的 FIFO 无锁队列的相关文章

C++11 —— 单生产者/单消费者 的 FIFO 无锁队列

??发现 zeromq 的 yqueue_t 模板类,其数据存储理念设计得非常妙.借这一理念,按照 STL 的泛型类 queue 的接口标准,我设计了一个线程安全的 单生产者/单消费者(单线程push/单线程pop) FIFO 队列,以此满足更为广泛的应用. 1. 数据存储理念的结构图 队列的整体结构上,使用链表的方式,将多个固定长度的 chunk 串联起来: 每个 chunk 则可用于存储队列所需要的元素: 增加一个可交换的 chunk 单元,利于内存复用: 队列使用时,支持 单个线程的 pu

并发无锁队列学习之二【单生产者单消费者】

1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长

并发无锁队列学习(单生产者单消费者模型)

1.引言 本文介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长队列 这种队列要求每次入队和出队的内容是定长的,即生产者写入队列和消费者读取队列的内容大小事相同的.linux内核中的kfifo就是这种队列,提供了读和写两个索引.

单生产者-多消费者模型中遇到的问题

(1)      原始代码 最近使用单生产者-多消费者模型是遇到一个问题,以前既然都没有想到过.生产者线程的代码如下,基本功能就是接收到一个连接之后创建一个Socket对象并放到list中等待处理. void DataManager::InternalStart() { server_socket_ = new ServerSocket(); if (!server_socket_->SetAddress(NetworkUtil::GetIpAddress().c_str(), 9091)) {

disruptor 单生产者多消费者

demo1 单生产者多消费者创建. maven 依赖 <!-- https://mvnrepository.com/artifact/com.lmax/disruptor --> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency&

并发无锁队列学习(概念介绍)

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

并发无锁队列

1.前言 队列在计算机中非常重要的一种数据结构,尤其在操作系统中.队列典型的特征是先进先出(FIFO),符合流水线业务流程.在进程间通信.网络通信之间经常采用队列做缓存,缓解数据处理压力.结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现.根据操作队列的场景分为:单生产者--单消费者.多生产者--单消费者.单生产者--多消费者.多生产者--多消费者四大模型.其实后面三种的队列,可以归纳为一种多对多.根据队列中数据分为:队列中的数据是定长的.队列中的数据是变长的. 2.队列操作模型 (

boost 无锁队列

一哥们翻译的boost的无锁队列的官方文档 原文地址:http://blog.csdn.net/great3779/article/details/8765103 Boost_1_53_0终于迎来了久违的Boost.Lockfree模块,本着学习的心态,将其翻译如下.(原文地址:http://www.boost.org/doc/libs/1_53_0/doc/html/lockfree.html) Chapter 17. Boost.Lockfree 第17章.Boost.Lockfree Ta

lockFreeQueue 无锁队列实现与总结

无锁队列 介绍 在工程上,为了解决两个处理器交互速度不一致的问题,我们使用队列作为缓存,生产者将数据放入队列,消费者从队列中取出数据.这个时候就会出现四种情况,单生产者单消费者,多生产者单消费者,单生成者多消费者,多生产者多消费者.我们知道,多线程往往会带来数据不一致的情况,一般需要靠加锁解决问题.但是,加锁往往带来阻塞,阻塞会带来线程切换开销,在数据量大的情况下锁带来的开销是很大的,因此无锁队列实现势在必行.下面就详细讲一下每种情况的不同实现方法. 单生产者单消费者 从最简单的单生产者单消费者