(gps)gps消息队列解析

gps从loc_api层到loc eng层用到了消息队列,大致格式是:sendMsg(new xxx),这个msg最终会发送到message queue中,在loopMain中读取出来然后分别调用msg的log()和proc()来处理相应的message,因此有必要了解一下这个message queue的大致flow

gps message queue进程间通信分为发送端和接收端,我们分开来看一下。



发送端的api:

hardware/qcom/gps/utils/msg_q.c

msq_q_err_type msg_q_snd(void* msg_q_data, void* msg_obj, void (*dealloc)(void*))
{
   msq_q_err_type rv;
   if( msg_q_data == NULL )
   {
      LOC_LOGE("%s: Invalid msg_q_data parameter!\n", __FUNCTION__);
      return eMSG_Q_INVALID_HANDLE;
   }
   if( msg_obj == NULL )
   {
      LOC_LOGE("%s: Invalid msg_obj parameter!\n", __FUNCTION__);
      return eMSG_Q_INVALID_PARAMETER;
   }
   //前面两个是判断param的合法性

   //把msg_q_data转换为msg queue相同的类型msg_q*
   msg_q* p_msg_q = (msg_q*)msg_q_data;

   //锁定p_msg_q的mutex,pthread_mutex_lock跟pthread_cond_xxx系列函数一般是成对出现的,防止多线程的pthread_cond_xxx系列函数之间竞争
   pthread_mutex_lock(&p_msg_q->list_mutex);

   LOC_LOGD("%s: Sending message with handle = 0x%08X\n", __FUNCTION__, msg_obj);

   if( p_msg_q->unblocked )
   {
      //如果不是锁定的,说明该msg是不可用的
      LOC_LOGE("%s: Message queue has been unblocked.\n", __FUNCTION__);
      pthread_mutex_unlock(&p_msg_q->list_mutex);
      return eMSG_Q_UNAVAILABLE_RESOURCE;
   }

   //linked_list_add,从名字来看应该是链表的插入动作
   rv = convert_linked_list_err_type(linked_list_add(p_msg_q->msg_list, msg_obj, dealloc));

   /* Show data is in the message queue. */
   //唤醒message queue的条件变量,rcv端被这个条件变量block住了哦,具体用法可请参见manpage
   pthread_cond_signal(&p_msg_q->list_cond);

   //解锁message queue的mutex,rcv端才可以从message queue中取出数据
   pthread_mutex_unlock(&p_msg_q->list_mutex);

   LOC_LOGD("%s: Finished Sending message with handle = 0x%08X\n", __FUNCTION__, msg_obj);

   return rv;
}

message queue,顾名思义message会插入到queue中,下面就是插入的动作:

hardware/qcom/gps/utils/linked_list.c

linked_list_err_type linked_list_add(void* list_data, void *data_obj, void (*dealloc)(void*))
{
   LOC_LOGD("%s: Adding to list data_obj = 0x%08X\n", __FUNCTION__, data_obj);
   if( list_data == NULL )
   {
      LOC_LOGE("%s: Invalid list parameter!\n", __FUNCTION__);
      return eLINKED_LIST_INVALID_HANDLE;
   }

   if( data_obj == NULL )
   {
      LOC_LOGE("%s: Invalid input parameter!\n", __FUNCTION__);
      return eLINKED_LIST_INVALID_PARAMETER;
   }
   //前面两个是判断param的合法性

   //list_data是能确认msg queue的指针
   list_state* p_list = (list_state*)list_data;
   list_element* elem = (list_element*)malloc(sizeof(list_element));
   if( elem == NULL )
   {
      LOC_LOGE("%s: Memory allocation failed\n", __FUNCTION__);
      return eLINKED_LIST_FAILURE_GENERAL;
   }

   /* Copy data to newly created element */
   //填充elem,传入的data_obj参数封装到elem结点,然后插入到msg queue
   elem->data_ptr = data_obj;
   elem->next = NULL;
   elem->prev = NULL;
   elem->dealloc_func = dealloc;

   /* Replace head element */
   //保存list的head结点到tmp
   list_element* tmp = p_list->p_head;
   //list的head结点指向elem
   p_list->p_head = elem;
   /* Point next to the previous head element */
   //list的head结点的next结点指向tmp,也就是以前的head,这样子elem就成了新的head结点
   p_list->p_head->next = tmp;

   if( tmp != NULL )
   {
      //如果tmp结点,就是以前的head结点不是null,那么以前head结点的prev指针指向elem(新的head结点),这也说明了以前的head结点成了现在head结点的下一个结点
      tmp->prev = p_list->p_head;
   }
   else
   {
      //如果tmp结点是null,也就是以前就是一个空的双向链表,现在新插入的elem结点(p_list->p_head = elem)既是head结点,也是tail结点
      p_list->p_tail = p_list->p_head;
   }

   return eLINKED_LIST_SUCCESS;
}

