#ifndef __CommonMsgHandler___ #define __CommonMsgHandler___ #include "cocos2d.h" #include <thread> #include <queue> #include <iostream> #include <google/protobuf/message.h> #include <sigslot.h> #include "receive_data/BaseWebSocketObject.h" #include "common/CommonGlobal.h" #include "receive_data/BaseHttpObject.h" struct EventObject{ int32_t cmd_id; google::protobuf::Message* protobuf_msg; }; typedef std::queue<EventObject> MessageQueue; typedef std::queue<BaseWebSocketObject> WebSocketMessageQueue; typedef std::queue<BaseHttpObject*> HttpMessageQueue; class CommonMsgHandler :public cocos2d::Node{ private: MessageQueue *_eventQueue; WebSocketMessageQueue *_websocketEventQueue; HttpMessageQueue* _httpEventQueue; private: std::mutex mutexQuene; void WaitMutex() { mutexQuene.lock() ;/*pidMutExOwner = getpid();*/} void ClearMutex() { mutexQuene.unlock(); } void keepAlive(); CommonMsgHandler(); public: static CommonMsgHandler* sharedMsgHandler(); ~CommonMsgHandler(); virtual void update(float fDelta); void addEvent(EventObject eventObject); void addEventFirst(EventObject eventObject); void addWebSocketEvent(const BaseWebSocketObject& obj); void addHttpEvent( BaseHttpObject* obj); //分发通知 sigslot::signal1<EventObject> postProtoBufEvent; sigslot::signal1<const BaseWebSocketObject&> postWebSocketEvent; sigslot::signal1<BaseHttpObject*> postHttpEvent; }; #endif /* defined(__CommonMsgHandler___) */
.cpp
#include "CommonMsgHandler.h" USING_NS_CC; static CommonMsgHandler *s_CommonMsgHandler = NULL; CommonMsgHandler* CommonMsgHandler::sharedMsgHandler() { if (s_CommonMsgHandler == NULL) { s_CommonMsgHandler = new CommonMsgHandler(); } return s_CommonMsgHandler; } CommonMsgHandler::CommonMsgHandler() { _eventQueue = new MessageQueue; _websocketEventQueue = new WebSocketMessageQueue; _httpEventQueue = new HttpMessageQueue; this->onEnter(); this->scheduleUpdate(); } CommonMsgHandler::~CommonMsgHandler() { this->unscheduleUpdate(); delete _eventQueue; delete _websocketEventQueue; delete _httpEventQueue; } void CommonMsgHandler::update(float fDelta) { WaitMutex(); while(_eventQueue->size() >0 ){ EventObject message = _eventQueue->front(); postProtoBufEvent.emit(message); if(message.protobuf_msg != nullptr){ delete message.protobuf_msg; } _eventQueue->pop(); } while(_websocketEventQueue->size() >0 ){ BaseWebSocketObject message = _websocketEventQueue->front(); postWebSocketEvent.emit(message); _websocketEventQueue->pop(); } while (_httpEventQueue->size() > 0) { BaseHttpObject* message = _httpEventQueue->front(); postHttpEvent.emit(message); _httpEventQueue->pop(); } ClearMutex(); } void CommonMsgHandler::addEvent(EventObject eventObject) { WaitMutex(); _eventQueue->push(eventObject); ClearMutex(); } void CommonMsgHandler::addEventFirst(EventObject eventObject) { WaitMutex(); MessageQueue tempQueue ; tempQueue.push(eventObject); for (int i = 0 ; i < _eventQueue->size() ; i++) { EventObject tempObject = _eventQueue->front(); tempQueue.push(tempObject); _eventQueue->pop(); } *_eventQueue =tempQueue; ClearMutex(); } void CommonMsgHandler::addWebSocketEvent(const BaseWebSocketObject& obj){ WaitMutex(); _websocketEventQueue->push(obj); ClearMutex(); } void CommonMsgHandler::addHttpEvent( BaseHttpObject* obj){ WaitMutex(); _httpEventQueue->push(obj); ClearMutex(); }
时间: 2024-10-13 10:27:28