上周发来个运营需求:服务器得接收各类运营消息,并记录下来(二进制文件、入库)。
我们的消息处理是单线程轮询取队列的方式,如在响应函数中直接调IO等耗时操作,整个处理线程都会被阻塞。所以设计了这个异步日志模块。核心代码如下:
//如果写得非常快,瞬间把两片buf都写满了,会阻塞在awakeChan处,等writeLoop写完log即恢复 //两片buf的好处:在当前线程即可交换,不用等到后台writeLoop唤醒 func (self *AsyncLog) Append(pdata []byte) { isAwakenWriteLoop := false self.Lock() { self.curBuf = append(self.curBuf, pdata) if len(self.curBuf) == cap(self.curBuf) { _swapBuf(&self.curBuf, &self.spareBuf) isAwakenWriteLoop = true } } self.Unlock() if isAwakenWriteLoop { self.awakeChan <- true //Notice:不能放在临界区 } } func (self *AsyncLog) _writeLoop(bufSize int) { bufToWrite1 := make([][]byte, 0, bufSize) bufToWrite2 := make([][]byte, 0, bufSize) for { <-self.awakeChan //没人写数据即阻塞:超时/buf写满,唤起【这句不能放在临界区,否则死锁】 self.Lock() { //此时bufToWrite为空,交换 _swapBuf(&bufToWrite1, &self.spareBuf) _swapBuf(&bufToWrite2, &self.curBuf) } self.Unlock() //将bufToWrite中的数据全写进log,并清空 self.writeLogFunc(bufToWrite1, bufToWrite2) _clearBuf(&bufToWrite1) _clearBuf(&bufToWrite2) } } func (self *AsyncLog) _timeOutWrite() { for { time.Sleep(Flush_Interval * time.Second) self.awakeChan <- true } }
Append()给逻辑线程调用的,负责填充buffer,buffer被写满即唤醒后台写线程_writeLoop进行IO。共有四片buffer,在前后台之间交换,提高处理效率。
需要额外注意的地方——锁的使用。保证线程安全的前提下,尽量减少临界区代码。
线程安全:注意锁重叠。感觉这是踩坑经验练粗来的~,多看别人的竞态分析,很有帮助。
附上本段代码的race condition:
1、"go chan"内部也是锁实现的,chan操作不要放在临界区,否则就锁中套锁了,极其危险。
2、比如连续两次触发buf被写满,第二次的chan会阻塞,挂起Append()的线程
若chan位于临界区内则还占用着Mutex
后台writeLoop被唤醒时,同样要访问临界区,就被挂起了
然后两线程此时就都挂着咯~
减少临界区:多利用栈变量,转移共享变量的资源,这样真正的操作可以放在临界区之外。
还有个timeout线程,间隔5秒唤醒一次_writeLoop写日志,免得Append()调用不多,数据一直不落地。
【其它问题】
1、强关进程,内存中的缓冲数据很可能丢失。
2、逻辑如果写的非常快,Append()可能变成阻塞调用。
a)这个可以搞定:前后台各增加一个bufferList;前台buffer被写满就仍进bufferList,两片buff都满了new个新的,保证前台一直有buffer可用。
b)后台临界增加“交换前台的bufferList”,IO完毕后清空后台bufferList。
【c++】
后续打算用c++重写,用c++的话就不必timeout线程了,用条件变量替代“go chan”,自带超时,仅需一条后台线程即可。
得额外处理new出buffer的回收:初始的四片buffer(curBuf、spareBuf、bufToWrite1、bufToWrite2)连同new出的,其内存块均可能被移至bufferList。
为保证初始的四片buffer始终在bufferList的前四个位置,buffer的swap应换为move:
1)前台:curBuf移进List;spareBuf非空则curBuf = move(spareBuf);反之curBuf = new()
2)后台:交换bufferList,curBuf移至后台bufferList,curBuf = move(bufToWrite1),spareBuf = move(bufToWrite2)
3)IO完毕后,若bufToWrite1、bufToWrite2为nullptr,将后台bufferList头部移进去,其它的回收。
4)用shared_ptr包装buffer,方便自动回收。