Queue and Message

#ifndef __QUEUE_H__
#define __QUEUE_H__

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/*
 * Queues can have more than one producer but only one consumer.
 * This means that more than one task or interrupt handler is allowed to store
 * new data in the queue but only one task is allowed to get data from the queue.
 *
 * Queues accept messages of various size. When putting a message into a queue,
 * the message size is passed as a parameter.
 *
 * Retrieving a message from the queue does not copy the message, but returns
 * a pointer to the message and its size. This enhances performance because the
 * data is copied only once, when the message is written into the queue.
 *
 * The retrieving function has to delete every message after processing it.
 * A new message can only be retrieved from the queue when the previous message
 * was deleted from the queue.
 *
 * |---------------------- size -------------------------|
 * |        |------------ msgCnt ---------------|        |
 * [ .... ] [ size : message ] [ size : message ] [ .... ]
 * |        |                                     |
 * |pData   |offsetFirst                          |offsetLast
 *
 */
// Control block of one message queue. All fields are managed by the Q_*
// functions; Q_Init() zeroes the whole structure before filling it in.
typedef struct TAG_QUEUE
{
  // Storage area that holds the queued records (size header + payload).
  uint8_t * Memory;
  // Usable size of Memory, as set by Q_Init() after its alignment adjustment.
  uint32_t Capacity;
  // Number of messages in the queue, including messages whose payload is
  // still being copied by Q_Write().
  uint32_t MessageCount;
  // Offset in Memory of the oldest message, i.e. the one Q_Read() returns.
  uint32_t ReadIndex;
  // Offset in Memory of the most recently stored message.
  uint32_t WriteIndex;
  // Set to 1 by Q_Read() and cleared by Q_Purge(); while non-zero the queue
  // is neither cleared nor deleted and no further message is handed out.
  uint32_t IsUsing;
  // Number of Q_Write() calls that have reserved space but have not yet
  // finished copying their payload.
  uint32_t InProgressCount;
} QUEUE;

// Creates and initializes a message queue with at least Capacity bytes of
// message storage. Returns NULL if the memory could not be allocated.
QUEUE * Q_Create( uint32_t Capacity );

// Deletes a specific queue. Has no effect while the queue is in use.
void Q_Delete( QUEUE * Queue );

// Initializes a message queue on caller-provided storage.
// Memory is aligned up to a 4-byte boundary; Capacity shrinks accordingly.
void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity );

// Deletes the last retrieved message in a queue.
// Has no effect if no message is currently retrieved.
void Q_Purge( QUEUE * Queue );

// Deletes all messages in a queue. Has no effect while the queue is in use.
void Q_Clear( QUEUE * Queue );

// Returns the number of messages currently in a queue, not counting
// messages that are still being written.
uint32_t Q_GetCount( QUEUE * Queue );

// Returns the first message size, or 0 if the queue is empty.
uint32_t Q_GetSize( QUEUE * Queue );

// Delivers information whether the queue is actually in use.
// A queue must not be cleared or deleted when it is in use.
uint32_t Q_IsUsing( QUEUE * Queue );

// Stores a new message of given size in a queue.
// Returns 1 if the message was stored, 0 if it could not be stored.
// (The declaration used to be misspelled "Q_Wirte" and did not match the
// definition.)
uint32_t Q_Write( QUEUE * Queue, void * Message, uint32_t Size );

// Retrieves a message from a queue without copying it: *Message receives a
// pointer to the payload inside the queue storage.
// Returns the message size, or 0 if the queue is empty or the previously
// retrieved message has not been purged yet.
uint32_t Q_Read( QUEUE * Queue, void ** Message );

#endif /* __QUEUE_H__ */
#include "queue.h"

#include "cmsis_os.h"
#include "macro_misc.h"

