基于条件变量的消息队列

条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。

消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!

消息队列在服务器开发过程中主要用于什么对象呢?

1: 我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!

2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。

3:日志;处理模式与方式2 类似。不过日志大概是不需要返回的!

给出源代码:

BlockingQueue.h文件

/*

 * BlockingQueue.h

 *

 *  Created on: Apr 19, 2013

 *      Author: archy_yu

 */

#ifndef BLOCKINGQUEUE_H_

#define BLOCKINGQUEUE_H_

#include <queue>

#include <pthread.h>

typedef void* CommonItem;

class BlockingQueue

{

public:

    BlockingQueue();

    virtual ~BlockingQueue();

    int peek(CommonItem &item);

    int append(CommonItem item);

private:

    pthread_mutex_t _mutex;

    pthread_cond_t _cond;

    std::queue<CommonItem> _read_queue;

    std::queue<CommonItem> _write_queue;

};

#endif /* BLOCKINGQUEUE_H_ */

BlockingQueue.cpp 文件代码

/*

 * BlockingQueue.cpp

 *

 *  Created on: Apr 19, 2013

 *      Author: archy_yu

 */

#include "BlockingQueue.h"

BlockingQueue::BlockingQueue()

{

    pthread_mutex_init(&this->_mutex,NULL);

    pthread_cond_init(&this->_cond,NULL);

}

BlockingQueue::~BlockingQueue()

{

    pthread_mutex_destroy(&this->_mutex);

    pthread_cond_destroy(&this->_cond);

}

int BlockingQueue::peek(CommonItem &item)

{

    if( !this->_read_queue.empty() )

    {

        item = this->_read_queue.front();

        this->_read_queue.pop();

    }

    else

    {

        pthread_mutex_lock(&this->_mutex);

        while(this->_write_queue.empty())

        {

            pthread_cond_wait(&this->_cond,&this->_mutex);

        }

        while(!this->_write_queue.empty())

        {

            this->_read_queue.push(this->_write_queue.front());

            this->_write_queue.pop();

        }

        pthread_mutex_unlock(&this->_mutex);

    }

    return 0;

}

int BlockingQueue::append(CommonItem item)

{

    pthread_mutex_lock(&this->_mutex);

    this->_write_queue.push(item);

    pthread_cond_signal(&this->_cond);

    pthread_mutex_unlock(&this->_mutex);

    return 0;

}

测试代码:

BlockingQueue _queue;

void* process(void* arg)

{

    int i=0;

    while(true)

    {

        int *j = new int();

        *j = i;

        _queue.append((void *)j);

        i ++;

    }

    return NULL;

}

int main(int argc,char** argv)

{

    pthread_t pid;

    pthread_create(&pid,0,process,0);

    long long int start = get_os_system_time();

    int i = 0;

    while(true)

    {

        int* j = NULL;

        _queue.peek((void* &)j);

        i ++;

        if(j != NULL && (*j) == 100000)

        {

            long long int end = get_os_system_time();

            printf("consume %d\n",end - start);

            break;

        }

    }

    return 0;

}

欢迎拍砖!!!

时间: 2024-08-08 05:16:52

基于条件变量的消息队列的相关文章

基于线程池、消息队列和epoll模型实现Client-Server并发架构

引言 并发是什么?企业在进行产品开发过程中为什么需要考虑这个问题?想象一下天猫的双11和京东的618活动,一秒的点击量就有几十万甚至上百万,这么多请求一下子涌入到服务器,服务器需要对这么多的请求逐个进行消化掉,假如服务器一秒的处理能力就几万,那么剩下的不能及时得到处理的这些请求作何处理?总不能让用户界面一直等着,因此消息队列应运而生,所有的请求都统一放入消息队列,工作线程从消息队列不断的消费,消息队列相当于一个缓冲区,可达到解藕.异步和削峰的目的. Kafka.ActiveMQ.RabbitMQ

