nsq源码阅读笔记之nsqd(三)——diskQueue

diskQueuebackendQueue接口的一个实现。backendQueue的作用是在实现在内存go channel缓冲区满的情况下对消息的处理的对象。 
除了diskQueue外还有dummyBackendQueue实现了backendQueue接口。

对于临时(#ephemeral结尾)Topic/Channel,在创建时会使用dummyBackendQueue初始化backend, 
dummyBackendQueue只是为了统一临时和非临时Topic/Channel而写的,它只是实现了接口,不做任何实质上的操作, 
因此在内存缓冲区满时直接丢弃消息。这也是临时Topic/Channel和非临时的一个比较大的差别。 
每个非临时Topic/Channel,创建的时候使用diskQueue初始化backenddiskQueue的功能是将消息写入磁盘进行持久化, 
并在需要时从中取出消息重新向客户端投递。

diskQueue的实现在nsqd/disk_queue.go中。需要注意一点,查找diskQueue中的函数的调用可能不会返回正确的结果, 
因为diskQueue对外是以backendQueue形式存在,因此查找diskQueue的函数的调用情况时应当查找backendQueue中相应函数的调用。

diskQueue的创建和初始化

diskQueue的获得是通过newDiskQueue,该函数比较简单,通过传入的参数创建一个dispQueue, 
然后通过retrieveMetaData函数获取之前与该diskQueue相关联的Topic/Channel已经持久化的信息。最后启动ioLoop循环处理消息。

retrieveMetaData函数从磁盘中恢复diskQueue的状态。diskQueue会定时将自己的状态备份到文件中, 
文件名由metaDataFileName函数确定。retrieveMetaData函数同样通过metaDataFileName函数获得保存状态的文件名并打开。 
该文件只有三行,格式为%d\n%d,%d\n%d,%d\n,第一行保存着该diskQueue中消息的数量(depth), 
第二行保存readFileNumreadPos,第三行保存writeFileNumwritePos

这里不太理解的一个地方是d.depth通过一个临时变量去获取然后通过atomic.StoreInt64保存。个人觉得没有必要这么做。 
当然作者在nsqd: diskqueue corruption and depth accounting这个Pull Request中也提到:

I dont believe that this should be strictly necessary because retrieveMetaData is only ever called in NewDiskQueue and the ioLoopgoroutine is launched after that call (which according to the go memory model is safe).

However, I’m not 100% sure about interactions between the go memory model, go-routines, and the combined use of atomic and non-atomic operations (which is what this was relying on before this change… i.e. this was the only mutation of d.depth that was notusing atomic ops).

因此,这只是个比较保险的做法,并不一定意味着直接保存到d.depth就不安全。

retrieveMetaData相对应的是persistMetaData函数,这个函数将运行时的元数据保存到文件用于下次重新构建diskQueue时的恢复。 
逻辑基本与retrieveMetaData,此处不再赘述。

diskQueue的消息循环

ioLoop函数实现了diskQueue的消息循环,diskQueue的定时操作和读写操作的核心都在这个函数中完成。

函数首先使用time.NewTicker(d.syncTimeout)定义了syncTicker变量,syncTicker的类型是time.Ticker, 
每隔d.syncTimeout时间就会在syncTicker.C这个go channel产生一个消息。 
通过select syncTicker.C能实现至多d.syncTimeout时间就跳出select块一次,这种方式相当于一个延时的default子句。 
ioLoop中,通过这种方式,就能在一个goroutine中既实现消息的接收又实现定时任务(跳出select后执行定时任务,然后在进入select)。 
有点类似于定时的轮询。

ioLoop的定时任务是调用sync函数刷新文件,防止突然结束程序后内存中的内容未被提交到磁盘,导致内容丢失。 
控制是否需要同步的变量是d.needSync,该变量在一次sync后会被置为false,在许多需要刷新文件的地方会被置为true。 
ioLoop中,d.needSync变量还跟刷新计数器count变量有关,count值的变化规则如下:

  1. 如果一次消息循环中,有写入操作,那么count就会被自增。
  2. count达到d.syncEvery时,会将count重置为0并且将d.needSync置为true,随后进行文件的刷新。
  3. emptyChan收到消息时,count会被重置为0,因为文件已经被删除了,所有要重置刷新计数器。
  4. syncTicker.C收到消息后,会将count重置为0,并且将d.needSync置为true。也就是至多d.syncTimeout时间刷新一次文件。

ioLoop还定时检测当前是否有数据需要被读取,如果(d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) 
`d.nextReadPos == d.readPos这两个条件成立,则执行d.readOne()并将结果放入dataRead中,然后设置rd.readChan。 
如果条件不成立,则将r置为空值nil。随后的select语句中有case r <- dataRead:这样一个分支,在注释中作者写了这是一个Golang的特性, 
即:如果r不为空,则会将dataRead送入go channel。进入d.readChan的消息通过ReadChan函数向外暴露,最终被Topic/Channel的消息循环读取。 
而如果r为空,则这个分支会被跳过。这个特性的使用统一了select的逻辑,简化了当数据为空时的判断。

diskQueue的写操作

写操作的对外接口是Put函数,该函数比较简单,加锁,并且将数据放入d.writeChan,等待d.writeResponseChan的结果后返回。 
d.writeChan的接收在ioLoop中select的一个分支,处理时调用writeOne函数,并将处理结果放入d.writeResponseChan

writeOne函数是写操作的最终执行部分,负责将消息写入磁盘。函数逻辑比较简单。消息写入步骤如下:

  1. 若当前要写的文件不存在,则通过d.fileName(d.writeFileNum)获得文件名,并创建文件
  2. 根据d.writePos定位本次写的位置
  3. 从要写入的内容得到要写入的长度
  4. 先写入3中计算出的消息长度(4字节),然后写入消息本身
  5. d.writePos后移4 + 消息长度作为下次写入位置。加4是因为消息长度本身也占4字节。
  6. 判断d.writePos是否大于每个文件的最大字节数d.maxBytesPerFile,如果是,则将d.writeFileNum加1, 
    并重置d.writePos。这个操作的目的是为了防止单个文件过大。
  7. 如果下次要写入新的文件,那么需要调用sync函数对当前文件进行同步。

diskQueue的读操作

消息读取对外暴露的是一个go channel,而数据的最终来源是ioLoop中调用的readOne函数。readOne函数逻辑跟writeOne类似, 
只是把写操作换成了读操作,唯一差异较大的地方是d.nextReadPosd.nextReadFileNum这两个变量的使用。

在写操作时,如果写入成功,则可以直接将写入位置和写入文件更新。但是对于读操作来说,由于读取的目的是为了向客户端投递, 
因此无法保证一定能投递成功。因此需要使用next开头的两个变量来保存成功后需要读的位置,如果投递没有成功, 
则继续使用当前的读取位置将再一次尝试将消息投递给客户端。

当消息投递成功后,则使用moveForward函数将保存在d.nextReadPosd.nextReadFileNum中的值取出, 
赋值给d.readPosd.readFileNummoveForward函数还负责清理已经读完的旧文件。最后,调用checkTailCorruption函数检查文件是否有错, 
如果出现错误,则调用skipToNextRWFile重置读取和写入的文件编号和位置。

diskQueue中的其他函数

diskQueue中还有与错误处理相关的handleReadError,与关闭diskQueue相关的CloseDeleteexitEmptydeleteAllFiles等, 
函数,逻辑较简单,不再专门分析。

diskQueue总结

diskQueue主要逻辑是对磁盘的读写操作,较为琐碎但没有复杂的架构。 
其中消息循环的思路和读写过程周全的考虑都值得学习的。

时间: 2024-10-21 06:31:50

nsq源码阅读笔记之nsqd(三)——diskQueue的相关文章

nsq源码阅读笔记之nsqd(一)——nsqd的配置解析和初始化

配置解析 nsqd的主函数位于apps/nsqd.go中的main函数 首先main函数调用nsqFlagset和Parse进行命令行参数集初始化, 然后判断version参数是否存在,若存在,则打印版本号并退出程序 接下来钩住系统的syscall.SIGINT和syscall.SIGTERM消息,用来阻塞主goroutine防止退出 随后判断config参数是否存在,若存在的话还需进行配置文件的读取, nsq使用toml格式的配置文件,并通过github.com/BurntSushi/toml

nsq源码阅读笔记之nsqd(四)——Channel

与Channel相关的代码主要位于nsqd/channel.go, nsqd/nsqd.go中. Channel与Topic的关系 Channel是消费者订阅特定Topic的一种抽象.对于发往Topic的消息,nsqd向该Topic下的所有Channel投递消息,而同一个Channel只投递一次,Channel下如果存在多个消费者,则随机选择一个消费者做投递.这种投递方式可以被用作消费者负载均衡. Channel从属于特定Topic,可以认为是Topic的下一级.在同一个Topic之下可以有零个

nsq源码阅读笔记之nsqd(二)——Topic

与Topic相关的代码主要位于nsqd/nsqd.go, nsqd/topic.go中. Topic的获取 Topic通过GetTopic函数获取 GetTopic函数用于获取topic对象,首先先尝试从topicMap表中获取,如果指定的topic存在,则直接返回topic对象. 当topic不存在时需要新建一个topic,加入到topicMap中, 如果启用了nsqlookupd则需要从lookupd中获取该topic的所有channel,在去除#ephemeral结尾的临时channel后

源码阅读笔记 - 1 MSVC2015中的std::sort

大约寒假开始的时候我就已经把std::sort的源码阅读完毕并理解其中的做法了,到了寒假结尾,姑且把它写出来 这是我的第一篇源码阅读笔记,以后会发更多的,包括算法和库实现,源码会按照我自己的代码风格格式化,去掉或者展开用于条件编译或者debug检查的宏,依重要程度重新排序函数,但是不会改变命名方式(虽然MSVC的STL命名实在是我不能接受的那种),对于代码块的解释会在代码块前(上面)用注释标明. template<class _RanIt, class _Diff, class _Pr> in

CI框架源码阅读笔记5 基准测试 BenchMark.php

上一篇博客(CI框架源码阅读笔记4 引导文件CodeIgniter.php)中,我们已经看到:CI中核心流程的核心功能都是由不同的组件来完成的.这些组件类似于一个一个单独的模块,不同的模块完成不同的功能,各模块之间可以相互调用,共同构成了CI的核心骨架. 从本篇开始,将进一步去分析各组件的实现细节,深入CI核心的黑盒内部(研究之后,其实就应该是白盒了,仅仅对于应用来说,它应该算是黑盒),从而更好的去认识.把握这个框架. 按照惯例,在开始之前,我们贴上CI中不完全的核心组件图: 由于BenchMa

CI框架源码阅读笔记2 一切的入口 index.php

上一节(CI框架源码阅读笔记1 - 环境准备.基本术语和框架流程)中,我们提到了CI框架的基本流程,这里这次贴出流程图,以备参考: 作为CI框架的入口文件,源码阅读,自然由此开始.在源码阅读的过程中,我们并不会逐行进行解释,而只解释核心的功能和实现. 1.       设置应用程序环境 define('ENVIRONMENT', 'development'); 这里的development可以是任何你喜欢的环境名称(比如dev,再如test),相对应的,你要在下面的switch case代码块中

jdk源码阅读笔记之java集合框架(二)(ArrayList)

关于ArrayList的分析,会从且仅从其添加(add)与删除(remove)方法入手. ArrayList类定义: p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 18.0px Monaco } span.s1 { color: #931a68 } public class ArrayList<E> extends AbstractList<E> implements List<E> ArrayList基本属性: /** *

Spark源码阅读笔记之Broadcast(一)

Spark源码阅读笔记之Broadcast(一) Spark会序列化在各个任务上使用到的变量,然后传递到Executor中,由于Executor中得到的只是变量的拷贝,因此对变量的改变只在该Executor有效.序列化后的任务的大小是有限制的(由spark.akka.frameSize决定,值为其减去200K,默认为10M-200K),spark会进行检查,超出该限制的任务会被抛弃.因此,对于需要共享比较大的数据时,需要使用Broadcast. Spark实现了两种传输Broadcast的机制:

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index