第四讲:消息队列处理类

#ifndef __CommonMsgHandler___
#define __CommonMsgHandler___
#include "cocos2d.h"
#include <thread>
#include <queue>
#include <iostream>
#include <google/protobuf/message.h>
#include <sigslot.h>
#include "receive_data/BaseWebSocketObject.h"
#include "common/CommonGlobal.h"
#include "receive_data/BaseHttpObject.h"

struct EventObject{
    int32_t cmd_id;
    google::protobuf::Message* protobuf_msg;
};

typedef std::queue<EventObject> MessageQueue;
typedef std::queue<BaseWebSocketObject> WebSocketMessageQueue;
typedef std::queue<BaseHttpObject*> HttpMessageQueue;

class CommonMsgHandler :public cocos2d::Node{
private:
    MessageQueue *_eventQueue;
    WebSocketMessageQueue *_websocketEventQueue;
    HttpMessageQueue* _httpEventQueue;
private:
    std::mutex mutexQuene;
    void WaitMutex() { mutexQuene.lock() ;/*pidMutExOwner = getpid();*/}
    void ClearMutex() { mutexQuene.unlock(); }
    void keepAlive();
    CommonMsgHandler();
public:
    static CommonMsgHandler* sharedMsgHandler();

    ~CommonMsgHandler();
    virtual void update(float fDelta);
    void addEvent(EventObject  eventObject);
    void addEventFirst(EventObject eventObject);

    void addWebSocketEvent(const BaseWebSocketObject& obj);

    void addHttpEvent( BaseHttpObject* obj);

    //分发通知
    sigslot::signal1<EventObject> postProtoBufEvent;
    sigslot::signal1<const BaseWebSocketObject&> postWebSocketEvent;

    sigslot::signal1<BaseHttpObject*> postHttpEvent;
};

#endif /* defined(__CommonMsgHandler___) */

.cpp

#include "CommonMsgHandler.h"
USING_NS_CC;

static CommonMsgHandler *s_CommonMsgHandler = NULL;
CommonMsgHandler* CommonMsgHandler::sharedMsgHandler()
{
    if (s_CommonMsgHandler == NULL) {
        s_CommonMsgHandler = new CommonMsgHandler();
    }
    return s_CommonMsgHandler;
}

CommonMsgHandler::CommonMsgHandler()
{
    _eventQueue = new MessageQueue;
    _websocketEventQueue = new WebSocketMessageQueue;
    _httpEventQueue = new HttpMessageQueue;
    this->onEnter();
    this->scheduleUpdate();
}

CommonMsgHandler::~CommonMsgHandler()
{
    this->unscheduleUpdate();
    delete _eventQueue;
    delete _websocketEventQueue;
    delete _httpEventQueue;
}

void CommonMsgHandler::update(float fDelta)
{
    WaitMutex();
    while(_eventQueue->size() >0 ){
        EventObject  message = _eventQueue->front();
        postProtoBufEvent.emit(message);
        if(message.protobuf_msg != nullptr){
            delete message.protobuf_msg;
        }
        _eventQueue->pop();
    }

    while(_websocketEventQueue->size() >0 ){
        BaseWebSocketObject  message = _websocketEventQueue->front();
        postWebSocketEvent.emit(message);
        _websocketEventQueue->pop();
    }

    while (_httpEventQueue->size() > 0) {
        BaseHttpObject* message = _httpEventQueue->front();
        postHttpEvent.emit(message);
        _httpEventQueue->pop();
    }
    ClearMutex();
}

void CommonMsgHandler::addEvent(EventObject eventObject)
{
    WaitMutex();
    _eventQueue->push(eventObject);
    ClearMutex();
}

void CommonMsgHandler::addEventFirst(EventObject eventObject)
{
    WaitMutex();
    MessageQueue tempQueue ;
    tempQueue.push(eventObject);
    for (int i = 0 ; i < _eventQueue->size() ; i++)
    {
        EventObject tempObject =  _eventQueue->front();
        tempQueue.push(tempObject);
        _eventQueue->pop();
    }
    *_eventQueue =tempQueue;
    ClearMutex();
}

