【SmartOS】轻量级多任务调度系统

SmartOS是一个完全由新生命团队设计的嵌入式操作系统,主要应用于智能家居、物联网、工业自动化控制等领域。

ARM Cortex-M系列微处理器几乎全都做成单核心,对于业务逻辑较复杂的物联网就显得难以使用,因此SmartOS设计了两个多任务调度系统:
1,多线程调度,重量级,逼近PC操作系统多线程用法。使用上需要特别小心,要合理分配每一个线程的栈空间大小,任务越多越容易出问题
2,大循环,轻量级。每个任务注册一个函数指针,然后由主线程轮询各个任务函数,轮流执行

本文主要讲解第二种,轻量级多任务调度系统。

TaskScheduler是任务调度中心,Task表示单个任务。
SmartOS启动后会进入C/C++标准的main函数,在这里需要初始化各个模块,各个模块在初始化的时候,通过Sys.AddTask向系统注册任务函数。
一切就绪以后,在main最后一行,使用Sys.Start()进入大循环,开始调度。
Sys.Start实际上调用TaskScheduler.Start,从后面代码可以看出,这个Start内部有一个死循环。

每一个任务都需要指定4大参数:函数指针、回调参数、开始时间、调度周期。
调度中心将会维护并计算每一个任务的“下一次调度”时间。
显然,每一个任务函数获得CPU时间开始执行的时候,其它所有任务都没有机会执行。
原则上,当然是每个任务都尽量不要占用太长的时间。但是随着智能设备越来越复杂,应用系统也日渐复杂,为了满足需求,开发人员很希望在一个任务里面完成一系列连贯动作,获得跟PC上一样的体验,让任务假设自己独占CPU。
常规的大循环调度根本无法满足以上要求。

我们在这个基础上做了一点点改进,允许某个任务在休眠等待的时候,分出时间去调度其它函数。
例如,A、B、C多个任务正在工作。
其中A是主要业务逻辑,B是以太网驱动,定时询问网卡要数据。
A里面有一个功能,需要向服务器发送一个指令,然后等待响应。
如果这个时候A阻塞CPU,它永远也拿不到响应数据,即使响应数据已经到来!
因为CPU被A独占了,B没有机会去问网卡要数据,也就不能把数据交给A。
我们把A的等待做一点点调整,A在调用Sys.Sleep等待一定时间的时候,调度中心不要浪费了这点时间,安排去调度其它任务,那么B就有机会执行,网络响应数据上冒到A业务附近的函数,最终被A获取,达到业务需求。

头文件

#ifndef __Task_H__
#define __Task_H__

#include "Sys.h"
#include "List.h"

class TaskScheduler;

// 任务
class Task
{
private:
        TaskScheduler* _Scheduler;

        friend class TaskScheduler;

        Task(TaskScheduler* scheduler);

public:
        uint        ID;                        // 编号
        Action        Callback;        // 回调
        void*        Param;                // 参数
        long        Period;                // 周期us
        ulong        NextTime;        // 下一次执行时间
        uint        Times;                // 执行次数
        uint        CpuTime;        // 总耗费时间
        uint        SleepTime;        // 当前睡眠时间
        uint        Cost;                // 平均执行时间
        bool        Enable;                // 是否启用
        byte        Reversed[3];// 保留,避免对齐问题

        //~Task();

        void ShowStatus();        // 显示状态
};

// 任务调度器
class TaskScheduler
{
private:
        FixedArray<Task, 32> _Tasks;
        uint _gid;        // 总编号

        friend class Task;

public:
        string        Name;                // 系统名称
        int                Count;                // 任务个数
        Task*        Current;        // 正在执行的任务
        bool        Running;        // 是否正在运行
        byte        Reversed[3];// 保留,避免对齐问题

        TaskScheduler(string name = NULL);
        ~TaskScheduler();

        // 创建任务,返回任务编号。dueTime首次调度时间us,period调度间隔us,-1表示仅处理一次
        uint Add(Action func, void* param, ulong dueTime = 0, long period = 0);
        void Remove(uint taskid);

        void Start();
        void Stop();
        // 执行一次循环。指定最大可用时间
        void Execute(uint usMax);

        static void ShowStatus(void* param);        // 显示状态

    Task* operator[](int taskid);
};

#endif

源代码

#include "Task.h"

/*

*/

Task::Task(TaskScheduler* scheduler)
{
        _Scheduler = scheduler;

        Times                = 0;
        CpuTime                = 0;
        SleepTime        = 0;
        Cost                = 0;
        Enable                = true;
}

/*Task::~Task()
{
        if(ID) _Scheduler->Remove(ID);
}*/

