RAW-OS:消息队列

平台:VS2010 版本:1.04

我们先看看消息队列的数据结构:

typedef struct RAW_QUEUE
{
    RAW_COMMON_BLOCK_OBJECT       common_block_obj;
    RAW_MSG_Q                     msg_q;
    RAW_VOID                      (*queue_send_notify)(struct RAW_QUEUE *queue_ptr);

} RAW_QUEUE;

RAW_MSG_Q:

typedef struct RAW_MSG_Q {   

    RAW_VOID         **queue_start;            /* Pointer to start of queue data                              */
    RAW_VOID         **queue_end;              /* Pointer to end   of queue data                              */
    RAW_VOID         **write;               /* Pointer to where next message will be inserted  in   the Q  */
    RAW_VOID         **read;              /* Pointer to where next message will be extracted from the Q  */
    MSG_SIZE_TYPE    size;             /* Size of queue (maximum number of entries)                   */
    MSG_SIZE_TYPE    current_numbers;          /* Current number of entries in the queue                      */
    MSG_SIZE_TYPE    peak_numbers;          /* Peak number of entries in the queue                      */

} RAW_MSG_Q;

Common_block_obj:

typedef struct RAW_COMMON_BLOCK_OBJECT {   

    LIST                      block_list;
    RAW_U8                    *name;
    RAW_U8                    block_way;
    RAW_U8                    object_type;

} RAW_COMMON_BLOCK_OBJECT;

消息队列是用来在两个任务之间传递数据的,它相当于一个有存储功能的数据通道。例如:Task1把MSG1放入queue,则Task2可从queue中MSG1。需要注意的是消息传递是地址而不是数值。其实消息的通信机制和信号量是类似,只是一个需要传递参数一个不需要。下面我们就来看看源代码:

Raw_queue_creat:

RAW_U16 raw_queue_create(RAW_QUEUE *p_q, RAW_U8 *p_name, RAW_VOID **msg_start, MSG_SIZE_TYPE number)
{

    #if (RAW_QUEUE_FUNCTION_CHECK > 0)
    if (p_q == 0) {

        return RAW_NULL_OBJECT;
    }

    if (msg_start == 0) {

        return RAW_NULL_POINTER;
    }

    if (number == 0u) {

        return RAW_ZERO_NUMBER;
    }

    #endif
    /*init the queue blocked list*/
    list_init(&p_q->common_block_obj.block_list);

    p_q->common_block_obj.name = p_name;
    p_q->common_block_obj.block_way = RAW_BLOCKED_WAY_PRIO;
    p_q->msg_q.queue_start       = msg_start;               /*      Initialize the queue                 */
    p_q->msg_q.queue_end        = &msg_start[number];        //warning:queue not include the queue_end
    p_q->msg_q.write            = msg_start;
    p_q->msg_q.read             = msg_start;
    p_q->msg_q.size              = number;
    p_q->msg_q.current_numbers  = 0u;
    p_q->msg_q.peak_numbers      = 0u;
    p_q->queue_send_notify      = 0;
    p_q->common_block_obj.object_type = RAW_QUEUE_OBJ_TYPE;
    TRACE_QUEUE_CREATE(raw_task_active, p_q);

    return RAW_SUCCESS;
}

create函数最主要是对MSG结构进行赋值,都很好理解的。这里看三点:
       1 block_way:PRIO or FIFO,消息支持的任务阻塞策略:任务优先级或者是先来先出和信号量相同

2 queue_end:注意queue队列不包含queue_end指向的地址,看到&msg_start[number]中number就应该明白了

3 peak_numbers:这个是queue队列中最多消息的数量

Msg_post:

RAW_U16 msg_post(RAW_QUEUE *p_q, RAW_VOID *p_void, RAW_U8 opt_send_method, RAW_U8 opt_wake_all)
{
    LIST *block_list_head;
    RAW_SR_ALLOC();
    RAW_CRITICAL_ENTER();
    if (p_q->common_block_obj.object_type != RAW_QUEUE_OBJ_TYPE) {

        RAW_CRITICAL_EXIT();
        return RAW_ERROR_OBJECT_TYPE;
    }
    block_list_head = &p_q->common_block_obj.block_list;
    //消息队列满了
    if (p_q->msg_q.current_numbers >= p_q->msg_q.size) {
        RAW_CRITICAL_EXIT();

        TRACE_QUEUE_MSG_MAX(raw_task_active, p_q, p_void, opt_send_method);

        return RAW_MSG_MAX;
    }
    /*Queue is not full here, if there is no blocked receive task*/
    if (is_list_empty(block_list_head)) {
        p_q->msg_q.current_numbers++;
        /*update peak_numbers for debug*/
        if (p_q->msg_q.current_numbers > p_q->msg_q.peak_numbers) {
            p_q->msg_q.peak_numbers = p_q->msg_q.current_numbers;
        }

        if (opt_send_method == SEND_TO_END)  {
            *p_q->msg_q.write++ = p_void;
            if (p_q->msg_q.write == p_q->msg_q.queue_end) {  

                p_q->msg_q.write = p_q->msg_q.queue_start;

            }
        }
        else {
             /* Wrap read pointer to end if we are at the 1st queue entry */
            if (p_q->msg_q.read == p_q->msg_q.queue_start) {
                p_q->msg_q.read = p_q->msg_q.queue_end;
            }

            p_q->msg_q.read--;
            *p_q->msg_q.read = p_void;                               /* Insert message into queue                     */

        }

        RAW_CRITICAL_EXIT();
        /*if queue is registered with notify function just call it*/
        if (p_q->queue_send_notify) {
            p_q->queue_send_notify(p_q);
        }
        TRACE_QUEUE_MSG_POST(raw_task_active, p_q, p_void, opt_send_method);

        return RAW_SUCCESS;
    }
    /*wake all the task blocked on this queue*/
    if (opt_wake_all) {
        while (!is_list_empty(block_list_head)) {

            wake_send_msg(list_entry(block_list_head->next, RAW_TASK_OBJ, task_list),  p_void); 

            TRACE_QUEUE_WAKE_TASK(raw_task_active, list_entry(block_list_head->next, RAW_TASK_OBJ, task_list), p_void, opt_wake_all);

        }

    }

    /*wake hignhest priority task blocked on this queue and send msg to it*/
    else {

        wake_send_msg(list_entry(block_list_head->next, RAW_TASK_OBJ, task_list),  p_void);

        TRACE_QUEUE_WAKE_TASK(raw_task_active, list_entry(block_list_head->next, RAW_TASK_OBJ, task_list), p_void, opt_wake_all);
    }

    RAW_CRITICAL_EXIT();
    do_possible_sche();
    return RAW_SUCCESS;
}

消息传递函数,我们来看下函数的执行过程:
       1 判断消息队列的类型

2 判断消息队列是否满了

3 若消息队列中没有被阻塞的任务:
           3.1 更新peak_number的值

3.2 根据opt_send_mathod参数将msg放置在消息队列中

3.2.1 消息添加到消息的最后,体会write指针的改变

3.2.2 消息添加到消息的最前,体会read指针的改变----为什么是改变read指针呢?看下满的获取msg函数

4 根据opt_wake_all参数选择唤醒最前面的task还是所有task

wake_send_msg函数和信号量的wake函数一样,只是在这里多一步给task的msg变量赋值的操作,这里就不在分析了。

在这看一点,当消息队列满时,怎么操作呢?raw宏定义了一个TRACE_QUEUE_MSG_POST(NOTE1)函数让你自己去实现。

Raw_queue_receive:

RAW_U16 raw_queue_receive(RAW_QUEUE *p_q, RAW_TICK_TYPE wait_option, RAW_VOID **msg)
{
    RAW_VOID *pmsg;
    RAW_U16 result;

    RAW_SR_ALLOC();
    #if (RAW_QUEUE_FUNCTION_CHECK > 0)
    if (raw_int_nesting) {

        return RAW_NOT_CALLED_BY_ISR;

    }
    if (p_q == 0) {

        return RAW_NULL_OBJECT;
    }

    if (msg == 0) {

        return RAW_NULL_POINTER;
    }

    #endif

    RAW_CRITICAL_ENTER();
    if (p_q->common_block_obj.object_type != RAW_QUEUE_OBJ_TYPE) {

        RAW_CRITICAL_EXIT();
        return RAW_ERROR_OBJECT_TYPE;
    }
    /*if queue has msgs, just receive it*/
    if (p_q->msg_q.current_numbers) {

        pmsg = *p_q->msg_q.read++;                   

        if (p_q->msg_q.read == p_q->msg_q.queue_end) {
            /*wrap around to start*/
            p_q->msg_q.read = p_q->msg_q.queue_start;
        }
        *msg = pmsg;
        p_q->msg_q.current_numbers--; 

        RAW_CRITICAL_EXIT();
        TRACE_QUEUE_GET_MSG(raw_task_active, p_q, wait_option, *msg);

        return RAW_SUCCESS;
    }
    if (wait_option == RAW_NO_WAIT) {
        *msg = (RAW_VOID *)0;
        RAW_CRITICAL_EXIT();
        return RAW_NO_PEND_WAIT;
    }
    /*if system is locked, block operation is not allowed*/
    SYSTEM_LOCK_PROCESS_QUEUE();
    raw_pend_object((RAW_COMMON_BLOCK_OBJECT  *)p_q, raw_task_active, wait_option);

    RAW_CRITICAL_EXIT();
    TRACE_QUEUE_GET_BLOCK(raw_task_active, p_q, wait_option);

    raw_sched();
    *msg      = (RAW_VOID      *)0;
    result = block_state_post_process(raw_task_active, msg);

    return result;

}