基于Redis实现分布式消息队列(汇总目录)

基于Redis实现分布式消息队列(1)– 缘起 http://blog.csdn.net/stationxp/article/details/45595733 基于Redis实现分布式消息队列(2)– 分布式消息队列功能设计 http://blog.csdn.net/stationxp/article/details/45596619 基于Redis实现分布式消息队列(3)– Redis功能分析 http://blog.csdn.net/stationxp/article/details/457

滴滴出行基于RocketMQ构建企业级消息队列服务的实践

小结: 1. https://mp.weixin.qq.com/s/v6NM3UgX-qTI7yO1QPCJrw 滴滴出行基于RocketMQ构建企业级消息队列服务的实践 原创: 江海挺 阿里巴巴中间件 2018-11-01 原文地址:https://www.cnblogs.com/yuanjiangw/p/10780829.html

信号,信号量,锁,条件变量,消息通信,共享内存,RPC (一)

在实际项目当中,经常需要把一个功能分成多个子模块实现.那么,这些子模块之间该如何关联起来呢?静态地看,模块可以看作一组完成相同功能的函数:而动态地看,模块可以是一个独立的进程.线程或者一个中断服务或者信号服务例程.根据不同的具体业务实现,它们之间可能是静态调用.动态互斥.同步.唤醒等关系.静态的调用很好实现,上层的函数调用底层的函数即可.那么,动态互斥.同步.唤醒等关系,又该如何实现呢?这就设计到我们将要讨论的信号.进程间消息通信.共享内存.线程互斥同步条件变量.RPC等手段.下面就按照Linu

基于Redis实现分布式消息队列(1)

1.为什么需要消息队列? 当系统中出现"生产"和"消费"的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.1:00到4:00和ERP联通,和电商系统断开. 再举个例子,服务员点菜

基于redis的延迟消息队列设计

需求背景 用户下订单成功之后隔20分钟给用户发送上门服务通知短信 订单完成一个小时之后通知用户对上门服务进行评价 业务执行失败之后隔10分钟重试一次 类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理. 队列设计 目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件. 开发前需要考虑的问题? 及时性 消费端能按时收到 同一时间消息的消费权重 可靠性 消息

基于Redis实现分布式消息队列(3)

1.Redis是什么鬼? Redis是一个简单的,高效的,分布式的,基于内存的缓存工具. 假设好服务器后,通过网络连接(类似数据库),提供Key-Value式缓存服务. 简单,是Redis突出的特色. 简单可以保证核心功能的稳定和优异. 2.性能 性能方面:Redis是足够高效的. 和Memecached对比,在数据量较小大情况下,Redis性能更优秀. 数据量大到一定程度的时候,Memecached性能稍好. 简单结论:但总体上讲Redis性能已经足够好. // Ref: Redis性能测试

[转载] 基于Redis实现分布式消息队列

转载自http://www.linuxidc.com/Linux/2015-05/117661.htm 1.为什么需要消息队列?当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列,作为抽象层,弥合双方的差异. 举个例子:业务系统触发短信发送申请,但短信发送模块速度跟不上,需要将来不及处理的消息暂存一下,缓冲压力. 再举个例子:调远程系统下订单成本较高,且因为网络等因素,不稳定,攒一批一起发送. 再举个栗子,交互模块5:00到24:00和电商系统联通,和内部ERP断开.

基于Docker搭建分布式消息队列Kafka

本文基于Docker搭建一套单节点的Kafka消息队列,Kafka依赖Zookeeper为其管理集群信息,虽然本例不涉及集群,但是该有的组件都还是会有,典型的kafka分布式架构如下图所示.本例搭建的示例包含Zookeeper + Kafka + Kafka-manger #获取镜像 ·         zookeeper镜像:zookeeper:3.4.9 ·         kafka镜像:wurstmeister/kafka:0.10.2.0 ·         kafka-manager