基于消息机制的异步架构之对消息队列的处理

/*

*   handle.h

*/

#ifndef HANDLE_H_

#define HANDLE_H_

#include "msgqueue.h"

typedef struct HANDLER{

int send_sock;

char send_ip[128];

uint16 send_port;

int ind;

pthread_t  thread_id;

//WORKER* father_thread;

MSG_QUEUE* qmsg;

}HANDLER;

HANDLER* init_handle();

int start_handle(HANDLER* sender);

#endif

/*

*handle.c

*

*/

#include "handle.h"

#include "network.h"

HANDLER* init_handle()

{

int len = sizeof(HANDLER);

HANDLER* handler= (HANDLER*) malloc(len);

if (handler == NULL) {

fprintf(stderr, "init_handlers malloc error, errno: %d %m\n", errno);

return NULL;

}

memset(handler, 0, len);

//为线程创建对应的消息队列

MSG_QUEUE* msg_queue =create_msg_queue();

if(msg_queue==NULL){

free(handler);

fprintf(stderr, "create msg queue error, errno: %d %m\n", errno);

return NULL;

}

handler->qmsg=msg_queue;

//worker->qmsg=msg_queue;

handler->send_port=conf->center_port;

strcpy(handler->send_ip,conf->center_ip);

return handler;

}

static int create_sock(const char *ip, short port){

int    sockfd=0;

if((sockfd=socket(AF_INET,SOCK_STREAM,0))<0){

herror("Init socket error!");

return -1;

}

struct sockaddr_in addr;

socklen_t addrlen = sizeof(addr);

memset(&addr, 0, addrlen);

addr.sin_family = AF_INET;

addr.sin_addr.s_addr = ip == NULL ? INADDR_ANY : inet_addr(ip);

addr.sin_port = htons(port);

if(connect(sockfd,(struct sockaddr*)&addr,sizeof(addr))<0){

//fprintf(stderr, "connect the center server error, errno: %d %m\n", errno);

return -1;//0表示成功,-1表示失败

}else{

printf("handler socket create success!\n");

return sockfd;

}

}

static void *handle_thread_run(void *arg)

{

HANDLER *handler= (HANDLER*) arg;

int sock_fd=create_sock(NULL,handler->send_port);//ip 暂时设为本地

if(sock_fd<0){

fprintf(stderr, "connect the center server error, errno: %d %m\n", errno);

}else{

handler->send_sock=sock_fd;

printf("connect the center server success!handler _id:%d\n",sock_fd);

}

//char msg_buf[2048];

while(1){

//send(sock_fd,"test",5,0);

//bzero(msg_buf,2048);

if(handler->qmsg->size>0){

MSG *tmp_msg;

tmp_msg=malloc(sizeof(MSG)); /*申请新结点*/

//得到消息数组指针

if(get_msg(handler->qmsg,tmp_msg)<0){

fprintf(stderr, "get msg from msg_queue error, errno: %d %m\n", errno);

free(tmp_msg);

continue;

}else{

printf("get a msg success!\n");

//handle the protocol CMD

uint16 msg_id=0;

memcpy(&msg_id,tmp_msg->buf,2);

msg_id=ntohs(msg_id);

printf("recv id %d.\n",msg_id);

//处理指令应答

if(tmp_msg->c->rsp_status==1){

//判断消息是否是平台等待终端应答的消息

if(msg_id==tmp_msg->c->waiting_cmd_rsp){

tmp_msg->c->rsp_status=2;

printf("command zd response success.rsp_id:%x\n",msg_id);

}

}

//执行协议指令

if (msg_id < g_akg_connected_id &&AKG_FUNC[msg_id]) {

if (AKG_FUNC[msg_id](tmp_msg)==0) {

//执行处理函数成功,释放tmp_msg

free(tmp_msg);

}

} else {

fprintf(stderr, "invalid msg_id:%x.\n", msg_id);

continue;

}

}

}

}

return NULL;

}

int start_handle(HANDLER* handler){

pthread_t thread_id;

if(pthread_create(&thread_id, NULL, handle_thread_run,handler) != 0) {

fprintf(stderr, "start_handle create thread error, errno: %d %m\n", errno);

return -1;

}

handler->thread_id=thread_id;

return 0;

}

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-07-29 18:41:49

基于消息机制的异步架构之对消息队列的处理的相关文章

基于消息机制的异步架构之回调函数注册