获取msg函数分析如下:
       1 判断消息队列类型

2 消息队列中有消息则立即获取后返回。看这里获取是操作read指针来进行着就是为什么消息插在最前面时修改read指针。在消息的传递和获取时都需要操作current_number成员,它用来判断消息队列的使用情况,空、满载和负载。

3 若消息队列中没有消息且wait_option参数为RAW_NO_WAIT则立即返回不等待消息

4 若消息队列中没有消息且wait_option参数不为RAW_NO_WAIT,则会raw_pend_object函数即task进入阻塞状态,具体不分析了因为信号量的阻塞是一样的

5 函数返回的值是msg,在block_state_post_process函数中直接将task的msg赋值给函数返回值

NOTE1:在函数前面加TRACE都是raw的宏定义函数让你自己去实现,看TRACE这个单词就明白了。

RAW-OS:消息队列

时间: 2024-10-31 04:58:44

RAW-OS:消息队列的相关文章

消息队列 - mac上安装RabbitMq (转)

什么是RabbitMQ? RabbitMQ是由Erlang语言编写的实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为面向消息的中间件).支持WIndows.Linux.MAC OS 操作系统和包括java..net在内的多种编程语言. AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,分面向消息的中间件设计.基于此协议的客户端与消息中间件可传递消息,并不受 客户端/中间件 不同

消息队列实现订单异步提交

what MSMQ(Microsoft Message Queue),微软消息队列,用于应用程序之间相互通信的一种异步传输模式.应用程序可以分布在同台机器上,也可以分布于互联的网络中的任意位置.基本原理:消息发送者把要发送的消息放入容器,也就是Message(消息),然后保存到系统公用空间的消息队列中(Message Queue)中,本地或互联位置上的消息接收程序再从队列中取出发给它的消息进行处理.消息类型可以是文本,图像,自定义对象等.消息队列分为公共队列和私有队列. why 一.用于进程间的

消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka). 四.JMS消息服务 讲消息队列就不得不提JMS .JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建.发送.接收和读取消息.它使分布式通信耦合度更低,消息服务更加可靠以及异步性. 在EJB架构中,有消息bean可以无缝的与JM消息服务集成.在J2EE架构模

大型网站架构系列:消息队列

出处:ITFLY8 网址:http://www.cnblogs.com/itfly8/p/5156155.html 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等. 二.消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景.异

大型网站架构系列:消息队列(二)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

python之消息队列

引言 你是否遇到过两个(多个)系统间需要通过定时任务来同步某些数据?你是否在为异构系统的不同进程间相互调用.通讯的问题而苦恼.挣扎?如果是,那么恭喜你,消息服务让你可以很轻松地解决这些问题.消息服务擅长于解决多系统.异构系统间的数据交换(消息通知/通讯)问题,你也可以把它用于系统间服务的相互调用(RPC).本文将要介绍的RabbitMQ就是当前最主流的消息中间件之一. RabbitMQ简介 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源

大型网站架构系列:消息队列(二) (转)

本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka).[第二篇的内容大部分为网络资源的整理和汇总,供大家学习总结使用,最后有文章来源] 本次分享大纲 消息队列概述(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息队列应用场景(见第一篇:大型网站架构系列:分布式消息队列(一)) 消息中间件示例(见第一篇:大型网站架构系列:分布式消息队列(一)) JMS消息服务 常用消息队列 参考(推荐)资料 本

Python并发编程-RabbitMq消息队列

消息中间件 --->就是消息队列 异步方式:不需要立马得到结果,需要排队 同步方式:需要实时获得数据,坚决不能排队 subprocess 的Q也提供不同进程之间的沟通 应用场景: 买票,抢购 堡垒机批量发送文件 Centos6.x系统编译安装RabbitMQ 一.系统环境 [[email protected] ~]# cat /etc/redhat-release CentOS release 6.6 (Final) [[email protected] ~]# uname -r 2.6.32-

大型网站架构之分布式消息队列

以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统). 本次分享大纲 消息队列概述 消息队列应用场景 消息中间件示例 JMS消息服务 常用消息队列 参考(推荐)资料 本次分享总结 一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题.实现高性能,高可用,可伸缩和最终一致性架构.是大型分布式系统不可缺少的中间件. 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,K

初识中间件之消息队列

初识中间件之消息队列 测试那点事儿 测试那点事儿 初识中间件之消息队列 1 消息队列概述 消息队列是分布式系统中的重要组件,主要解决应用耦合,异步消息,流量削锋等问题,以实现高性能,高可用,可伸缩和最终一致性架构,是大型分布式系统中不可缺少的中间件. 目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,比如我之前用过的RabbitMQ以及kafka. 2 消息队列应用场景 在实际应用中,消息队列常用于异步处理.应用解耦.流量削锋