// Creates and initializes a message Queue.
//
// The QUEUE control block and its message storage are obtained with a single
// osMalloc() call: the control block comes first and the storage starts
// ALIGN_UP( sizeof(QUEUE), 4 ) bytes after it.
// NOTE(review): ALIGN_UP( x, 4 ) is assumed to round x up to a multiple of 4.
//
// Capacity : requested storage size in bytes (passed through ALIGN_UP).
// Returns  : the new queue, or NULL if the total size does not fit in 32 bits
//            or the allocation fails.
QUEUE * Q_Create( uint32_t Capacity )
{
  uint32_t HeaderSize = ALIGN_UP( sizeof(QUEUE), 4 );
  uint32_t StorageSize = ALIGN_UP( Capacity, 4 );

  // Rounding up or adding the header must not wrap around 32 bits, otherwise
  // a far too small block would be allocated.
  if ( ( StorageSize < Capacity ) || ( StorageSize > UINT32_MAX - HeaderSize ) )
    return NULL;

  QUEUE * Queue = (QUEUE *) osMalloc( HeaderSize + StorageSize, osWaitForever );
  if ( Queue == NULL )
    return NULL;

  // Plain byte-pointer arithmetic; a pointer -> uint32_t -> pointer round trip
  // would truncate the address where pointers are wider than 32 bits.
  uint8_t * Memory = (uint8_t *) Queue + HeaderSize;

  Q_Init( Queue, Memory, StorageSize );

  return Queue;
}

// Releases a Queue with osFree().
// The request is silently ignored while the Queue is in use (IsUsing != 0),
// because a Queue must not be cleared or deleted in that state.
void Q_Delete( QUEUE * Queue )
{
  if ( Queue->IsUsing != 0 )
  {
    return;
  }

  osFree( Queue );
}

// Drops every message in a Queue by resetting its message counter.
// The request is silently ignored while the Queue is in use (IsUsing != 0),
// because a Queue must not be cleared or deleted in that state.
void Q_Clear( QUEUE * Queue )
{
  if ( Queue->IsUsing != 0 )
  {
    return;
  }

  Queue->MessageCount = 0;
}

// Initializes a message Queue on caller-provided storage.
//
// Queue    : control block to initialize; it is zeroed completely.
// Memory   : start of the storage area. If it is not 4-byte aligned it is
//            advanced to the next 4-byte boundary.
// Capacity : size of the storage area in bytes. It is reduced by the bytes
//            skipped for alignment, and becomes 0 if the area is too small
//            to survive the adjustment.
void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity )
{
  // uintptr_t keeps the cast well defined where pointers exceed 32 bits.
  uint32_t Misalignment = (uint32_t) ( (uintptr_t) Memory & 3u );
  if ( Misalignment != 0 )
  {
    uint32_t Skipped = 4u - Misalignment;
    Memory += Skipped;
    // Clamp instead of letting the unsigned subtraction wrap to a huge value.
    Capacity = ( Capacity > Skipped ) ? ( Capacity - Skipped ) : 0;
  }

  memset( Queue, 0, sizeof(QUEUE) );
  Queue->Capacity = Capacity;
  Queue->Memory = Memory;
}

// Returns the number of messages currently in a Queue.
// Messages whose payload is still being copied by a writer are counted in
// MessageCount but subtracted here via InProgressCount.
uint32_t Q_GetCount( QUEUE * Queue )
{
  uint32_t Stored = Queue->MessageCount;
  uint32_t BeingWritten = Queue->InProgressCount;

  return Stored - BeingWritten;
}

// Returns the first message size.
//
// Returns 0 if the Queue is empty, and also while the first message is still
// being written: Q_Write() stores the negated size in the header until the
// payload copy has finished, and that marker must not be reported as a
// (huge) unsigned size.
uint32_t Q_GetSize( QUEUE * Queue )
{
  if ( Queue->MessageCount == 0 )
    return 0;

  // Read the 32-bit header with memcpy instead of through a cast pointer.
  int32_t StoredSize;
  memcpy( &StoredSize, &Queue->Memory[ Queue->ReadIndex ], sizeof StoredSize );

  return ( StoredSize > 0 ) ? (uint32_t) StoredSize : 0;
}

// Reports whether the Queue is in use, i.e. whether a message has been
// retrieved with Q_Read() and not yet released with Q_Purge().
// A Queue must not be cleared or deleted when it is in use.
uint32_t Q_IsUsing( QUEUE * Queue )
{
  uint32_t InUse = Queue->IsUsing;

  return InUse;
}

