#ifndef __QUEUE_H__ #define __QUEUE_H__ #include <stdint.h> #include <stdlib.h> #include <string.h> /* * Queues can have more than one producer but only one consumer. * This means that more than one task or interrupt handler is allowed to store * new data in the queue but only one task is allowed toget data from the queue. * * Queues accept messages of various size. When putting a message into a queue, * the message size is passed as a parameter. * * Retrieving a message from the queue does not copy the message, but returns * a pointer to the message and its size. Thisenhances performance because the * data is copied only once, when the message is written into the queue. * * The retrieving function has to delete every message after processing it. * A new message can only be retrieved from the queue when the previous message * was deleted from the queue. * * |---------------------- size -------------------------| * | |------------ msgCnt ---------------| | * [ .... ] [ size : message ] [ size : message ] [ .... ] * | | | * |pData |offsetFirst |offsetLast * */ typedef struct TAG_QUEUE { uint8_t * Memory; uint32_t Capacity; uint32_t MessageCount; uint32_t ReadIndex; uint32_t WriteIndex; uint32_t IsUsing; uint32_t InProgressCount; } QUEUE; // Creates and initializes a message queue. QUEUE * Q_Create( uint32_t Capacity ); // Deletes a specific queue. void Q_Delete( QUEUE * Queue ); // Initializes a message queue. void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity ); // Deletes the last retrieved message in a queue. void Q_Purge( QUEUE * Queue ); // Deletes all message in a queue. void Q_Clear( QUEUE * Queue ); // Returns the number of messages currently in a queue uint32_t Q_GetCount( QUEUE * Queue ); // Returns the first message size uint32_t Q_GetSize( QUEUE * Queue ); // Delivers information whether the queue is actually in use. // A queue must not be cleared or deleted when it is in use. uint32_t Q_IsUsing( QUEUE * Queue ); // Stores a new message of given size in a queue. uint32_t Q_Wirte( QUEUE * Queue, void * Message, uint32_t Size ); // Retrieves a message from a queue uint32_t Q_Read( QUEUE * Queue, void ** Message ); #endif /* __QUEUE_H__ */
#include "queue.h" #include "cmsis_os.h" #include "macro_misc.h" // Creates and initializes a message Queue. QUEUE * Q_Create( uint32_t Capacity ) { uint32_t Size = ALIGN_UP( sizeof(QUEUE), 4 ) + ALIGN_UP( Capacity, 4 ); QUEUE *Queue = (QUEUE *) osMalloc( Size, osWaitForever ); if ( Queue == NULL ) return NULL; uint8_t * Memory = // (uint8_t *) ( ( (uint32_t) ( Queue ) ) + ALIGN_UP( sizeof(QUEUE), 4 ) ); Q_Init( Queue, Memory, ALIGN_UP( Capacity, 4 ) ); return Queue; } // Deletes a specific Queue. // A Queue must not be cleared or deleted when it is in use. void Q_Delete( QUEUE * Queue ) { if ( Queue->IsUsing == 0 ) osFree( Queue ); } // Deletes all messages in a Queue. // A Queue must not be cleared or deleted when it is in use. void Q_Clear( QUEUE * Queue ) { if ( Queue->IsUsing == 0 ) Queue->MessageCount = 0; } // Initializes a message Queue. void Q_Init( QUEUE * Queue, uint8_t * Memory, uint32_t Capacity ) { int32_t Delta = (uint32_t) Memory & 3; if ( Delta ) { Delta -= 4; Capacity += Delta; Memory -= Delta; } memset( Queue, 0, sizeof(QUEUE) ); Queue->Capacity = Capacity; Queue->Memory = Memory; } // Returns the number of messages currently in a Queue uint32_t Q_GetCount( QUEUE * Queue ) { return Queue->MessageCount - Queue->InProgressCount; } // Returns the first message size uint32_t Q_GetSize( QUEUE * Queue ) { uint32_t MessageSize = 0; if ( Queue->MessageCount ) MessageSize = *(uint32_t *) ( &Queue->Memory[ Queue->ReadIndex ] ); return MessageSize; } // Delivers information whether the Queue is actually in use. // A Queue must not be cleared or deleted when it is in use. uint32_t Q_IsUsing( QUEUE * Queue ) { return Queue->IsUsing; } // Stores a new message of given size in a Queue. // 0 : Queue could not be stored (Queue is full). // 1 : Success; message stored. uint32_t Q_Write( QUEUE * Queue, void * Message, uint32_t Size ) { uint32_t ReadIndexVal; uint32_t WriteIndexPending; uint32_t WriteIndexVal; uint32_t MessageSize = 4 + ALIGN_UP( Size, 4 ); int32_t * Memory = (int32_t *) Queue->Memory; uint32_t Value = osDisableInterrupt( ); if ( Queue->MessageCount == 0 ) { // read next message from head of memory Queue->ReadIndex = 0; // Queue could not be stored (memory is full). WriteIndexVal = -1; if ( Queue->Capacity >= MessageSize ) WriteIndexVal = 0; } else { Memory = (int32_t *) Queue->Memory; WriteIndexPending = Queue->WriteIndex; int32_t SizePending = Memory[ WriteIndexPending ]; if ( SizePending < 0 ) { // other task is writting ... but it is preemptived by our task // WriteIndexPending has been updated // [ Last Queue ] [ --- Other Queue --- ] [ Our Mesage ] // | WriteIndexPending SizePending = -SizePending; } else { // [ Last Queue ] [ Our Mesage ] // | WriteIndexPending } // where our task will write ... WriteIndexVal = WriteIndexPending + 4 + ALIGN_UP( SizePending, 4 ); ReadIndexVal = Queue->ReadIndex; if ( ReadIndexVal >= WriteIndexVal ) { // [ Our Mesage ] [ Last Queue ] // |<------------>|ReadIndexVal // |WriteIndexVal if ( ReadIndexVal - WriteIndexVal < MessageSize ) WriteIndexVal = -1; } else { // [ Our Mesage ] [ Available Space ] // |WriteIndexVal |Capacity // |<------------------------------>| uint32_t sizeAvailableTail = Queue->Capacity - WriteIndexVal; if ( sizeAvailableTail < MessageSize ) { // try to write to head of memory // [ Our Mesage ] [ Last Queue ] // |<------------>|ReadIndexVal // |0 if ( ReadIndexVal < MessageSize ) WriteIndexVal = -1; else if ( sizeAvailableTail > 4 ) { // can not read message from tail of memory // Marker for Q_Purge() Memory[ WriteIndexVal ] = 0; // write to head of memory WriteIndexVal = 0; } } } } // store message to memory if ( WriteIndexVal != -1 ) { // WriteIndexPending for other task if our task be preemptived Queue->WriteIndex = WriteIndexVal; Queue->MessageCount++; Memory[ WriteIndexVal ] = -Size; // SizePending for other task Queue->InProgressCount++; osRestoreInterrupt( Value ); // memcpy( &Memory[ WriteIndexVal + 4 ], Message, Size ); // Value = osDisableInterrupt( ); Memory[ WriteIndexVal ] = Size; // Size for this message Queue->InProgressCount--; } osRestoreInterrupt( Value ); return ( WriteIndexVal != -1 ); } // Retrieves a message from a Queue // not allowed while the queue is in use. uint32_t Q_Read( QUEUE * Queue, void ** Message ) { uint32_t MessageSize = 0; uint32_t * Memory = (uint32_t *) Queue->Memory; uint32_t Value = osDisableInterrupt( ); if ( ( Queue->IsUsing == 0 ) && ( Queue->MessageCount ) ) { MessageSize = Memory[ Queue->ReadIndex ]; *Message = (void *) ( (uint32_t) ( &Memory[ Queue->ReadIndex ] ) + 4 ); Queue->IsUsing = 1; } osRestoreInterrupt( Value ); return MessageSize; } // Deletes the last retrieved message in a Queue. void Q_Purge( QUEUE * Queue ) { uint32_t Value = osDisableInterrupt( ); if ( Queue->IsUsing ) { uint32_t * Memory = (uint32_t *) Queue->Memory; uint32_t MessageSize = 4 + ALIGN_UP( Memory[ Queue->ReadIndex ], 4 ); Queue->MessageCount--; uint32_t NextReadIndexVal = Queue->ReadIndex + MessageSize; Queue->ReadIndex = NextReadIndexVal; if ( Queue->Capacity - NextReadIndexVal < 5 ) Queue->ReadIndex = 0; else if ( Queue->MessageCount ) { // Marked by Q_Write(), Next readable message at head of memory if ( Memory[ NextReadIndexVal ] == 0 ) Queue->ReadIndex = 0; } Queue->IsUsing = 0; } osRestoreInterrupt( Value ); }
Queue and Message
时间: 2024-11-26 06:34:43