/* Delay (delta) queue modeled after the live555 C++ DelayQueue. */
#include <rtthread.h>
/* Operations table instance; its definition lives in delta_queue.c. */
extern struct dqueue_ops dqops;
/*
 * One node of the delta queue. Nodes form a singly linked list that is
 * circular through the sentinel entry embedded in struct dqueue_handle.
 */
struct dqueue_entry
{
/* Ticks remaining relative to the entry in front of this one (a delta, not an absolute time). */
rt_tick_t s_time;
/* Opaque payload; handed to the callback given to dq_dequeue(). */
void *pframe;
/* Next entry in the list. */
struct dqueue_entry *pnext;
};
/*
 * Delta queue handle, created by dq_init().
 */
struct dqueue_handle
{
/*
 * Sentinel entry that heads the circular list. It must stay the first
 * member: dq_init() stores the handle pointer itself, cast to an entry
 * pointer, as the sentinel's link.
 */
struct dqueue_entry priv;
/* Set to 1 by dq_init(); dq_enqueue_by_delay() does nothing while it is 0. */
rt_uint8_t s_init;
/*
 * Number of queued entries (incremented on enqueue, decremented on dequeue).
 * NOTE(review): presumably an 8-bit counter, so it would wrap past 255
 * entries - confirm against the rt_uint8_t definition.
 */
rt_uint8_t s_size;
/* Tick value sampled by the most recent dq_init()/dq_synchronize(). */
rt_tick_t s_last_sync_tick;
//struct dqueue_entry *pfront;
//struct dqueue_entry *prear;
/* Operations table passed to dq_init(). */
struct dqueue_ops *s_ops;
};
/*
 * Table of queue operations. Member order matters to any positional
 * initializer of this struct.
 */
struct dqueue_ops
{
// void (*enqueue)(struct dqueue_handle*,struct dqueue_entry *);
// void (*enqueue_by_ptr)(struct dqueue_handle*,struct dqueue_entry *,struct dqueue_entry *);
/* Queue a payload to become due after the given number of ticks. */
void (*enqueue_by_delay)(struct dqueue_handle* ,void *,rt_tick_t);
/* Call task() on the payload of every entry at the front whose delta is 0, then free the entry. */
void (*dequeue)(struct dqueue_handle* ,void (*task)(void *));
/* Charge the ticks elapsed since the last synchronisation against the queued deltas. */
void (*synchronize)(struct dqueue_handle*);
//rt_uint8_t (*isempty)(struct dqueue_handle*);
};
/* Insert payload p so that it becomes due delta_time ticks after the queue's reference point. */
void dq_enqueue_by_delay(struct dqueue_handle* dq,void *p,rt_tick_t delta_time);
/* Subtract the ticks elapsed since the previous synchronisation from the queued deltas. */
void dq_synchronize(struct dqueue_handle* dq);
/* Run task() on, and free, every leading entry whose remaining delta is 0. */
void dq_dequeue(struct dqueue_handle* dq,void (*task)(void *));
/* Allocate and initialise an empty queue that stores pops as its operations table. */
struct dqueue_handle* dq_init(struct dqueue_ops* pops);
/* Return the address of the sentinel entry embedded in the handle. */
struct dqueue_entry* dq_head(struct dqueue_handle* dq);
/*****delta_queue.c*****/
/*file:delta_queue.c
* This file is part of A2dp.
*
*Change Logs:
*Date Author Notes
*2015-12-1 Raid create
*2015-12-5 Raid merge enqueue
*/
#include "delta_queue.h"
#include <rtthread.h>
/*
 * Fallback definitions only: if a previously included header already provides
 * NULL or INT_MAX, keep that definition instead of redefining the macro with
 * a different replacement list.
 */
#ifndef NULL
#define NULL 0
#endif
#ifndef INT_MAX
#define INT_MAX 0x7FFFFFFF
#endif
/* Forward declarations of the queue types. */
struct dqueue_entry;
struct dqueue_handle;
struct dqueue_ops dqops =
{
dq_enqueue_by_delay,
dq_dequeue,
dq_synchronize
};
struct dqueue_handle* dq_init(struct dqueue_ops* pops)
{
struct dqueue_handle* dq
= (struct dqueue_handle* )rt_malloc(sizeof(struct dqueue_handle));
dq->s_init = 1;
dq->s_size = 0;
dq->priv.s_time = INT_MAX;
dq->priv.pnext = (struct dqueue_entry *)dq;
dq->s_last_sync_tick = rt_tick_get();
dq->s_ops = pops;
return dq;
}
/*
 * Report whether the queue's entry counter is zero.
 * Returns 1 when s_size is 0, otherwise 0.
 */