void CommonMsgHandler::addWebSocketEvent(const BaseWebSocketObject& obj){
    WaitMutex();
    _websocketEventQueue->push(obj);
    ClearMutex();
}

void CommonMsgHandler::addHttpEvent( BaseHttpObject* obj){
    WaitMutex();
    _httpEventQueue->push(obj);
    ClearMutex();
}
时间: 2024-10-13 10:27:28

第四讲:消息队列处理类的相关文章

RabbitMqHelper 消息队列帮助类

using Newtonsoft.Json;using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks; namespace RabbitMQ_Send{ class ConfigModel { } public enum ExchangeTypeEnum { /// <summary> /

进程通信(四)—— 消息队列

消息队列使用的API与信号量.共享内存类似. 消息队列.信号量.共享内存均可用ipcs命令查看以及ipcrm删除. msgget首先向内核获取一个消息队列ID. 获取成功后,可用msgctl获取和设置队列相关信息. msgsnd用于写消息队列. msgrcv用于读消息队列. 消息队列遵循First In ,First Out规则. 下面是消息队列相关实现代码. 1 //queuewrite.cpp 2 #include <sys/types.h> 3 #include <sys/msg.

第四讲:CCSprite精灵类 -- 创建精灵

创建精灵类的6种方法: 1.最常用的方法 //获取屏幕大小 CCSize size = CCDirector::sharedDirector()->getWinSize(); CCSprite *sp1 = CCSprite::create("icon.png"); sp1->setPosition(size.width*0.2, size.height*0.7); this->addChild(sp1); 2. CCSprite *sp2 = CCSprite::c

消息队列帮助类

准备工作: 1:安装windows组件(MSMQ) 编写代码: using System; using System.Messaging; using System.Collections.Generic; using System.Text; namespace LCL.Bus { public interface IBusMessageQueue { void Clear(); List<BusMessage> GetAll(); BusMessage Receive(); BusMess

消息队列状态:struct msqid_ds

Linux的消息队列(queue)实质上是一个链表, 它有消息队列标识符(queue ID). msgget创建一个新队列或打开一个存在的队列; msgsnd向队列末端添加一条新消息; msgrcv从队列中取消息, 取消息是不一定遵循先进先出的, 也可以按消息的类型字段取消息. 1. 标识符(des)和键(key): 消息队列, 信号量和共享存储段, 都属于内核中的IPC结构, 它们都用标识符来描述. 这个标识符是一个非负整数, 与文件描述符不同的是, 创建时并不会重复利用通过删除回收的整数,

消息队列技术

消息队列技术 上周,我们举办了第二届技术沙龙,我这边主要演讲了消息队列技术的议题,现分享给大家: 在我们团队内部,随着消息应用中心(任务中心)的广泛应用,有时候我们感觉不到消息队列的存在,但这不影响消息队列在高可用.分布式.高并发架构下的核心地位. 消息队列都应用到了哪些实际的应用场景中? 一.再谈消息队列的应用场景 异步处理:例如短信通知.终端状态推送.App推送.用户注册等 数据同步:业务数据推送同步 重试补偿:记账失败重试 系统解耦:通讯上下行.终端异常监控.分布式事件中心 流量消峰:秒杀

再谈消息队列技术

上周,我们举办了第二届技术沙龙,我这边主要演讲了消息队列技术的议题,现分享给大家: 在我们团队内部,随着消息应用中心(任务中心)的广泛应用,有时候我们感觉不到消息队列的存在,但这不影响消息队列在高可用.分布式.高并发架构下的核心地位. 消息队列都应用到了哪些实际的应用场景中? 一.再谈消息队列的应用场景 异步处理:例如短信通知.终端状态推送.App推送.用户注册等 数据同步:业务数据推送同步 重试补偿:记账失败重试 系统解耦:通讯上下行.终端异常监控.分布式事件中心 流量消峰:秒杀场景下的下单处

消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka). 四.JMS消息服务 讲消息队列就不得不提JMS .JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 在EJB架构中,有消息bean可以无缝的与JM消息服务集成.在J2EE架构模

大型网站架构系列:消息队列

出处:ITFLY8 网址:http://www.cnblogs.com/itfly8/p/5156155.html 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异