// Stores a new message of given size in a Queue.
// 0 : Queue could not be stored (Queue is full).
// 1 : Success; message stored.
uint32_t Q_Write( QUEUE * Queue, void * Message, uint32_t Size )
{
  uint32_t ReadIndexVal;
  uint32_t WriteIndexPending;
  uint32_t WriteIndexVal;
  uint32_t MessageSize = 4 + ALIGN_UP( Size, 4 );
  int32_t * Memory = (int32_t *) Queue->Memory;

  uint32_t Value = osDisableInterrupt( );

  if ( Queue->MessageCount == 0 )
  {
    // read next message from head of memory
    Queue->ReadIndex = 0;

    // Queue could not be stored (memory is full).
    WriteIndexVal = -1;

    if ( Queue->Capacity >= MessageSize )
      WriteIndexVal = 0;
  }
  else
  {
    Memory = (int32_t *) Queue->Memory;
    WriteIndexPending = Queue->WriteIndex;
    int32_t SizePending = Memory[ WriteIndexPending ];

    if ( SizePending < 0 )
    {
      // other task is writting ... but it is preemptived by our task
      // WriteIndexPending has been updated
      // [ Last Queue ] [ --- Other Queue --- ] [ Our Mesage ]
      //                  | WriteIndexPending
      SizePending = -SizePending;
    }
    else
    {
      // [ Last Queue ] [ Our Mesage ]
      //                  | WriteIndexPending
    }

    // where our task will write ...
    WriteIndexVal = WriteIndexPending + 4 + ALIGN_UP( SizePending, 4 );
    ReadIndexVal = Queue->ReadIndex;
    if ( ReadIndexVal >= WriteIndexVal )
    {
      // [ Our Mesage ] [ Last Queue ]
      // |<------------>|ReadIndexVal
      // |WriteIndexVal
      if ( ReadIndexVal - WriteIndexVal < MessageSize )
        WriteIndexVal = -1;
    }
    else
    {
      // [ Our Mesage ] [ Available Space ]
      // |WriteIndexVal                   |Capacity
      // |<------------------------------>|
      uint32_t sizeAvailableTail = Queue->Capacity - WriteIndexVal;
      if ( sizeAvailableTail < MessageSize )
      {
        // try to write to head of memory
        // [ Our Mesage ] [ Last Queue ]
        // |<------------>|ReadIndexVal
        // |0
        if ( ReadIndexVal < MessageSize )
          WriteIndexVal = -1;
        else if ( sizeAvailableTail > 4 )
        {
          // can not read message from tail of memory
          // Marker for Q_Purge()
          Memory[ WriteIndexVal ] = 0;
          // write to head of memory
          WriteIndexVal = 0;
        }
      }
    }
  }

  // store message to memory
  if ( WriteIndexVal != -1 )
  {
    // WriteIndexPending for other task if our task be preemptived
    Queue->WriteIndex = WriteIndexVal;
    Queue->MessageCount++;
    Memory[ WriteIndexVal ] = -Size; // SizePending for other task
    Queue->InProgressCount++;

    osRestoreInterrupt( Value );
    //
    memcpy( &Memory[ WriteIndexVal + 4 ], Message, Size );
    //
    Value = osDisableInterrupt( );
    Memory[ WriteIndexVal ] = Size; // Size for this message
    Queue->InProgressCount--;
  }

  osRestoreInterrupt( Value );
  return ( WriteIndexVal != -1 );
}

// Retrieves a message from a Queue
// not allowed while the queue is in use.
uint32_t Q_Read( QUEUE * Queue, void ** Message )
{
  uint32_t MessageSize = 0;
  uint32_t * Memory = (uint32_t *) Queue->Memory;

  uint32_t Value = osDisableInterrupt( );
  if ( ( Queue->IsUsing == 0 ) && ( Queue->MessageCount ) )
  {
    MessageSize = Memory[ Queue->ReadIndex ];
    *Message = (void *) ( (uint32_t) ( &Memory[ Queue->ReadIndex ] ) + 4 );
    Queue->IsUsing = 1;
  }
  osRestoreInterrupt( Value );
  return MessageSize;
}

// Deletes the last retrieved message in a Queue.
void Q_Purge( QUEUE * Queue )
{
  uint32_t Value = osDisableInterrupt( );
  if ( Queue->IsUsing )
  {
    uint32_t * Memory = (uint32_t *) Queue->Memory;
    uint32_t MessageSize = 4 + ALIGN_UP( Memory[ Queue->ReadIndex ], 4 );
    Queue->MessageCount--;
    uint32_t NextReadIndexVal = Queue->ReadIndex + MessageSize;
    Queue->ReadIndex = NextReadIndexVal;
    if ( Queue->Capacity - NextReadIndexVal < 5 )
      Queue->ReadIndex = 0;
    else if ( Queue->MessageCount )
    {
      // Marked by Q_Write(), Next readable message at head of memory
      if ( Memory[ NextReadIndexVal ] == 0 )
        Queue->ReadIndex = 0;
    }
    Queue->IsUsing = 0;
  }
  osRestoreInterrupt( Value );
}