// 显示状态
void Task::ShowStatus()
{
        debug_printf("Task::Status 任务 %d [%d] 执行 %dus 平均 %dus\r\n", ID, Times, CpuTime, Cost);
}

TaskScheduler::TaskScheduler(string name)
{
        Name = name;

        _gid = 1;

        Running = false;
        Current        = NULL;
        Count = 0;
}

TaskScheduler::~TaskScheduler()
{
        Current = NULL;
        _Tasks.DeleteAll().Clear();
}

// 创建任务,返回任务编号。dueTime首次调度时间us,period调度间隔us,-1表示仅处理一次
uint TaskScheduler::Add(Action func, void* param, ulong dueTime, long period)
{
        Task* task = new Task(this);
        task->ID = _gid++;
        task->Callback = func;
        task->Param = param;
        task->Period = period;
        task->NextTime = Time.Current() + dueTime;

        Count++;
        _Tasks.Add(task);

#if DEBUG
        // 输出长整型%ld,无符号长整型%llu
        //debug_printf("%s添加任务%d 0x%08x FirstTime=%lluus Period=%ldus\r\n", Name, task->ID, func, dueTime, period);
        if(period >= 1000)
        {
                uint dt = dueTime / 1000;
                int  pd = period > 0 ? period / 1000 : period;
                debug_printf("%s::添加任务%d 0x%08x FirstTime=%ums Period=%dms\r\n", Name, task->ID, func, dt, pd);
        }
        else
                debug_printf("%s::添加任务%d 0x%08x FirstTime=%uus Period=%dus\r\n", Name, task->ID, func, (uint)dueTime, (int)period);
#endif

        return task->ID;
}

void TaskScheduler::Remove(uint taskid)
{
        int i = -1;
        while(_Tasks.MoveNext(i))
        {
                Task* task = _Tasks[i];
                if(task->ID == taskid)
                {
                        _Tasks.RemoveAt(i);
                        debug_printf("%s::删除任务%d 0x%08x\r\n", Name, task->ID, task->Callback);
                        // 首先清零ID,避免delete的时候再次删除
                        task->ID = 0;
                        delete task;
                        break;
                }
        }
}

void TaskScheduler::Start()
{
        if(Running) return;

#if DEBUG
        //Add(ShowTime, NULL, 2000000, 2000000);
        Add(ShowStatus, this, 10000000, 30000000);
#endif
        debug_printf("%s::准备就绪 开始循环处理%d个任务!\r\n\r\n", Name, Count);

        Running = true;
        while(Running)
        {
                Execute(0xFFFFFFFF);
        }
        debug_printf("%s停止调度,共有%d个任务!\r\n", Name, Count);
}

void TaskScheduler::Stop()
{
        debug_printf("%s停止!\r\n", Name);
        Running = false;
}

// 执行一次循环。指定最大可用时间
void TaskScheduler::Execute(uint usMax)
{
        ulong now = Time.Current() - Sys.StartTime;        // 当前时间。减去系统启动时间,避免修改系统时间后导致调度停摆
        ulong min = UInt64_Max;                // 最小时间,这个时间就会有任务到来
        ulong end = Time.Current() + usMax;

        // 需要跳过当前正在执行任务的调度
        //Task* _cur = Current;

        int i = -1;
        while(_Tasks.MoveNext(i))
        {
                Task* task = _Tasks[i];
                //if(task && task != _cur && task->Enable && task->NextTime <= now)
                if(task && task->Enable && task->NextTime <= now)
                {
                        // 不能通过累加的方式计算下一次时间,因为可能系统时间被调整
                        task->NextTime = now + task->Period;
                        if(task->NextTime < min) min = task->NextTime;

                        ulong now2 = Time.Current();
                        task->SleepTime = 0;

                        Current = task;
                        task->Callback(task->Param);
                        Current = NULL;

                        // 累加任务执行次数和时间
                        task->Times++;
                        int cost = (int)(Time.Current() - now2);
                        if(cost < 0) cost = -cost;
                        //if(cost > 0)
                        {
                                task->CpuTime += cost - task->SleepTime;
                                task->Cost = task->CpuTime / task->Times;
                        }

#if DEBUG
                        if(cost > 500000) debug_printf("Task::Execute 任务 %d [%d] 执行时间过长 %dus 睡眠 %dus\r\n", task->ID, task->Times, cost, task->SleepTime);
#endif

                        // 如果只是一次性任务,在这里清理
                        if(task->Period < 0) Remove(task->ID);
                }

                // 如果已经超出最大可用时间,则退出
                if(!usMax || Time.Current() > end) return;
        }

        // 如果有最小时间,睡一会吧
        now = Time.Current();        // 当前时间
        if(min != UInt64_Max && min > now)
        {
                min -= now;
#if DEBUG
                //debug_printf("TaskScheduler::Execute 等待下一次任务调度 %uus\r\n", (uint)min);
#endif
                //// 最大只允许睡眠1秒,避免Sys.Delay出现设计错误,同时也更人性化
                //if(min > 1000000) min = 1000000;
                //Sys.Delay(min);
                Time.Sleep(min);
        }
}

