嵌入式软件架构
软硬件模型
不管是通信互联系统,图形图像,音频视频,一个满足某种需求的业务应用,都常常需要协同使用硬件和软件来配合完成。硬件快,天然的具备处理数字的或模拟的信号的能力;软件灵活,可配置可定制可更新。那些固定的算法或已经成为业界标准的成熟规格,不像面向用户的使用场景一样会频繁修改,但对性能指标有高要求,比如MPEG编解码,颜色空间转换,还有那些离不开硬件实现的协议物理层(比如有对模拟信号的快速处理),你很难想象现在手机里是软件在做视频流解码,OPENGL渲染或者wifi的物理层信道编码调制等,这些功能单一的工作就该交给硬件(前面和后面所指的硬件基本都是指ASIC)来实现。而软件可以用来处理业务复杂但对性能没有苛刻要求的工作,相对于前面的协议物理层,不再需要同模拟信号和高速数字信号编解码打交道的协议链路层其实已经可以交给软件来做了,当然交给硬件做更好,更不用说更往上的应用层。站在把硬件作为被不同应用独占的资源的角度上来看待,软件扮演的是资源分配的仲裁者,根据业务需求来把硬件分配到应用上。站在系统中数据的流动的角度来看,有些硬件单元是外界数据的消费者,有些硬件单元是数据的生产者,有些既是生产者也是消费者,数据本身只在硬件和硬件单元之间流动,它们就像铁轨或航线,承载着作为数据的火车或飞机,而软件就像一个铁路调度器或者空中管制者,根据应用需要,把不同的数据消费者和生产者连接起来。这其实和Unix系统的基于管道架构的过滤器的思想很像,比如当你在shell上执行:
ls *.c | xargs cat | grep "stdio"
操作系统管道就把ls, xargs, cat, grep几个程序连接起来,ls, cat和grep们就像是具备单一功能干具体活儿的硬件模块,它们通过定义好的接口即Unix上的标准输入和标准输出消费和生产数据(硬件通过定义良好的接口即寄存器或配置自己的参数行为,如上面作为程序参数的字符串”*.c”和‘“stdio”,或设置存储单元地址接收输入数据或存放输出数据,如标准输入输出缓冲),软件同样扮演操作系统的角色,它创建/销毁相应的进程(enable/disable相应的硬件模块),重定向进程输入输出(决定硬件工作的时间点)。
最简单的模式:阻塞式super loop
在分析一个嵌入式系统需求的时候,就要搞清楚系统的数据流是什么,哪些是生产者,它们或许是来自外界的输入数据(比如UART的FIFO IN DATA),或者是反映状态的变化(比如DMA buffer满了?BlutTooth PHY Transimitter空闲了,包传输结束了?),哪些是消费者(比如为一个buffer请求一个新的DMA write操作,为一个包请求一次新的BT PHY Transimitter 事务)。它们之间的”连接点”,即需要软件插手的地方是什么。一个这样的系统,如果够简单的话,总是可以直接实现成最简单的super loop的方式:
void main(void)
{
while(1) {
hw_poll_input();
sw_process_data();
hw_output();
}
}
上面程序三行代码分别代表查询input I/O有没有新的数据产生–>处理数据(如果需要的话)–>通知output I/O去消费数据。
如果系统的行为能用一个状态机来建模或流程图来表达的话,都可以实现成这种super loop。这里的变化无非就是作为模型的状态机可能会很复杂,比如software同input I/O和output I/O有复杂的协议和握手过程,增加的复杂性也是在这个最简单的super loop的骨架上完善其血肉。
非阻塞式状态机模式
这种代码是顺序且阻塞式的,比如在hw_poll_input()中,可能会有这样的代码:
void hw_poll_input()
{
while (!(read_reg(STATUS_REG) & BIT_DATA_ARRIVALLING));
...
}
因为阻塞,所以当系统无法简单地建模成一个状态机,而是存在多个并行的状态机时,就不能这样做了,(这里的状态机,其实也就是任务/task),因为程序会阻塞在其中一个任务里,无法响应另一个任务的随机事件。想象一下,如果一个web服务器只能一个一个的接受请求会怎么样?
非阻塞模式就是把每个任务的程序都拆分为非顺序执行的一段一段的有限状态机(FSM)形式,比如上面那个程序就写成
void statemachine()
{
switch(state)
{
case WAIT_INPUT_READY:
if(!(read_reg(STATUS_REG) & BIT_DATA_ARRIVALLING)) {
return;
} else {
write_reg(STATUS_REG,BIT_DATA_ARRIVALLING);
sw_process_data();
state = WAIT_OUTPUT_READY;
}
break;
case WAIT_OUTPUT_READY:
if (!(read_reg(STATUS_REG2)&BIT_OUT_FIFO_EMPTY)) {
return;
}
else {
write_reg(STATUS_REG2, BIT_OUT_FIFO_EMPTY);
write_reg(FIFO_DAT, value);
state = WAIT_INPUT_READY;
break;
}
}
}
两个状态WAIT_INPUT_READY和WAIT_OUTPUT_READY分别对应上面那个等价的顺序执行的程序的两个阻塞点。于是当“阻塞”发生时,就从函数返回。
注意上面的代码在每次查询到状态寄存器有事件(从0变到1)后,都有一句
write_reg(STATUS_REG,BIT_XXX);
去清除它。这是一种最常见的软件/硬件交互的协议,硬件在状态寄存器记录事件发生,软件查询状态寄存器发现事件发生,最后软件向硬件表明”我已经知道了”,硬件看到软件已经知道事件发生了,就可以接下来继续接收新的事件了。
这样,两个并行的状态机就可以”并行”运行了,任意一个被阻塞都不会影响到另一个,例如:
void main(void)
{
while(1) {
statemachine_a(); //process A
statemachine_b(); //process B
}
}
异步I/O模式
上面的代码都是同步I/O模式,即程序主动去询问I/O”事件发生了吗?”,这样的好处是程序的执行符合人的思维模式,同步的执行不会发生竞态。但坏处是很多cpu cycle被浪费在无谓的查询上。比如并行的状态机a,b,c,d….越来越多的时候,每次main loop都要查询一遍。任务在每个状态时需要查询的事件变得越来越多时,例如:
case WAIT_INPUT_READY:
if(!(read_reg(STATUS_REG) & BIT_EVENT1_ARRIVALLING) && !(read_reg(STATUS_REG) & BIT_EVENT2_ARRIVING) && !(read_reg(STATUS_REG) & BIT_EVENT3_ARRIVING) && timeout(TIMEOUT_500MS)) {
return;
} else {
}
这里只要event1, event2, event3和time out发生都可以触发到下一个状态,对cpu cycle的浪费也越严重(大部分时候都是在执行无意义的代码)。
异步I/O处理模式可以把主动的”轮询”变为被动的”按需调度”。异步I/O需要利用cpu的异步执行代码的能力,如中断,定时器。但同时异步执行的代码也是危险的,就像多线程引入了程序的不确定性一样,被中断的代码和中断中执行的代码如果访问到同样的资源(memory地址,硬件寄存器),都有可能造成这些资源被非原子性地读写而出现很难发现的错误。其次,在中断处理程序里最好不要做太多的事,最好只是简单处理一下现场,记录一下状态就返回,真正的干活儿还是交给工作任务来做,这里使用消息队列的模式来解耦中断处理程序和工作任务程序是一个很好的方法。
每个状态机执行代码仍然在main loop中执行,不过,它们不再主动的查询I/O事件,而是被动的被一个消息队列调度者来调用它们。状态机函数从主动查询的“拉”模式变为了被动调用的“推”模式。
void statemachine(msg_t Msg)
{
switch(state)
{
case WAIT_INPUT_READY:
if (msg_is(Msg, EVENT1_ARRIVING) || msg_is(Msg, EVENT2_ARRIVING) || msg_is(Msg, EVENT3_ARRIVING) || !msg_is(Msg, TIMEOUT_500MS)) {
sw_process_data();
state = WAIT_OUTPUT_READY;
}
else {
return;
}
}
}
根级的main函数可以就是那个消息队列调度者:
void main(void)
{
while(1) {
if (!msg_queue_empty()) {
msg_t Next_Msg = msg_queue_pop();
statemachine_a(Next_Msg);
statemachine_b(Next_Msg);
}
}
}
硬件设计时,最好让每当STATUS_REG的每个bit发生从0到1的状态变化时都有中断产生,那么,查询STATUS_REG状态的工作就放在中断函数中执行:
void IRQ(void)
{
if (read_reg(STATUS_REG) & BIT_EVENT1_ARRIVING) {
write_reg(STATUS_REG, BIT_EVENT1_ARRIVING);
msg_queue_push(new_msg(EVENT1_ARRIVING));
}
if (read_reg(STATUS_REG) & BIT_EVENT2_ARRIVING) {
write_reg(STATUS_REG, BIT_EVENT2_ARRIVING);
msg_queue_push(new_msg(EVENT2_ARRIVING));
}
if (read_reg(STATUS_REG) & BIT_EVENT3_ARRIVING) {
write_reg(STATUS_REG, BIT_EVENT3_ARRIVING);
msg_queue_push(new_msg(EVENT3_ARRIVING));
}
}
这里也是有竞态的,这里IRQ函数和main函数共享了对消息队列的操作,所以,应该在消息队列操作msg_queue_push和msg_queue_pop函数中使用disable IRQ等方法来保护队列指针不被破坏。
异步程序容易出错,特别是程序本身逻辑具备同步性的时候。
比如下面一个例子:
Created with Rapha?l 2.1.2任务任务硬件硬件状态寄存器状态寄存器dispatcherdispatcher阻塞在"等待新字节"状态接收到新字节,设状态为1中断产生查询到状态为1发送消息从"等待新字节"状态返回阻塞在其他状态清除状态为0FIFO空闲,准备接收新字节一段时间后接收到新字节,设状态为1中断产生查询到状态为1发送消息并不在"等待新字节"状态,消息被忽略清除状态为0FIFO空闲,准备接收新字节一段时间后从其他状态返回阻塞在"等待新字节"状态
上面这个例子,因为任务同中断(dispatcher)之间是异步的,所以导致了事件丢失。
看看当代码是同步时,即不用中断时是怎样的?
Created with Rapha?l 2.1.2任务任务硬件硬件状态寄存器状态寄存器阻塞在"等待新字节"状态保持轮询状态寄存器接收到新字节,设状态为1查询到状态为1清除状态为0从"等待新字节"状态返回,阻塞在其他状态FIFO空闲,准备接收新字节一段时间后接收到新字节,设状态为1一段时间后从其他状态返回,阻塞在"等待新字节"状态查询到状态为1事件得到处理
这里的区别就是,任务只应该在“等待新字节”状态时,才应该去“查询-清除”状态寄存器(继而通知硬件开始接收新的字节)。这里本质上应该是一个同步的过程。所以当程序是异步时,需要一个同步点来保证正确的sequence。这里因为和任务代码之间异步执行的是源头是IRQ处理函数,而一般硬件设计时,对每个能产生IRQ的状态应该都有一个Mask寄存器开关,只有当寄存器状态变化且相应的mask是打开时,才会有中断产生。这样,可以把前面那个任务的流程改为:
Created with Rapha?l 2.1.2任务任务硬件硬件状态寄存器状态寄存器dispatcherdispatcher阻塞在"等待新字节"状态接收到新字节,设状态为1中断产生查询到状态为1发送消息关闭Mask从"等待新字节"状态返回阻塞在其他状态清除状态为0FIFO空闲,准备接收新字节一段时间后接收到新字节,设状态为1Mask关,没有中断一段时间后从其他状态返回阻塞在"等待新字节"状态打开MaskMask开,产生中断中断产生查询到状态为1发送消息事件得到处理清除状态为0FIFO空闲,准备接收新字节
这样,从任务执行的结果来看,达到了和同步I/O时相同的结果。
如果一个事件在硬件设计时没有设计相应的中断呢?同样,可以使用另一个能为代码提供异步能力的工具–timer:把主动查询事件的工作程序放到timer处理函数里,定时调用,同样,需要用软件的Mask来为异步代码提供可能需要的”同步点”。
面向消息来建模每个任务,每个任务在自己关心的消息里实现响应的dispatch函数,任务的工作程序就只有在消息到来时才被执行。消息不光可以来自于硬件,也可以来自于其他任务。任务和任务之间有通信的需求,一些任务可能提供服务,一些调用它们的服务。这个调用关系可以简单的实现为函数调用,也可以稍微复杂点用消息来实现–服务使用者向服务提供者发送一个消息,就像代表硬件任务的中断程序向任务工作程序发送一个消息一样,都是告诉目的任务:这里有个你关心的事件发生了,处理一下吧。具体采用直接函数调用方式呢还是消息传递方式,得结合具体问题具体分析。如果把任务之间的一次调用也看做是被调用的任务的一个事件,同时这个事件正好同硬件事件之间又有依赖关系的话,可能会出现一个任务先处理后发生的事件,再处理先发生的事件的情况。比如下面的情况:
Created with Rapha?l 2.1.2硬件中断硬件中断消息队列消息队列任务A任务A任务B任务B硬件事件Dispatch(Msg)准备函数调用"请求TaskB服务"EBCallEBCall()EBCall()返回Dispatch(Msg)返回Dispatch(Msg)
这个例子里,硬件事件是首先被消息调度器分发到任务A再分发到任务B的,而任务A里的硬件事件分发函数里,正好产生了对任务B的请求服务的事件,但因为是使用直接函数调用来实现的,所以对于任务B来说,它其实是先看到”来自A的请求事件”,再看到”来自硬件的事件”的,而在时间点来说,”来自硬件的事件”是比”来自A的请求事件”要更早的。
假如同样用一个消息来抽象”任务A向任务B请求服务”这一事件,放到系统的任务队列里,因为队列的先入先出,可以保证硬件事件必然早于”A请求B”事件得到B的处理,像下面一样:
Created with Rapha?l 2.1.2硬件中断硬件中断消息队列消息队列任务A任务A任务B任务B硬件事件Msg_HWDispatch(Msg_HW)创造事件"请求TaskB服务"Msg_BMsg_BMsg_B在Msg_HW后面Dispatch(Msg_HW)Dispatch(Msg_B)Dispatch(Msg_B)
这样,任务B看到的事件顺序同事件发生的真实时间顺序一样。至于这个顺序对于任务B或者系统来说重不重要,就看具体问题了。
状态机和协程
前面讲了用状态机来构造工作任务,好处是不会阻塞,但缺点之一就是程序逻辑被打散。有一种东西叫”协程”,既可以具备状态机一样的”非阻塞”的优点,又可以像写阻塞式的线性程序一样顺序的思维。当然,缺点就是比状态机要消耗更多的资源–主要是栈,每个协程都要有自己独立的栈,来保留现场。当然,也有protothread这样的本质上仍然是全局单栈状态机但写起来像写协程一样的很富有想象力的库。这个留着以后来写了。