/* * akg.h * 业务逻辑注册 * */ #ifndef AKG_H_ #define AKG_H_ #include "conn.h" #include "msgqueue.h" #define MAX_PKG_TYPE (0xffff) extern const uint16 g_akg_connected_id; extern const uint16 g_akg_timeout_id; extern const uint16 g_akg_closed

Win32窗口消息机制 x Android消息机制 x 异步执行

如果你开发过Win32窗口程序,那么当你看到android代码到处都有的mHandler.sendEmptyMessage和 private final Handler mHandler = new Handler() { @Override public void handleMessage(Message msg) { switch (msg.what) { case MSG_REPORT_PRIMARY_CLIP_CHANGED: reportPrimaryClipChanged(); }

【原创】源码角度分析Android的消息机制系列(一)——Android消息机制概述

ι 版权声明:本文为博主原创文章,未经博主允许不得转载. 1.为什么需要Android的消息机制 因为Android系统不允许在子线程中去访问UI,即Android系统不允许在子线程中更新UI. 为什么不允许在子线程中更新UI呢?因为Android的控件不是线程安全的.既然是非线程安全的,那么若在多个子线程中并发访问,UI控制可能会处于一种不可预期的状态.有的读者可能会说,为什么不对UI控件加锁呢?加锁会降低UI访问的效率,因为加锁之后,若想要运行这段synchronized的代码,线程要先拿到

Android消息机制1-Handler(Java层)(转)

转自:http://gityuan.com/2015/12/26/handler-message-framework/ 相关源码 framework/base/core/java/andorid/os/Handler.java framework/base/core/java/andorid/os/Looper.java framework/base/core/java/andorid/os/Message.java framework/base/core/java/andorid/os/Mes

Android Handler消息机制

在上一篇文章<Android AsyncTask异步任务>中我们介绍了如何使用AsyncTask异步处理网络通信和UI更新.在本文中将使用Handler消息机制来异步处理网络通信和UI更新. Google参考了Windows的消息机制,在Android系统中实现了一套类似的消息机制.学习Android的消息机制,有几个概念(类)必须了解: 1.Message 消息,理解为线程间通讯的数据单元.例如后台线程在处理数据完毕后需要更新UI,则可发送一条包含更新信息的Message给UI线程. 2.M

JavaScript消息机制入门篇

JavaScript这个语言本身就是建立在一种消息机制上的,所以它很容易处理异步回调和各种事件.这个概念与普通的编程语言基础是不同的,所以让很多刚接触JavaScript的人摸不着头脑.JavaScript就是通过消息来实现多个事务同时处理,不要把自己吊死在一个消息中. 经常会看到这样的问题“JavaScript有sleep函数吗?”.“如何让上面的函数执行完后再执行下面的函数?”,就是因为没有理解消息机制才会发出这样的疑问.JavaScript是单线程的,要是有sleep函数,那在sleep的

MFC框架剖析和消息机制

即便是基于MFC的应用程序,建立窗口类也是会遵循如下的过程: 设计窗口类->注册窗口类->生成窗口->显示窗口->更新窗口->消息循环->消息路由到窗口过程函数处理.下面就剖析一下在MFC中是如何完成上述过程的. (1)每个应用程序都有且仅有一个应用类的全局变量theApp,全局变量先于WinMain函数进行处理. (2)WinMain函数体在APPMODUL.CPP文件中,定义如下: extern "C" int WINAPI _tWinMain(

编程思想之消息机制

编程思想之消息机制 消息机制 从一个剧情开始 路遥的<平凡的世界>因为翻拍成电视剧,又再次火起来了!我们就从这里开始吧,其小说是以这样一个场景开头的: 在一个半山腰县立高中的大院坝里,在一个校园内的南墙根下,按班级排起了十几个纵队的年轻男女,各班的值日生正忙碌地给众人分发饭菜-- 菜分为甲.乙.丙三等,甲菜以土豆.白菜.粉条为主,还有可人大肉片,乙菜没有肉,丙菜只有清水煮白萝卜.主食也分为三等:白面馍,玉米面馍,高粱面馍,白.黄.黑分别代表了三种差别,学生们戏称欧洲.亚洲.非洲.每个人的饭菜都

iOS开发runtime学习:一:runtime简介与runtime的消息机制

一:runtime简介:也是面试必须会回答的部分 二:runtime的消息机制 #import "ViewController.h" #import <objc/message.h> #import "Person.h" /* 总结: 1: runtime:必须要导入头文件 <objc/message.h>,此头文件中已经引入了<objc/runtime.h> 任何方法调用本质:发送一个消息,用runtime发送消息.OC底层实现