// 显示状态
void TaskScheduler::ShowStatus(void* param)
{
        TaskScheduler* ts = (TaskScheduler*)param;

        int i = -1;
        while(ts->_Tasks.MoveNext(i))
        {
                Task* task = ts->_Tasks[i];
                if(task) task->ShowStatus();
        }
}

Task* TaskScheduler::operator[](int taskid)
{
        int i = -1;
        while(_Tasks.MoveNext(i))
        {
                Task* task = _Tasks[i];
                if(task && task->ID == taskid) return task;
        }

        return NULL;
}

外部注册函数

// 任务
#include "Task.h"
// 任务类
TaskScheduler* _Scheduler;

// 创建任务,返回任务编号。priority优先级,dueTime首次调度时间us,period调度间隔us,-1表示仅处理一次
uint TSys::AddTask(Action func, void* param, ulong dueTime, long period)
{
        // 屏蔽中断,否则可能有线程冲突
        SmartIRQ irq;

        if(!_Scheduler) _Scheduler = new TaskScheduler("系统");

        return _Scheduler->Add(func, param, dueTime, period);
}

void TSys::RemoveTask(uint taskid)
{
        assert_ptr(_Scheduler);

        _Scheduler->Remove(taskid);
}

void TSys::SetTask(uint taskid, bool enable)
{
        Task* task = (*_Scheduler)[taskid];
        if(task) task->Enable = enable;
}

void TSys::Start()
{
        if(!_Scheduler) _Scheduler = new TaskScheduler("系统");

#if DEBUG
        //AddTask(ShowTime, NULL, 2000000, 2000000);
#endif
        if(OnStart)
                OnStart();
        else
                _Scheduler->Start();
}

void TSys::StartInternal()
{
        _Scheduler->Start();
}

void TSys::Stop()
{
        _Scheduler->Stop();
}

void TimeSleep(uint us)
{
        // 在这段时间里面,去处理一下别的任务
        if(_Scheduler && (!us || us >= 1000))
        {
                // 记录当前正在执行任务
                Task* task = _Scheduler->Current;

                ulong start = Time.Current();
                // 1ms一般不够调度新任务,留给硬件等待
                ulong end = start + us - 1000;
                // 如果休眠时间足够长,允许多次调度其它任务
                int cost = 0;
                while(true)
                {
                        ulong start2 = Time.Current();

                        _Scheduler->Execute(us);

                        ulong now = Time.Current();
                        cost += (int)(now - start2);

                        // us=0 表示释放一下CPU
                        if(!us) return;

                        if(now >= end) break;
                }

                if(task)
                {
                        _Scheduler->Current = task;
                        task->SleepTime += cost;
                }

                cost = (int)(Time.Current() - start);
                if(cost > 0) return;

                us -= cost;
        }
        if(us) Time.Sleep(us);
}

void TSys::Sleep(uint ms)
{
        // 优先使用线程级睡眠
        if(OnSleep)
                OnSleep(ms);
        else
        {
#if DEBUG
                if(ms > 1000) debug_printf("Sys::Sleep 设计错误,睡眠%dms太长,超过1000ms建议使用多线程Thread!", ms);
#endif

                TimeSleep(ms * 1000);
        }
}

void TSys::Delay(uint us)
{
        // 如果延迟微秒数太大,则使用线程级睡眠
        if(OnSleep && us >= 2000)
                OnSleep((us + 500) / 1000);
        else
        {
#if DEBUG
                if(us > 1000000) debug_printf("Sys::Sleep 设计错误,睡眠%dus太长,超过1000ms建议使用多线程Thread!", us);
#endif

                TimeSleep(us);
        }
}
时间: 2024-10-14 17:41:39

【SmartOS】轻量级多任务调度系统的相关文章

在PHP中使用协程实现多任务调度

PHP5.5一个比较好的新功能是加入了对迭代生成器和协程的支持.对于生成器,PHP的文档和各种其他的博客文章已经有了非常详细的讲解.协程相对受到的关注就少了,因为协程虽然有很强大的功能但相对比较复杂, 也比较难被理解,解释起来也比较困难. 这篇文章将尝试通过介绍如何使用协程来实施任务调度, 来解释在PHP中的协程. 我将在前三节做一个简单的背景介绍.如果你已经有了比较好的基础,可以直接跳到“协同多任务处理”一节. 迭代生成器 生成器也是一个函数,不同的是这个函数的返回值是依次输出,而不是只返回一

