disque 实现简略
按照作者的定义,disque是一个DIStributed以及DISorder的QUEue,也就是分布式乱序队列,更准确地说,是一个消息代理,用队列这个词地话,总是给人一种这是个顺序队列的错觉。
下文中,我会这个队列的几个主要实现分别描述,不过按照作者说法,现在只是个apoha版本,很多实现形式乃至命令可能都会变化,但目前雏形已定,基础结构上,相必不再会有特别大的改变了。
乱序
首先说乱序。
一般说起队列,很多人下意识的就会认为应该是顺序的,即先入先出,在单机情况下,由于有无锁队列的技术,简单的生产消费模型下,先天就可以弄到实现良好的队列,但是,在分布式环境下,如果需要保证顺序,需要引入分布式协调的一系列东西,相比较带来的实现复杂度(以及理所当然的bug数量),Antirez选择了redis一贯的处理手段:不去处理这个问题。
体现在代码上,就是,对于同一条消息队列的消息(代码中叫做job),都会被存储在一条skiplist上,关于skiplist详细内容可以参考各种文章,这里不会详细描述。简单来说,skiplist可以保证list上的元素是顺序的。代码中,依靠的是创建时间作为排序的主键的,也就是说,在单个节点的环境下,disque可以保证元素是按照时间顺序排队。
但是,考虑到从别的节点过来的消息,未得到确认执行重新放回队列的消息等这些因素,不可能达到绝对的顺序,如果需要严格顺序,或者对队列的顺序性有依赖的话,并不推荐采用disque。
至少消费一次
至少消费一次,这是个很有意思的承诺,言下之意是,一个消息可能会被消费多次。
首先说一下为什么。消息队列的处理中,最头疼的一件事情就是,单纯从消息队列看,它已经安全把消息发送给消费者,任务已经完成,但是消费者在消费的时候,由于种种意外(天灾人祸),消费失败了,从业务角度来讲,这就是丢消息了,这是个很无奈但需要解决的问题。产出的折衷之一,就是即使有可能重复消费消息,也要保证消息至少能被消费一次。
既然涉及到重复消费消息,那么对消息的设计,就必须需要保证是幂等的了。幂等,指的是,同一个操作,无论执行多少次,产生的结果都是相同的,比如拿hbase举例,put操作就是幂等的,而incr操作就不是,后者每次调用,都会导致计数变化,hbase为了避免客户端重试导致incr反复执行,引入了一套复杂的看起来就bug重重的重试判断逻辑,其具体实现与本次主题无关,按下不表。
disque的实现上,做法如下:在每个job里面,保存了一个属性retry时间,消息添加到队列中的时候,同时会加入到一个服务器调度队列中。每当消息从消息队列取出后,并不会马上从disque中删除,只是单纯从队列中取出,如果在指定(retry)时间没有得到消费完成的确认(ackjob)的话,消息就会被重新入队用于消费。
很明显会产生重复消费,retry时间是消费时间的一个预估,如果估计失败,可能导致消息会被反复消费,造成队列堆积。对于这个问题,redis的解决方案很简单,为job指定最长生存时间,如果达到这个时间,即使job没有被消费,也会被删除。(队列阻塞过久可能会导致消息丢失)
对于非幂等的操作,如何才能保证消费次数,就要靠下面介绍的“最多消费一次”了。
最多消费一次
相对于前一个,这一个承诺主要用于的场景是,消息不是幂等的,但可以丢消息———最多消费一次,也就是说,一次都不消费或者消费不完整也在承诺之内。
实现手段上,很简单,只要让前一个主题中提到的retry=0就可以,如果只是目测,感觉不到丢消息的气息,只会有之前提到的消息消费不完整的问题,但如果把宕机作为因素考虑,就可以明显看到,宕机会导致丢消息。
前一个“至少消费一次”的机制,在集群级别如何避免丢消息,就是接下来的内容。
分布式
disque其中的一个意思就是分布式(DIStributed),他是如何做到的呢?
首先说一下disque实现的分布式:无中心节点的,实现AP而不顾虑C,即不保证多节点一致性。
复制
分布式环境下,想要在实例级别宕机不丢消息,手段不外乎复制和落地,而落地对于机器再起不能的状况也比较无力,所以通常手段就是复制了,disque也不能避免。
复制的实现是,在添加消息的时候,手动指定复制副本数目,也就是说,如果有一个10实例的集群,可以指定只有其中三个实例上有这个消息,减少了对内存的消耗(这点很重要,之后的“不落地”部分会详细说这部分的意义),目前的实现是,每当添加一个job之后,就会从当前集群中随机选取指定数目的集群节点,顺序发送新增指定job请求到这些节点,只有当最后一个节点返回成功之后,才会给客户端返回成功(文档中提到的消息添加的事务实现),普通看没什么问题,但由于上层采用redis的网络协议,单个job耗时显然会由于网络大大增加,直接导致服务性能下降,这里应该是主要的性能瓶颈点之一。
除此之外,在“最多消费一次”的场景中,为了防止重复消费消息,不能启用消息复制机制。
failure over
分布式必然会涉及到自动故障处理,对于这点,disque的处理很简单。由于本身就是无中心的分布式系统,如果故障节点的消息全部采用了复制,那么在别的节点,必然会有该消息的复制,这样消息就不会丢失了。
顺带一提,disque的分布式协议采用的是Gossip。
load balance
disque会动态计算每个节点的压力,对于请求量较大导致队列空掉的节点,会从别的节点拿到更多消息用于消费。这个可能和一般意义的负载均衡逻辑有点区别。
不落地
这部分单独提出来,是因为作为消息队列来说,持久化方式直接关系到队列积压数量的大小。disque在设计之初,可以看出其设计目标里面对这部分不是很看重。
作者的思路是想在OOM的时候,写日志入盘,之后等有内存的时候在加载回来,目前没有实现,但不失为一个好的思路。
command
几个主要命令的主要参数及其实现
addjob
ADDJOB queue_name job [REPLICATE ] [DELAY ] [RETRY ] [TTL ] [MAXLEN ] [ASYNC]
添加一个job,主要参数包括:
ms-timeout:操作超时时间,即从发起命令,到命令返回的时间,如果超过指定时间,返回超时错误,目前的实现由于包含顺序replication,网络环境出问题的话,很容易触发这个超时。
REPLICATE count:复制份数,之前提到的,会从现有实例中,取出count个实例作为复制目标。
DELAY sec:之前没有提到的参数,指定多少秒钟之后,才把消息放入队列中。
RETRY sec:消息取走没有收到ack sec秒之后,重新把消息放回队列。
TTL sec:消息如果在队列中存在时间超过sec秒,就直接删除消息,无论有没有被消费。
MAXLEN count:消息队列最大长度。
ASYNC:采用异步方式操作。
主要过程描述:
1. 默认值:ttl:24小时,retry:-1,delay:0,replication:如果集群数目大于3,则为3,否则为集群数目。
2. 确认retry=0的时候,replication<1。
3. delay必须>=ttl
4. 如果没有指定retry,retry=ttl/10,如果ttl=0,retry=1.
5. 检查当前可达节点数目大于等于指定的复制数目。
6. 检查队列长度是否超出指定长度。
7. 创建job。
8. 分配ctime,用于实例内队列按照时间排序。
9. 设置delay或者retry时间用于调度。
10. 对于jobid冲突的情况,返回错误。
11. 添加消息进入队列,如果队列不存在,则自动创建。
12. 复制消息。
13. 对于异步消息,并且如果服务器目前内存容量不足,则在发送job到别的节点上之后,删除本地消息。
添加一个job后,client拿到的jobId,这个id的组成是,前两位采用关键字DI(不知道什么作用),之后8位用来标志生成这个id的实例,之后32位随机生成但和时间相关的字符串,之后4位是消息存活时间,用于在消息在指定时间内没有被消费的处理。最后加上关键字SQ。
长这个样子:DI | 0f0c644f | d3ccb51c2cedbd47fcb6f312646c993c | 05a0 | SQ
消息ID的作用,主要有以下几点:
- 每当生产者发送一个job,就会拿到这个id,可以用于生成者确认消息是否被正确消费。
- 每当消费者拿到一个job,就会附带这个id,用于消费者通知生成者或者disque消息已经被正确处理完毕。
getjob
GETJOB [TIMEOUT ] [COUNT ] FROM queue1 queue2 ... queueN
从队列中拿出消息。
TIMEOUT:执行超时时间,如果超时,直接返回。
COUNT count:返回count个消息。
主要过程:
1. 默认:count:1
2. 从队列中取出消息。
3. 如果消息未命中,进入负载均衡阶段。
4. 从别的节点请求必要数量的消息。
5. 返回客户端。
返回的消息,附带队列名,消息以及jobID。
deljob
DELJOB jobid_1 jobid_2 ... jobid_N
删除当前节点上指定的消息。
del只会操作当前节点,对其他节点不会操作,也不会通知。
主要过程:
1. 从回放队列中取出。
2. 删除job。
ackjob
ACKJOB jobid1 jobid2 ... jobidN
用法很简单,直接加上jobid就ok。不需要别的。
这个命令用于确认消息已经被消费。
主要过程:
1. 如果不存在指定job,创建空的状态为JOB_STATE_ACKED的job。
2. 从队列中取出指定job(防止已经被回放的job被消费)。
3. 设置job状态为JOB_STATE_ACKED。
4. 从回放队列中取出。
5. 顺序发送ack消息到集群中别的所有节点,每个节点必须应答之后才算完成。
6. 应答消息的时候,回答节点必须确认处理完指定job之后才会应答。
fastack
FASTACK jobid_1 jobid_2 ... jobid_N
用法与ACKJOB一样,区别是,FASTACK不需要确认其他节点应答ACK消息就会返回。
作者的思路
对于目前存在的不少问题,Antirez给出了一些应答,但目前没有实现。
- 性能问题。当前模式下,不对性能有任何承诺,需要结合具体生产环境的使用方式去优化。目前没必要和别的队列产品比较。
- fastack。ack由于需要所有节点确认,是一个较大的瓶颈点,如果在实际使用中,发现用户并不关心消息的重复消费问题的话,这个实现可能有所改变。
- 单线程问题。redis的单线程模式对于redis这种数据结构服务可能比较适用,但是在队列的实现中,这个是没必要的,Antirez可能需要参考实际使用决策是否采用多线程实现。
- 落地以及消息容量问题。与redis一样,作为内存不落地消息队列,存量受限于内存。Antirez的思路是,当OOM的时候,落地消息到磁盘,等内存里面的消息消费完成之后,读取出磁盘上的消息入队。
- addjob时候的广播问题。目前采用的是串行发送到目标实例,这里如果修改为并行的话,性能会好很多。
- 负载均衡的实现问题,当前的模型比较简单,对很多特殊负载处理不好,可能会在未来优化。