从函数名来看,应该是把链表的返回值转换成message queue的返回值:

hardware/qcom/gps/utils/linked_list.c

static msq_q_err_type convert_linked_list_err_type(linked_list_err_type linked_list_val)
{
   switch( linked_list_val )
   {
   case eLINKED_LIST_SUCCESS:
      return eMSG_Q_SUCCESS;
   case eLINKED_LIST_INVALID_PARAMETER:
      return eMSG_Q_INVALID_PARAMETER;
   case eLINKED_LIST_INVALID_HANDLE:
      return eMSG_Q_INVALID_HANDLE;
   case eLINKED_LIST_UNAVAILABLE_RESOURCE:
      return eMSG_Q_UNAVAILABLE_RESOURCE;
   case eLINKED_LIST_INSUFFICIENT_BUFFER:
      return eMSG_Q_INSUFFICIENT_BUFFER;

   case eLINKED_LIST_FAILURE_GENERAL:
   default:
      return eMSG_Q_FAILURE_GENERAL;
   }
}

message queue相关的返回值定义如下:

hardware/qcom/gps/utils/msg_q.h

typedef enum
{
  eMSG_Q_SUCCESS                             = 0,
     /**< Request was successful. */
  eMSG_Q_FAILURE_GENERAL                     = -1,
     /**< Failed because of a general failure. */
  eMSG_Q_INVALID_PARAMETER                   = -2,
     /**< Failed because the request contained invalid parameters. */
  eMSG_Q_INVALID_HANDLE                      = -3,
     /**< Failed because an invalid handle was specified. */
  eMSG_Q_UNAVAILABLE_RESOURCE                = -4,
     /**< Failed because an there were not enough resources. */
  eMSG_Q_INSUFFICIENT_BUFFER                 = -5,
     /**< Failed because an the supplied buffer was too small. */
}msq_q_err_type;

接下来看一下msg_q的定义:

hardware/qcom/gps/utils/msg_q.c

typedef struct msg_q {
   void* msg_list;                  /* Linked list to store information */
   pthread_cond_t  list_cond;       /* Condition variable for waiting on msg queue */
   pthread_mutex_t list_mutex;      /* Mutex for exclusive access to message queue */
   int unblocked;                   /* Has this message queue been unblocked? */
} msg_q;

关于实现message queue的双向链表,数据结构如下:

hardware/qcom/gps/utils/linked_list.c

typedef struct list_element {
   struct list_element* next;
   struct list_element* prev;
   void* data_ptr;
   void (*dealloc_func)(void*);
}list_element;

list_state的定义如下:

hardware/qcom/gps/utils/linked_list.c

typedef struct list_state {
   list_element* p_head;
   list_element* p_tail;
} list_state;

接收端的api:

hardware/qcom/gps/utils/msg_q.c