Queue and Message

时间: 2024-11-26 06:34:43

Queue and Message的相关文章

延迟发送550 4.4.7 QUEUE.Expired; message expired

最近有用户反映发送外网邮件偶尔退信,内容为延迟发送550 4.4.7 QUEUE.Expired; message expired. 环境为Exchange 2007 , 邮件路由为分公司HUB服务器---总部HUB服务器---SMG/EDGE. 故障现象为发送外网邮件部分不能发送成功,经过测试发现目标为同一邮箱,进行多次发送也会出现个别不成功现象,并对多个外网邮箱进(qq,126,163)行了测试,结果相同 查询对方邮箱的MX记录可正常解析. 检查队列发现分公司有2台HUB队列里有重试邮件,下

ZOJ2724_Windows Message Queue(STL/优先队列)

解题报告 题意: 看输入输出就很明白. 思路: 优先队列. #include <algorithm> #include <iostream> #include <cstring> #include <cmath> #include <queue> #include <vector> #include <cstdio> #include <map> using namespace std; struct node

Top 10 Uses For A Message Queue

We've been working with, building, and evangelising message queues for the last year, and it's no secret that we think they're awesome. We believe message queues are a vital component to any architecture or application, and here are ten reasons why:

MSMQ(Microsoft Message Queue)

http://www.cnblogs.com/sk-net/archive/2011/11/25/2232341.html 利用 MSMQ(Microsoft Message Queue),应用程序开发人员可以通过发送和接收消息方便地与应用程序进行快速可靠的通信.消息处理为您提供了有保障的消息传递和执行许多业务处理的可靠的防故障方法. MSMQ与XML Web Services和.Net Remoting一样,是一种分布式开发技术.但是在使用XML Web Services或.Net Remot

Android开发:Handler异步通信机制全面解析(包含Looper、Message Queue

前言 最近刚好在做关于异步通信的需求,那么,今天我们来讲解下Android开发中的Handler异步通信传递机制(包括Looper.Message Queue) 目录 定义 Android提供的一套消息传递机制 作用 用于实现子线程对UI线程的更新,实现异步消息的处理: - 在新启动的线程中发送消息 - 在主线程中获取并处理信息 为什么要用Handler 在安卓开发中: - 为了保证Android的UI操作是线程安全的,Android规定了只允许UI线程修改Activity里的UI组件: - 但

Android Message Handling Mechanism

转自:http://solarex.github.io/blog/2015/09/22/android-message-handling-mechanism/ Android is a message driven, message driven several elements: The message says: Message Message queue: MessageQueue The news cycle, remove the message processing for circ

[Android源代码分析]Android消息机制,Handler,Message,Looper,MessageQueue

最近准备把Android源码大致过一遍,不敢私藏,写出来分享给大家,顺便记录一下自己的学习感悟.里面一定有一些错误的地方,希望广大看客理解理解. 网上也有不少分析文章,这里我尽量分析的更加细致详尽.不留死角. 一.核心循环体:Looper.loop(); 我们知道,在线程run()中Looper.prepare();Looper.looper().之后这个线程就是一个HandlerThread了.我们可以通过Handler在另外一个线程中(自己也可以)向这个线程发送消息,在这个线程中处理消息.

System and method for assigning a message

A processor of a plurality of processors includes a processor core and a message manager. The message manager is in communication with the processor core. The message manager to receive a message from a second processor of the plurality of processors

Handler和Message以及Looper之间的三角关系

说到Handler想必大家都经常用到,在非UI线程更新UI那可是利器,用起来也非常容易上手 从使用上来说,我们只需要关注sendMessage和handleMessage即可 所以我们先从Handler和Message来说起,先看一小段代码 public static final int UPDATE_TEXT_VIEW = 0; public TextView mResultTextView = null; // new 一个 Handler 对象, 以内部类的方式重写 handleMessa