hadoop工作流调度系统

常见工作流调度系统 Oozie, Azkaban, Cascading, Hamake 各种调度工具特性对比 特性 Hamake Oozie Azkaban Cascading 工作流描述语言 XML XML (xPDL based) text file with key/value pairs Java API 依赖机制 data-driven explicit explicit explicit 是否要web容器 No Yes Yes No 进度跟踪 console/log messages

单片机多任务调度

单片机多任务调度 mcu由于内部资源的限制,软件设计有其特殊性,程序一般没有复杂的算法以及数据结构,代码量也不大, 通常不会使用OS (Operating System),  因为对于一个只有 若干K ROM, 一百多byte RAM 的 mcu 来说,一个简单OS  也会吃掉大部分的资源. 对于无 os 的系统,流行的设计是主程序(主循环 ) + (定时)中断,这种结构虽然符合自然想法,不过却有很多不利之处,首先是中断可以在主程序的任何地方发生,随意打断主程序.其次主程序与中断之间的耦合性(关

工作流调度系统Azkaban的简介和使用

1 概述 1.1 为什么需要工作流调度系统 l 一个完整的数据分析系统通常都是由大量任务单元组成: shell脚本程序,java程序,mapreduce程序.hive脚本等 l 各任务单元之间存在时间先后及前后依赖关系 l 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行: 例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示: 1.  通过Hadoop先将原始数据同步到HDFS上: 2.  借助MapReduce计算框

EasyScheduler调度系统的架构原理及实现思路

系统架构设计 在对调度系统架构说明之前,我们先来认识一下调度系统常用的名词 1.名词解释 DAG: 全称Directed Acyclic Graph,简称DAG.工作流中的Task任务以有向无环图的形式组装起来,从入度为零的节点进行拓扑遍历,直到无后继节点为止.举例如下图: 流程定义:通过拖拽任务节点并建立任务节点的关联所形成的可视化DAG 流程实例:流程实例是流程定义的实例化,可以通过手动启动或定时调度生成 任务实例:任务实例是流程定义中任务节点的实例化,标识着具体的任务执行状态 任务类型:

黑马程序员-java-银行业务调度系统《十一》

                   --Java培训.Android培训.iOS培训..Net培训.期待与您交流! -- 1.了解银行业务调度系统的工作流程与原理 银行里有三种窗口,一种是普通窗口,一种是VIP窗口,一种是快速窗口,所有客户都要领号排队,当VIP窗口或快速窗口空闲时,可以为普通窗口服务. 2.银行业务调度系统的项目需求 · 银行内有6个业务窗口,1 - 4号窗口为普通窗口,5号窗口为快速窗口,6号窗口为VIP窗口. · 有三种对应类型的客户:VIP客户,普通客户,快速客户(办理

电梯调度系统(界面由C图形库编绘)

电梯调度系统编程 1.编程题目 电梯调度. 2.结对编程组员 黄冠译,刘畅. 3.编程语言 C语言图形库. 4.题目要求: 5.代码运行及结果调试: ① 运行界面为C++图形库支持,开始运行的初始界面如图,且默认所有电梯初始状态都为1楼,此时不分奇偶层: ② 我设置了鼠标响应事件,左边上下箭头为当前楼层有人要上或下的按钮,可以用鼠标直接点击响应,点击后要输入有多少人在此楼层等待,示例点击5楼,输入15人,如图所示: ③ 输入完毕后,电梯会根据单双层或全部楼层4个电梯的忙碌状态调度一个电梯过去,第

调度系统

---恢复内容开始--- 一.目的和要求 1. 实验目的 (1)加深对作业调度算法的理解: (2)进行程序设计的训练. 2.实验要求 用高级语言编写一个或多个作业调度的模拟程序. 单道批处理系统的作业调度程序.作业一投入运行,它就占有计算机的一切资源直到作业完成为止,因此调度作业时不必考虑它所需要的资源是否得到满足,它所运行的时间等因素. 作业调度算法: 1) 采用先来先服务(FCFS)调度算法,即按作业到达的先后次序进行调度.总是首先调度在系统中等待时间最长的作业. 2) 短作业优先 (SJF

Hadoop 开源调度系统zeus(二)

紧跟之前Hadoop 开源调度系统zeus(一) 本节主要介绍一下zeus的架构: 先给一个zeus的架构图 无论Master还是Worker都有一套WEB UI,无论从哪个上面去看,看到的结果都是一样的,实际上一般仅仅看主 Master:调度内核,在启动时启动一个TCP服务,同一时候将全部任务读到内存中,在任务能够运行时,加到运行队列,下发到client Worker:启动后连接Master,并定时向Master发送心跳,当收到Master的任务后,封装任务运行shell,并将任务运行结果通知