-
前言
业务的日志ETL拉取框架一直存在很多问题,每次出现故障就导致手忙大乱,因此这次决心要对其进行大改造。这个ETL系统是基于Storm实现的,主要是依靠Spout拉取原始日志,Bolt进行处理再入库,为了提高吞吐量,采用了12个Bolt进行并行处理。旧算法由于没有使用Storm的ack特性,而且还是根据发送Tuple的hash值发送到对应的Bolt中,完全没有考虑好负载均衡问题,在崩溃重启后也需要重新处理当天日志,可用性极低。
-
新的想法
为了能够达到bolt的负载平衡,想到了采用动态平衡的算法去适应,另外也需要提供一个可靠的checkpoint机制,能够使崩溃重启后从当前时间点继续处理。
这个动态平衡的算法,可以参考TCP的流控制算法。TCP是可靠报文传输协议,具有可靠性,高效性,能够避免网络拥堵等特点。这里列出几个主要的TCP流控制算法的特点:
- TCP把多个发送信息的字节组合成一个数据报,提高发送效率
- TCP采用超时重发机制,并且按照超时时间按照优先级进行排列发送。
- 接收端收到数据报后发送ack到对应的发送端,这里是cumulative ack算法,也就是如果依次接收到数据报2,3,则发送两个ack,ack的是数据报1。
- 发送端有发送窗口,接收端也有接收窗口的概念,发送窗口表示发送端可以发送的数据报数量,接收窗口可以理解为接收端的接收数据报的缓存队列,一般情况下两个窗口大小相同。
- 接收端可以根据自己的处理速度发送接收窗口的大小给发送端,发送端根据需要调整发送窗口大小,避免发送速度超过处理速度。
- 除了5中根据发送端反馈的窗口大小外,发送端也会根据网络拥堵情况,动态调整发送窗口大小。(TCP Congestion Handling and Congestion Avoidance Algorithms)
当然TCP远远不止以上6点的特点,这里只列出部分的对于新算法有参考性的特点。具体可以参考这里以及IETF的标准规范rfc2001。
-
具体算法
由于ETL系统是依赖Storm进行开发,因此有一定的限制。不过可以根据Storm的特性类比TCP算法。Spout,也就是发送端,每个发送单元被称为Tuple,可以类比为TCP的数据报。Bolt接收端收到Tuple后,根据需要ack或者fail掉这个tuple,然后Spout会重发fail的tuple,同样地,Storm内部会探测到超时没响应的Tuple,然后发送fail到Spout。
虽然Storm拥有这些发送特性,但Storm除了ack和fail以外,没办法发送更多的反馈信息到Spout处,特别是Bolt的接收窗口大小(也就是缓存队列剩下的容量),这样Spout就没法根据Bolt进行发送窗口调整。另外Bolt每次也只能够ack或者fail对应的Tuple,不能使用cumulative ack算法。另外Spout和Bolt的关系,目前是一对多,也就是一个发送端发送到多个接收端进行并行处理,因此负载平衡显得尤为重要。
对于这个新算法,希望达到的目标是高吞吐量,高可用性以及简单性。高吞吐量是指多个Bolt的并发处理量,为了提高吞吐量,关键是实现动态的负载平衡。对于每个Bolt,发送窗口越大并不一定是越好,因为即使发送过去,Bolt也未必能够马上处理,相反,应该是把更多的Tuple发送到处理更快的Bolt上去,因此最恰当的做法是根据Bolt的处理速度,也就是ack的往返时间来衡量发送窗口大小。对于高可用性,要达到joiner崩溃后,重新启动后能够崩溃前的处理点重新处理,这就需要一种类似checkpoint的机制。要达到这个要求,并且保证简单性,可以通过保存已经ack的rt值。但由于Bolt只能够做到ack或者fail接收到的tuple,因此必须在Spout端统计采用累计的算法统计rt值,也就是如果ack了1,3,4的Tuple,只能记录第1个tuple的rt值,如果再收到ack
2的tuple,才记录第4个Tuple的rt值。简单性是为了避免算法和实际业务的代码的耦合,可以采用类似消费——生产者模式。
在rfc2001规范中,指出的TCP的拥塞处理和避免算法采用的是一种启发式的动态流控制算法。由于网络拥堵没办法可以使用数字精确测定,但可以简单地根据接收端的ack情况和丢包情况进行判断拥堵情况,然后线性或者指数式增加/减小发送窗口大小,从而避免增加网络拥堵压力。基于此算法,结合joiner的目标,可以延伸出一种类似的自适应动态流控制算法:
- Spout把多条数据库记录组合成一个Tuple进行发送,这样可以减轻ack/fail时的压力,提高发送效率
- Spout针对不同的Bolt拥有不同的发送窗口,每个发送窗口都会根据情况动态地增加或减小,并且都会记录Tuple的发送时间
- Spout收到Bolt的ack时,根据之前记录的发送时间计算处理时间值,如果处于正常范围,则为NORMAL状态,线性增加发送窗口大小,如果超出阈值,则转为SLOW状态,线性减小发送窗口大小,另外Spout通过累积的ack计算出需要记录的tuple的rt值。
- 当Spout收到fail,也就是Storm判断Tuple处理时间超时(目前不允许Bolt主动发送fail),则转为BUSY状态,马上减小1个发送窗口大小,并把Tuple重新放回发送缓冲区,等待发送到下一个Bolt,减轻当前BUSY的Bolt的压力。
这个新算法相比较TCP的拥堵控制算法,减小了一些特性,如Slow Start、Fast Retransmit等,有些是由于没有必要(Slow Start是避免一开始就遇到拥堵,但Bolt的初始化状态肯定为空闲),有些则是为了适用多个接收端的情况,(和Fast Retransmit相比较起来,发送给下一个空闲的Bolt更有优势),另外,新算法没有了ssthresh,慢开始阈值,用以区分指数增加发送窗口和线性增加发送窗口的阈值,因为指数增加发送窗口的时候很容易造成SLOW状态,另外盲目地增加发送窗口大小也并不能提高吞吐量,因此只选择线性增加。还有的就是Bolt并没有强调Tuple的处理顺序性,也就是只要收到ack,对应的Tuple就从发送窗口移出,就可以发送一个新的Tuple,这和TCP的发送窗口的必须等待处理完毕才能右移有区别。
-
实现
实现
整个实现的类图如下:
FlowProducer是由业务实现的流生产者,主要是实现数据流的生成,FlowControlSpout实现了Storm的Spout的逻辑,需要将Storm的发送Tuple和Ack/Fail Tuple的消息传递给AdapativeFlowConsumer,另外还充当缓冲队列,主要缓冲FlowProducer一次产生的数据流等待AdapativeFlowConsumer消费。AdapativeFlowConsumer是实现了自适应动态流控制算法的消费者,每个AdapativeFlowConsumer对应一个Bolt,自身维护了对应的发送窗口大小,以及重发等机制。FlowControlBolt实现了Storm的Bolt的逻辑,主要负责对处理过的Tuple进行ack(目前不支持处理失败发送Fail)。
- 总结
新算法对比旧算法来看,有了更高的吞吐量以及可用性。具体的日志处理量都得到了提升。
尽管如此,实际上新算法还有一定的提升地步,就是更精确的负载平衡控制。目前在发送Tuple的时候,新算法是按照round robin算法逐个请求AdapativeFlowConsumer发送Tuple的,但事实上,应该计算出最空闲的Bolt,按照优先级进行发送,这就需要一个衡量空闲度的方法,但设计算法的时候考虑到流的生成是不间断的,而且这个衡量空闲度的算法暂时没想到简单的,因此就暂时不考虑。或许将来有需要的时候可以添加。