msq_q_err_type msg_q_rcv(void* msg_q_data, void** msg_obj)
{
   msq_q_err_type rv;
   if( msg_q_data == NULL )
   {
      LOC_LOGE("%s: Invalid msg_q_data parameter!\n", __FUNCTION__);
      return eMSG_Q_INVALID_HANDLE;
   }

   if( msg_obj == NULL )
   {
      LOC_LOGE("%s: Invalid msg_obj parameter!\n", __FUNCTION__);
      return eMSG_Q_INVALID_PARAMETER;
   }
   //前面两个是判断param的合法性

   //msg_q_data是用来确定message queue的
   msg_q* p_msg_q = (msg_q*)msg_q_data;

   LOC_LOGD("%s: Waiting on message\n", __FUNCTION__);

   //在操作之前锁定message queue的mutex
   pthread_mutex_lock(&p_msg_q->list_mutex);

   if( p_msg_q->unblocked )
   {
      LOC_LOGE("%s: Message queue has been unblocked.\n", __FUNCTION__);
      pthread_mutex_unlock(&p_msg_q->list_mutex);
      return eMSG_Q_UNAVAILABLE_RESOURCE;
   }

   /* Wait for data in the message queue */
   //判断message queue是否为空,且message queue没有被销毁
   while( linked_list_empty(p_msg_q->msg_list) && !p_msg_q->unblocked )
   {
      //如果message queue是空,且另一个线程还没有使条件变量“成立”,那么此处会阻塞住
      pthread_cond_wait(&p_msg_q->list_cond, &p_msg_q->list_mutex);
   }

   //如果另一个线程使条件变量“成立”,那么从message queue中取出相应的message
   rv = convert_linked_list_err_type(linked_list_remove(p_msg_q->msg_list, msg_obj));

   //用完之后unlock message queue的mutex
   pthread_mutex_unlock(&p_msg_q->list_mutex);

   LOC_LOGD("%s: Received message 0x%08X rv = %d\n", __FUNCTION__, *msg_obj, rv);

   return rv;
}

linked_list_empty的作用是判断链表是否为空,具体实现如下:

hardware/qcom/gps/utils/linked_list.c

int linked_list_empty(void* list_data)
{
   //如果message queue的指针是NULL,那么返回eLINKED_LIST_INVALID_HANDLE
   if( list_data == NULL )
   {
      LOC_LOGE("%s: Invalid list parameter!\n", __FUNCTION__);
      return (int)eLINKED_LIST_INVALID_HANDLE;
   }
   //如果p_list的head结点是NULL,那么message queue是空的,否则非空
   else
   {
      list_state* p_list = (list_state*)list_data;
      return p_list->p_head == NULL ? 1 : 0;
   }
}

lined_list_remove的作用是从链表中取出message保存到第二个参数中,然后删掉取出的链表结点,具体实现如下:

hardware/qcom/gps/utils/linked_list.c

linked_list_err_type linked_list_remove(void* list_data, void **data_obj)
{
   LOC_LOGD("%s: Removing from list\n", __FUNCTION__);
   if( list_data == NULL )
   {
      LOC_LOGE("%s: Invalid list parameter!\n", __FUNCTION__);
      return eLINKED_LIST_INVALID_HANDLE;
   }

   if( data_obj == NULL )
   {
      LOC_LOGE("%s: Invalid input parameter!\n", __FUNCTION__);
      return eLINKED_LIST_INVALID_PARAMETER;
   }
   //前面两个是判断param的合法性

   //既然是从message queue中取出,那么如果message queue是空的,则返回eLINKED_LIST_UNAVAILABLE_RESOURCE
   list_state* p_list = (list_state*)list_data;
   if( p_list->p_tail == NULL )
   {
      return eLINKED_LIST_UNAVAILABLE_RESOURCE;
   }

   //把尾结点的内容拷贝到tmp中
   list_element* tmp = p_list->p_tail;

   //把尾结点指针指向tmp(也就是前一步的尾结点)的前一个结点(新的尾结点)
   /* Replace tail element */
   p_list->p_tail = tmp->prev;

   if( p_list->p_tail != NULL )
   {
      //新的尾结点的next指针指向NULL,这样子tmp从message queue中脱落
      p_list->p_tail->next = NULL;
   }
   else
   {
      //原来message queue中只有一个结点,那么取出之后head和tail都指向NULL指针,也就是message queue中没有任何结点了
      p_list->p_head = p_list->p_tail;
   }

   //把取出的tmp结点的data_ptr赋值给传入的参数data_obj
   /* Copy data to output param */
   *data_obj = tmp->data_ptr;

   /* Free allocated list element */
   //释放掉取出来的结点
   free(tmp);

   return eLINKED_LIST_SUCCESS;
}