rt_uint8_t dq_is_empty(struct dqueue_handle* dq)
{
    if (dq->s_size != 0)
    {
        return 0;
    }
    return 1;
}
/*
 * Queue payload p so that it becomes due delta_time ticks after the queue's
 * current reference point.
 *
 * dq:         queue handle; nothing happens unless its s_init flag is set.
 * p:          opaque payload later handed to the dq_dequeue() callback.
 * delta_time: delay in ticks. An entry is placed behind already queued
 *             entries that fall due at the same time.
 *
 * Entries store their delay relative to the entry in front of them, so the
 * walk below consumes delta_time as it passes each entry, and the entry that
 * ends up behind the new one has its delta reduced accordingly.
 *
 * If rt_malloc() fails nothing is queued; the function returns void, so the
 * failure cannot be reported to the caller.
 *
 * NOTE(review): this function does not synchronise the queue first, so
 * delta_time is measured from the last synchronisation rather than from
 * "now" - confirm that callers invoke dq_synchronize() beforehand.
 */
void dq_enqueue_by_delay(struct dqueue_handle* dq,void *p,rt_tick_t delta_time)
{
    struct dqueue_entry *sentinel;
    struct dqueue_entry *prev;
    struct dqueue_entry *new_entry;

    if (!dq->s_init)
    {
        return;
    }

    new_entry = rt_malloc(sizeof(*new_entry));
    if (new_entry == NULL)
    {
        return;
    }

    sentinel = dq_head(dq);
    prev = sentinel;

    /*
     * Stop at the sentinel explicitly instead of relying on its s_time being
     * larger than any delay; otherwise a very large delta_time would walk
     * around the circular list.
     */
    while (prev->pnext != sentinel && delta_time >= prev->pnext->s_time)
    {
        delta_time -= prev->pnext->s_time;
        prev = prev->pnext;
    }

    new_entry->pframe = p;
    new_entry->s_time = delta_time;

    /* Only a real successor is re-based; the sentinel's delay stays untouched. */
    if (prev->pnext != sentinel)
    {
        prev->pnext->s_time -= delta_time;
    }

    new_entry->pnext = prev->pnext;
    prev->pnext = new_entry;
    dq->s_size++;
}
/*
 * Return the address of the sentinel entry embedded in the handle; the first
 * queued entry, if any, is reached through its pnext link.
 */
struct dqueue_entry* dq_head(struct dqueue_handle* dq)
{
    struct dqueue_entry *sentinel = &dq->priv;

    return sentinel;
}
/*******************ops func******************************/
/*
 * Charge the ticks that elapsed since the previous synchronisation against
 * the queued entries.
 *
 * Entries whose delay is used up completely get s_time = 0 (due now); the
 * first entry that is not yet due has the remaining elapsed time subtracted.
 * The sentinel entry is never modified, so its delay keeps the value set by
 * dq_init().
 *
 * If rt_tick_get() reports a value smaller than the stored one, the reference
 * tick is simply reset and no time is charged.
 * NOTE(review): presumably this branch is also taken when the tick counter
 * wraps, in which case the ticks elapsed across the wrap are dropped.
 */
void dq_synchronize(struct dqueue_handle* dq)
{
    struct dqueue_entry *sentinel = &dq->priv;
    struct dqueue_entry *current_entry = sentinel->pnext;
    rt_tick_t time_now = rt_tick_get();
    rt_tick_t time_since_lastsync;

    if (time_now < dq->s_last_sync_tick)
    {
        /* the system clock has apparently gone back in time */
        dq->s_last_sync_tick = time_now;
        return;
    }

    time_since_lastsync = time_now - dq->s_last_sync_tick;
    dq->s_last_sync_tick = time_now;

    /* Stop at the sentinel so the walk can never wrap around the circular list. */
    while (current_entry != sentinel && time_since_lastsync > current_entry->s_time)
    {
        time_since_lastsync -= current_entry->s_time;
        current_entry->s_time = 0; /* due: must run without further delay */
        current_entry = current_entry->pnext;
    }

    if (current_entry != sentinel)
    {
        current_entry->s_time -= time_since_lastsync;
    }
}
/*
 * Run and release every entry at the front of the queue whose remaining
 * delay is 0.
 *
 * dq:       queue handle.
 * taskfunc: callback invoked once per due entry with that entry's payload.
 *           The payload itself is not freed here.
 *
 * The loop ends at the first entry that is not yet due, or at the sentinel;
 * the sentinel is never passed to the callback or freed, whatever its s_time.
 * Each entry is unlinked and counted out before the callback runs, so the
 * list is consistent while the callback executes; the entry's memory is
 * returned with rt_free() afterwards.
 */
void dq_dequeue(struct dqueue_handle* dq,void (*taskfunc)(void *))
{
    struct dqueue_entry *sentinel = &dq->priv;
    struct dqueue_entry *expired = sentinel->pnext;

    while (expired != sentinel && expired->s_time == 0)
    {
        sentinel->pnext = expired->pnext;
        dq->s_size--;

        taskfunc(expired->pframe);
        rt_free(expired);

        /* Re-read the head: the callback may have queued further entries. */
        expired = sentinel->pnext;
    }
}