1. 介绍
Twitter依赖大量的实时流处理.多年以来,Twitter内部都在用Strom. 但是以目前的规模来说,使用Strom变的越来越有挑战.特别是涉及到可扩展性, debug能力, 可管理能力以及和其他数据服务的有效的资源分配等问题.
一个很大的挑战就是debug-ability.当一个topo不正常工作的时候,需要很快的定位性能下降的原因. 但是在Storm中,一个topo的很多compoents都在一个进程,使得debug比较困难. 我们需要一个从逻辑单元到物理进程的清晰映射. 这种映射对于关键的topo非常重要.
另外,Strom需要精细的集群资源,需要特定的硬件分配给Topo.这导致了对于宝贵集群资源的低效使用,而且限制了可扩展性.我们需要的是一种更灵活的方式进行资源调度,并且运行不同的数据处理系统能共享资源.在Twitter内部,这个工作由Aurora管理.
在Strom上为一个新topo分配资源需要人工隔离机器,当不要这个topo的时候,这些机器必须下掉.管理机器分配是非常麻烦的.而且我们想要更加有效的方式,Twitter的规模是很大的. 任何改善都会极大的节省硬件成本和提高生产力.
我们想解决上面的问题,而且不想重写大量的正在Strom上运行的应用.所以,和Strom接口兼容是很重要的.综合
综合多方面的考虑,我们设计了Heron来达成上述目标.它和Storm兼容,非常容易迁移.Twitter所有的生产topo现在都跑在Heron上.除了显著的性能提升和更低的资源消耗,Heron也在debug-ability, scalability, manageability上有极大的优势.
本文主要介绍Heron的设计,并给出了性能比较.
3 关于Storm
3.1 Storm背景
Storm是由spout和bolt组成的有向图. Spout是输入的数据源, bolt是流计算的抽象.Spout经常从队列中拉数据,例如Kafka或者Kestrel, 产生一个tuple流. 这个tuple给到bolt进行计算.一个实时统计在线人数的topo如下图:
Spout和bolt在运行的时候都称为task. 多个task分在一个executor中. 多个executor分在一个woker中, 每个woker作为一个JVM进程运行. 如下图. 一个主机可以跑多个worker,每个worker都可以属于不同的topo.
3.2 Storm worker架构的限制
Worker的设计比较复杂. 大量的instances都在一个worker进程里面. 每个executor有两个线程.这些线程的调度基于JVM的抢占和优先级调度算法. 每个thread都跑好几个task, executor又实现了另外的算法根据收到的数据来唤醒相关的任务. 这种多重调度和复杂交互导致了调度时的不确定性.
每个worker都可以跑不同的task, 比如kafka spout, 一个连接服务的bolt, 一个写入存储的bolt都在一个JVM上.因为各个Task调度相互影响 , 资源也无法隔离, 出了问题只能重启.重启之后,有问题的那个task可能被调度了,非常难追踪和解决问题.
多个task的Log都打在一起.错误很区分. 如果一个task出现异常,就只能终止整个进程.topo的一部分的错误会影响topo整体的性能.而且不同的task引发的GC问题非常难追踪..
总资源分配的角度,storm假设所有的worker都是同质的.这种假设导致了资源分配的低效,而且经常导致资源过度分配. 举个例子: 把3个spout和1个bolt调度到两个woker中. 假设bolt需要10G, 每个spout需要5G. 所以每个woker需要预留15G,因为其中一个woker肯定要跑一个bolt和一个spout. 所以两个woker分配了30G. 但实际上需要的是25G.当woker的components的量非常大的时候,这个问题会变得很糟糕.从上层抽象分配复杂的topo的时候也经常发生.
Debug 挑战. 为worker分配大内存的结果是,jstack或者heap dump变得非常麻烦. 而且worker执行heap dump的时候容易导致失去心跳, 从而被Storm监控杀死.
我们是否可以重新设计Storm, 一个worker一个task? 这样会导致资源浪费,而且也限制了并发. 这会导致每个topo有大量的worker. 导致显著的资源过度分配. 要为每个worker保留下面这么多的内存.
而且component数量增加, 每个worker要连接的其他的worker上,会导致没有足够的端口. 降低的可扩展性.
Storm workers使用多个线程和队列传输数据. 一个globakreceive thread (upstream) ,一个global send thread. (downstream). 另外每个executor还有一个user logic thread, 还有一个local send thread. 所以每个tuple都要经历4个线程传递数据. 这导致了明显的负担和队列问题.
3.3 Nimbus的问题
Nimbus瓶颈. 管理的事情太多. 调度, 管理, 分配jar包, 管理topo.
不支持资源预定和隔离.
生产环境,单独的机器跑topo, 浪费. 完全使用资源很难, 在YARN上也没有完全解决问题.
Zk管理心跳, 成为瓶颈. 有个折中方案,但是增加了运维负担.
单点.
3.4 缺少背压
没有背压. Backpressure
如果中间断开,有很多不好.浪费很多.
3.5 效率
不完美. 一个断会导致整个断.
GC时间长,
队列容易满.
解决这个就需要过度供给,资源浪费. Overprovision
举例: 600 core, 20-30%的利用率. 期望是150 core搞定.
4. 设计上的思考
修改的代价太高.
Spart streaming, api不一样,迁移代价高. 使用的规模不一样.
所以决定再写一个.
5 Heron
5.1 数据模型和api
Api兼容. Topo. Bolt ,spout都一样.
Grouping.
At most once
At least once
5.2 架构
Aurora是mesos上的一个调度框架. 通过调度器抽象Heron也可以跑在YARN,Mesos,ECS
先是 aurora scheduler, 管理所有topo
每个topo有一个 topo master. (第一个) ---- 后面的container 每个有 Stream manager, Metrics Manger, 一大堆 Heron Instance. (其实就是spout/bolt).一个物理机上可以启动多个container.
( aurora使用cgroups隔离container).元数据保存在 zk. 一个Heron Instance就是一个jvm. Heron内部通讯用的protobufs.
5.3 Topo Master
类似YARN的app master.
通过zk发现自己,写一个临时节点. 也是拓扑的gateway.不含数据处理,不会成为瓶颈.
5.4 Stream Manager
SM的功能: 有效管理tuple的路由. 每个HI(instance)通过连接本地SM收发数据. 一个拓扑的所有SM组成了O(k2)网络连接, K是SM的数量. 本机上不同的HI连接路由通过本地短环路机制完成,不走SM. 由于Hi的数量远远的大于k, 所以可以这种设计扩展overlay网络, 通过O(n2)* O(k2)
5.4.1 拓扑背压
可以动态调整数量流速. 当不同的component处理速度不一样的时候非常重要. 举例: 比如downstream慢了, 如果上游不慢下来, 队列就会慢. 系统会丢弃这个tuple. 如果从中间丢弃,那么就是一种浪费. 背压机制就是可以减慢上游. 有下面一些策略.
TCP背压
TCP窗口机制从HI到其他上游组件传播背压.TCP sockets通讯, buffer的速度和消费速度是一样的. 如果HI 慢了, buffer就会满. SM就会发现, 因为他的send buffer也会满. 这种背压传递给了其他的SM和HI上游. 直到 HI赶上之后才会消除.
TCP 背压很容易实现. 但是实际上效果不好, 因为很多HI之间的逻辑channel在SM的物理链路上被覆盖了. 这种双工引起上游和下游HI都变慢. 结果是故障恢复的特别慢,导致整个拓扑的性能长期的下降.
Spout 背压
SM压低本地spouts来减少新数据注入. 这种方式和TCP背压联合使用. 当SM发现HI慢了,就停止从spout读数据. 这样spout的buffer会满,最终会block. SM发送一条 “startbackpressure” 消息给其他的SM, 要求他们停止自己的spouts. 收到消息的SM就会停止从本地spout读数据. 当HI赶上之后, SM再发一个”stopbackpressure”消息给其他SM. 然后其他SM又开始从本地spout消费数据.
这种方式直接压制了大部分上游的spouts. 这种方式其实不是最优的,我们压制spout不如减上游的生产者. 另外就是这种方式增加了消息传递的负担. 好处是响应时间很快,和topo的深度无关.
Stage-by-stage 背压
一个topo由多个阶段组成. 在这种方式中,我们不断的传播背压一直到达spout.
和spout背压一样,这种方式和tcp 背压结合使用. 但是是在SM和Hi之间的. 不需要在SM中间传递消息.
5.4.2 实现
在heron中我们实现的是spout 背压. 因为这种方式比较容易实现.工作的很好,当倾斜事件发生的时候,也容易debug,很容易找到背压的根源. 每个socket channel都关联一个应用层面的buffer, 这buffer有一个水位的上下限. 当buffer size到达上限的时候就触发背压, 指导size低于水位下限为止. 这种设计的根据是防止topo在背压来回变化中快速摇摆.
这种设计的结果是一旦一个tuple被spout发出,Heron就不会丢弃它.除非进程或者机器挂了.这种行为使得tuple的失败更加确定.
当topo进入背压模式后, 运行的和最慢的一个component一样慢. 这样运行一段时间后,可能导致数据源数据的堆积.根据topo的需要, spout可配置为丢弃老数据.
5.5 Heron Instance
Spout和bolt都是在HI中运行. 不同于Storm的worker, HI是一个jvm进程, 只执行一个spout或者bolt任务. 这种设计可以很容易的dubeg/profile一个spout或bolt. 因为开发者可以很容易的看到一个HI的日志.
因为复杂的数据传递都是通过SM进行的,所以未来可以通过其他语言来写HI.
实现HI使用单线程或者两线程.我们讲讲:
5.5.1 单线程方式
线程维护一个tcp channel到本地SM,等待tuple. 一旦tuple到来就执行用户逻辑. 如果用户逻辑中产生了一个output tuple, 就会被buffer. 当buffer超过一定的门限,就发给本地SM.
这种方式简单,但是也有不好,用户的code可能block. 比如:
Sleep了.
IO 读写系统调用.
线程同步原语.
这些block都不是期望的. 会导致不可预期的行为. 一旦merics不能收集和及时上报, 就无法判断 HI是否处于 bad状态.
5.5.2 双线程方式
在这种设计中,有两个线程: Gateway thread 和Task exe thread. Gateway负责HI全部的数据出入和通讯. 连接本地SM和MM. 也负责从SM接受tuples. 这些tuple会发给task exe 线程.
Task exe执行用户代码. 启动的时候,执行”open”或者”prepare”方法. 如果是bolt,就会执行”execute”方法. 如果是spout,就执行”next Tuple”方法, 把数据作为tuple发给topo. 这些tuple都发给gateway, 再转发给SM. 另外task exe还收集统计数据比如执行tuple数量, 发送tuple量, acknowleged tuple量, 处理耗时等.
Gateway和exe通过三个单向队列通讯.Datain, dataout. Metices-out
Datain 和 dataout都是有门限的.Datain满了的时候,gate停止从SM接受数据.这会触发SM的背压机制. Dataout满了的时候,gateway会认为SM接受不了更多数据了. Task exe就不会再发出或者执行tuples.
在生产环境运行大量的topo时,我们会遇到 unexpected GC. 网络突然中断, Gateway不再从dataout队列中发送tuple. 队列堆积,不能回收. HI内存占满. 网络恢复的时候,gateway又开始从SM读数据. 如果gateway读数据发生在发送之前, 因为内存满了,会触发GC. 这导致了更多的性能下降.
为了避免这种问题,我们周期性的检查两个队列,并增加/减少队列大小. 如果容量超过限制,就把队列size减半. 这么做要么把队列容量减少到一个稳定的水平,要么减少到0. 一旦到了0, 就不能接受数据也不能发数据. 结果这样更容易从GC中恢复过来. 同样,如果大部分队列都小于限制,就会逐渐增加队列大小,直到队列到达限制.
5.6 Meritcs Manager
收集计量数据. 系统的和用户topo的. 每个container一个.
信息发给一个监控系统. 也发给topo Manage.
5.7 启动和失败场景
提交给scheduler, 分配资源, 调度到集群的机器上. 第一个container成为TM. 注册到zk.
SM找到TM. 连接,发心跳.
当所有的SM连接后, TM启动spout/bolt到不同的containers. 这叫做物理计划. 完成后,SM从TM获取全部计划. 相互发现. 然后SM相互连接,构成网络. HI启动, 下载本HI的物理计划,开始执行. Tuples开始执行. TM写物理计划到zk来容灾.
导致topo失败的情况有: 进程死掉, container失败. 机器问题.
当TM进程死掉,container会重启失败进程,TM从Zk中恢复其状态. TM也会做主从切换. SM会发现新的主TM,并连接.
类似的, SM挂掉, container会重启. 重新发现TM, 获取物理计划并检查状态. 其他的SM也会获取新的物理计划获得新SM的位置. 当HI挂了的时候, 也会重启,并连接本地SM. 获取物理计划, 确定自己是spout还是bolt, 开始执行用户逻辑代码.
当container被调度到新机器上的时候, 新的SM也会发现TM,重复上面的失败恢复过程.
启动流程图:
5.8 架构总结
设计上几个重要的点
1. 资源分配从cluster manager中抽象出来, 这可以Heron和其他的基础设施更好的结合.
2. 一个HI只运行一个task( spout or bolt), 更容易调试, jstack, heap dump等.
3. Topo运行更透明, 很容易知道那个慢或者失败了.Metrics collection是细粒度的,把问题非常明确的定位到系统特定的进程.
4. 在component级别进行资源分配, 避免的资源过度分配问题.
5. 每个topo一个TM, topo之间相互独立. 一个失败不会影响其他的.
6. 背压机制让我们获得一个稳定的处理速度, 更容易理解系统. 把topo从一些container迁移到另外一些,这也是一个重要的机制.
7. 系统不再有单点.
6. Heron in Production
Heron Tracker , Heron UI, Heron VIz
和topo交互;观察topo指标和趋势;HI问题追查; 看日志
6.1 Heron Tracker
6.1 Heron Tracker
一个gateway,接入topo的信息.通过zk保存元数据. 通过zk watch新topo,在运行的topo,被kiled的topo, 任何物理计划改变都会监控到. 也用zk元数据获取其他的TM并收集信息.
提供REST API. 提供物理,逻辑计划, 不容的指标,Hi的日志链接., Aurora job页. 作为一个Aurora service运行. 有容灾和负载均衡.
6.2 Heron UI
使用Tracker API,展现topo信息. 逻辑plan和物理plan.
内环是机器, 中间是container, 外部是HI. 可以drill down. Counts, 耗时, ack count, fail count.
View logs.
6.3 Heron Viz
一个 dashboard. 周期性的获取数据.
Health, resource, component, stream manager.
Fail count.
CPU分配, 内存分配, GC时间.
Tuple数量, emit, fail, ack.Tuple end to end 时间.
HI的处理数量, 丢失量. 背压传播时间. …
7 性能比较
Twitter已经用Heron完全替换了Storm。前者现在每天处理“数10TB的数据,生成数10亿输出元组”,在一个标准的单词计数测试中,“吞吐量提升了6到14倍,元组延迟降低到了原来的五到十分之一”,硬件减少了2/3。
资料比较多. 此处略.
值得一提的是,Heron是完全Docker化的,docker化部署见git上的介绍:
https://github.com/twitter/heron/tree/master/docker
论文地址:
http://dl.acm.org/citation.cfm?id=2742788
开源地址:
https://github.com/twitter/heron