看完整个流程之后,得出的结论是:发送端和接收端是通过消息队列进行通信的,具体消息队列是用双向链表来实现的。

时间: 2024-08-24 04:54:10

(gps)gps消息队列解析的相关文章

编程开发消息队列解析

一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异步处理,应用解耦,流量削锋和消息通讯四个场景. 2.1异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统

nmq消息队列解析

消息中间件NMQ 1.What is nmq? nmq = new message queue; 一个通用消息队列系统 为在线服务设计 什么是消息队列?问什么需要?有哪些功能? 消息队列的本质:1.多个不同的应用之间实现相互通信的一种异步传输模式2.异步3.解耦 业界有哪些比较好的mq? yahoo YMB .twitter Kestrel.amazon SQS.apache kafka 百度的nmq和bigpipe 那么为什么会有这么多的实现呢? 影响设计的关键需求: 1.数据安全性 2.传输

MQ(消息队列)常见的应用场景解析

前言 j提高系统性能首先考虑的是数据库的优化,之前一篇文章<数据库的使用你可能忽略了这些>中有提到过开发中,针对数据库需要注意的事项.但是数据库因为历史原因,横向扩展是一件非常复杂的工程,所有我们一般会尽量把流量都挡在数据库之前. 不管是无限的横向扩展服务器,还是纵向阻隔到达数据库的流量,都是这个思路.阻隔直达数据库的流量,缓存组件和消息组件是两大杀器. MQ简介 MQ,Message queue,消息队列,就是指保存消息的一个容器.具体的定义这里就不类似于数据库.缓存等,用来保存数据的.当然

【系统架构】读《大型网站架构系列:分布式消息队列》整理

文章地址 拓展阅读: RabbitMQ + PHP (一)入门与安装 RabbitMQ + PHP (二)AMQP拓展安装 RabbitMQ + PHP (三)案例演示 1. 一些词汇和技术 1)Zookeeper注册中心 Storm集群 2. 核心思想 1)[概述] 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,Rabbit

大型网站架构系列:消息队列

出处:ITFLY8 网址:http://www.cnblogs.com/itfly8/p/5156155.html 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异

Linux IPC实践(5) --System V消息队列(2)

消息发送/接收API msgsnd函数 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); 参数 msgid: 由msgget函数返回的消息队列标识码, 也可以是通过ipcs命令查询出来的一个已经存在的消息队列的ID号 msgp:是一个指针,指针指向准备发送的消息, msgsz:是msgp指向的消息长度, 注意:这个长度不含保存消息类型的那个long int长整型 msgflg:控制着当前消息队列满或到达系统上限时

转:为什么会需要消息队列(MQ)?

为什么会需要消息队列(MQ)? ########################################################################################## 主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误.通过使用消息队列,我们可以异步处理请求,从而

【系统架构】分布式消息队列

原文地址 以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统). 本次分享大纲 消息队列概述 消息队列应用场景 消息中间件示例 JMS消息服务(见第二篇:大型网站架构系列:分布式消息队列(二)) 常用消息队列(见第二篇:大型网站架构系列:分布式消息队列(二)) 参考(推荐)资料(见第二篇:大型网站架构系列:分布式消息队列(二)) 本次分享总结(见第二篇:大型网站架构系列:分布式消息队列(二)) 一.消息队列概述 消息队列中间件是分布式系统中重要的

linux进程间的通信(C): 消息队列

一.消息队列(message queue) 消息队列也是System V IPC机制之一. 消息队列与命名管道类似, 但少了打开和关闭管道方面的复杂性. 但使用消息队列并未解决我们在使用命名管道时遇到的一些问题, 如管道满时的阻塞问题. 消息队列提供了一种在两个不相关进程间传递数据的简单有效的方法. 与命名管道相比, 消息队列的优势在于,它独立于发送和接收进程而存在, 这消除了在同步命名管道的打开和关闭时可能产生的一些困难. 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法. 而且,