开宗明义!本文根据Google Beam大神Tyler Akidau的系列文章《The world beyond batch: Streaming 101》(批处理之外的流式世界)整理而成, 主要讨论流式数据处理。在大数据领域,流式数据处理越发地重要了。原因有以下几点:
- 人们越来越想要得到更及时的数据,而切换到流式处理(streaming)无疑是一个降低延时的好办法
- 海量数据的生产变得越来越频繁,即使是小公司也会产出超大量的每日数据。因此必然要求有一种系统能够处理这种无穷多的数据集合
- 数据更快地被处理可以实现负载均衡,对资源的消耗也更加可控
基于这种业务需求驱动的流式处理浪潮逐渐兴起,但现存的流式处理系统比起它们的“一生之敌”批处理系统而言尚不能算成熟,故而在这个领域内依然大有可为。讨论流式处理,有些问题必须要先搞清楚:
- 术语:如此复杂的讨论不明晰术语一定是举步维艰的。当前在流式处理的概念中名词歧义现象十分常见,故消除歧义明确含义的事情一定要首先完成
- 能力边界:知其为,也要知其不可为。流式处理系统能做什么,不能做什么,这是个大问题
- 时间概念:明确两个时间维度的名词概念以及它们之间的联系还有各自的优劣
术语:什么是流式?
Streaming——流式一词含义极多,澄清有些难度。困难的原因在于我们解释一个事物通常喜欢以解决或完成这个事物的方式进行描述,而非它的本质。这种不好的习惯掩盖了streaming的本质。在某些情况下,这可能会给人带来一种误解:仿佛流式系统就只能以流式方式进行处理,从而计算出来的结果值也不是准确的。实际上,如果设计得当,流式系统完全可以重复地计算出正确的一致性结果。那么何为streaming?streaming就是一类数据处理引擎,旨在处理无限的数据集——广义上说,它既包括纯streaming也包含模拟streaming的微批次实现(micro-batch)。Spark Streaming就是micro-batch思想的实现。
Streaming经常也被解释成以下几个名词表示的意思:
- 无限数据(unbounded data)—— 表示一直增长,无穷无尽的数据集合,它们甚至被称为“流式数据“。 这里面的问题就在于当我们说streaming或batch的时候我们其实并不是在表征它们处理的数据的特性。如前所述,streaming或batch的本质是处理这些数据的执行引擎,而非数据本身。streaming和batch的主要区别其实也在于它们处理的数据的有限性,所以刻画事物时应该抓住事物本质上的特点。这类数据的正确提法是:unbounded data和bounded data,而不是流式数据
- 无限数据处理(unbounded data processing)—— 应用在unbounded data上持续的数据处理。把streaming解释成这个意思也是有问题的,因为batch引擎其实也可以处理unbounded data——重复多次地运行batch引擎已经被用于处理这类数据了,所以要区分streaming和unbounded data processing的区别
- 能做到低延时但计算结果不准确,只是近似值—— 说起streamig好像它就无法产生出准确计算的值一样,这是不对的。事实上,batch系统不能做到低延时或产生近似值的观念也只过时了——batch引擎当然可以产生近似准确的值。因此,我们最好使用低延时/近似值来表示这个意思,不要因为历史上它们是由streaming引擎产生出来的,就认为把它们和streaming等同起来。
再次强调一下,streaming表示的是用于处理unbounded data的执行引擎,仅此而已!
确定能力边界
—— 特别在世人夸大了streaming的限制之后
下面我们讨论一下streaming系统能做什么,不能做什么,重点强调它能做什么。Streaming系统长期以来一直被视为是一个小众领域,它以低延时的方式产出近似准确的结果,而且streaming通常还与功能更加强大的batch系统协同工作最终为用户提供准确的结果。具体而言就是同时部署一套batch系统和一套streaming系统,两套系统一起执行相同的计算。streaming系统实时运行,延时低但结果不准确(因为使用了近似算法或系统本身就没有提供准确性保障),而batch系统虽稍后登场,但能够保证计算结果是准确的——这种部署方式或架构最早由Apache Storm的作者Nathan Marz提出,并经实践证明后被认为是在当时非常成功的。从准确性这点来看,streaming系统的确不尽如人意,而batch系统通常又很笨拙,因此两者的结合简直可谓是“鱼与熊掌的兼得"。令人遗憾的是,维护两套系统的成本是很高的——因为它们是两套独立的数据管道,同时它们产出的结果有需要执行合并操作。
Tyler本人对于强一致性的streaming引擎有着很深的造诣,他本人并不赞同这种两套系统的架构设计——相反地,他是Jay Kreps提出的可重演系统的拥趸(你不知道Jay Kreps是谁? 好吧,你总听过Kafka吧,他是Kafka的原作者):使用Kafka这样的可重演系统连接streaming引擎以解决可重复性的问题,并自始至终地使用一套管道来进行数据处理。如果把这个观点更进一步,我们可以认为定义良好的streaming系统甚至提供的是batch引擎所具功能的超集(superset)——事实上, 去年火爆开源社区的Apache Flink就是这样的思想,Flink中的batch引擎是作为streaming引擎的一个特例而实现的,这点和Spark streaming正好相反:Spark streaming的streaming其实是借助于micro-batch的思想而实现的。如果不是目前执行效率上的一些差异,当今batch系统根本没有存在的必要的。在这点上要感谢Flink开发者为我们构建了一个”随时随地streaming化“的系统,即使是batch模式,Flink底层也是使用streaming实现的。
所有这一切的结论就是:streaming系统的广泛成熟与无穷数据处理框架的结合必然会令batch引擎逐步退出历史舞台。不过streaming系统若要打败batch系统,还需要完成两件事情:
正确性
正确性帮助streaming系统足以匹敌batch系统。从本质上说,正确性可归结为一致性存储。streaming系统需要有能力定期地持久化状态(checkpoint)并且还要能够维护系统崩溃下的一致性。Spark streaming在这方面是先驱,它很好地维护了一致性。时至今日很多streaming系统都能够做到这点了——最多一次的处理语义实在是个伪命题,但目前它依然存在。再次强调一下:强一致性是实现”精确一次处理语义”的必要条件,“精确一次处理语义”是实现争取性的必要条件,而任何streaming系统,若想要打败batch系统就必须实现正确性。除非真的不在乎计算结果,否则还是建议尽量不要使用那些不提供强一致性的streaming系统。若要学习如何实现强一致性,Spark streaming论文是很好的材料,推荐大家读一读。
推导时间的工具
如果说正确性帮助你的streaming系统匹敌batch系统,那么这些工具将令你超越batch。推导时间的工具是处理无界无序数据集的利器。当前海量数据表现出来的特点是随机变化的数据倾斜——即事件被处理的时间与事件真实发生时间的差值随机分布,而现有的bach系统(以及大部分的streaming系统)都无法处理这种情况——这也是streaming系统着重要解决的问题之一。
接下来我们需要了解一下时间方面的2个概念,然后才能深入地去讨论下之前说的无界无序数据随机变化的时间差值是什么意思,最后针对这些问题streaming系统都有哪些解决之道。
Event time VS. Processing time
坦率地说,无界数据处理先搞清楚时间概念。在任何数据处理系统中,通常都有两类时间维度的概念:
- Event time,即事件真实发生的时间,正式名称是发生时间
- Processing time: 事件在系统中被观测到的时间,正式名称是处理时间
并非所有情况都需要考虑event time——事实上,如果你根本不care发生时间,事情变得简单多了,也许后面的你都不用看了——但很多场景下event time确实是需要被考虑的,比如统计用户行为、计费系统以及很多异常检测等。理想情况下,event time和processing time应该是相等的,即事件一旦发生就立即被处理。显然,这不可能是真的,两者之间的差值不仅不为0,而且通常是由各种因素影响的一个变化的函数——我们把这个差值不为0的事实成为时间倾斜,或简称为倾斜(skew)。影响这种skew的因素可能有输入源、执行引擎或硬件等。具体来说包括:
- 共享资源限制:比如带宽争用、网络分区或CPU资源共享等
- 软件原因:分布式系统逻辑限制、竞争等
- 数据本身的特性导致,包括key分布,TPS变动或无序性变动
因此,如果分别以event time和process time为轴画一张图的话,那么一个真实场景下的数据倾斜分布就应该类似如下图这个样子:
黑虚线表示理想情况,斜率=1表示event time总是等于processing time;红线表示实际情况。在实际场景中系统总是会滞后一些,表现为processing time永远大于event time,但两者的差值其实一条变动的曲线,最开始时差距很大,在中段逐渐靠近理想情况,最后又开始偏离。曲线上同一个纵轴点对应的两个横坐标的差值即标识了现实与理想之间的skew——显然这种skew是因为数据处理管道的延时所引入的。
Event time与processing time之间的差值不是固定的,这就意味着如果要使用event time,单靠processing time是不够的。令人遗憾地是,目前大多数streaming系统在设计的时候都只是考虑了processing time。如果要处理这种无穷多的数据,streaming系统必须要规定一种类似于时间窗口似的的概念。本质上它就是沿着时间维度把数据划分到不同的时间窗口中。虽然大部分系统就是这样做的, 但如果要实现基于event time的正确性,使用processing time来定义时间窗口显然是不行的。鉴于event time和processing time之间并没有一致性的关系,很有可能我们会把某些event time数据划分到错误的processing time窗口(比如因为延时)从而导致计算的不准确。后面需要详细讨论一下解决之道。
即使是根据event time进行时间窗口划分也不是所有问题都解决了。对于unbounded数据而言,无序性和skew的不确定性会带来一个完整性的问题:因为没有可控的event time/processing time映射关系,我们如何能够确定在时间窗口X中观测到的数据是完备的?真实场景中,我们无法提供完备性验证。主流的处理系统都依赖于完备性的概念,但当应用于无穷数据集时这些系统就有些捉襟见肘了。
与其把无限数据集打散到有限的batch中,我们不如设计一种工具可以让我们能够应付真实场景下的这种不确定性。新数据必将到达,旧数据可能会被删除或更新,任何系统都应该独立地处理这些事情。在这些系统中完备性的概念只是一个辅助而非一个必要条件。
下面我们讨论一下常见的数据处理范型(data processing pattern),既包括streaming引擎也包括batch引擎。micro-batch也被算作是streaming引擎。
有限数据集
处理有限数据集很简单,如下图所示:
上图中左边的数据集杂乱无章,运行某个数据处理引擎后(通常是batch引擎,比如MapReduce)变成了右边的“更有序”的样子。怎么捣腾数据虽然玩法是无穷的,但万变不离其宗,这种处理方式是不变的,依然非常简单。有挑战的还是处理无穷数据集,包括batch处理无穷数据集和streaming处理无穷数据集。
batch处理无穷数据
虽然设计的时候并不是用于处理无穷数据集的,但batch引擎处理unbounded data可谓历史悠久,谁让batch是先发明出来的呢。具体的方法就是分而治之的思想,即以批处理的方式把无穷数据集划分成一组有限数据集进行处理。
固定时间窗口
最常用的方式就是不断执行batch引擎从而把输入数据划分成大小相等的窗口,如下图所示。然后独立地处理每个窗口中的数据。对于像日志这种类型的输入数据,日志被写入到不同的路径和文件中,因此路径和文件的名字就特别适合用于命名时间窗口。这样看来似乎事情变得非常简单了,你只需要执行一个基于时间的路由策略就可以把所有数据按照event time发送到不同的时间窗口中。在实际使用时,大多数的系统会遭遇完备性的问题:某些事件在写入到日之前被耽搁了(比如网络原因或磁盘IO),或者事件虽然是全局收集的但在处理前被转移到一个公共的地方了,再或者事件是由移动设备发送过来的。这些情况中我们就需要一些手段来处理完备性,比如引入某种延时处理机制直到我们确信所有的时间都已经被收集了,或者只要那些晚到的数据到达,之前时间窗口中的数据就重新被处理一次。
如果batch引擎使用更加复杂的窗口策略(比如会话,session)来处理无界数据时,上面的方法就会更加的有局限性。Session本质上都被定义为特定用户的操作时段。Session之间的时段就是该用户无操作的时段。若使用batch引擎计算session,得到的session通常都是跨batch的,如下图中红色箭头部分所示:
增大batch size固然可以减少这种跨度,但代价就是延时的增加。另一种办法就是增加额外的逻辑将这种“断裂”的session缝合在一起,不过想想就知道这实现起来有多复杂。不论那种方式,使用经典的batch引擎来计算session效率很低。更好的办法是使用streaming的方式。
streaming处理无穷数据集
和大多数batch处理无穷数据集相反的是,streaming系统天生就是处理无穷数据集的。真实场景下的无限数据集有以下特点:
- 与event time高度无序——意味着你需要某种基于时间的路由规则
- 变动的event time skew——意味着你不能想当然地认为在[Y - a, Y + a]时间范围内总是看到event time = X的所有数据
当然,处理这类数据时还是有一些方法可用的,基本上可以划分成以下四类:
- 时间无关性方法
- 近似方法
- 基于process time的时间窗口
- 基于event time的时间窗口
时间无关性方法
如果本质上不关心时间——比如所有的逻辑都是数据驱动的——那么这类方法就非常适合了。其实这也没什么新鲜的,一个streaming引擎通常都是要支持的。本质上说,所有现存的streaming系统都天然支持这种与时间无关的使用场景。Batch系统也非常适合这种时间无关性的数据处理,只需简单地把无穷输入源划分成任意序列长度的有界数据集并分别独立处理即可。下面举几个例子来说明一下:
Filtering: 一个典型的例子就是过滤(filtering),如下图所示:
假设我们处理的是Web流量日志,想要过滤出某个特定领域来的所有流量,那么我们只需查看每条日志的来源,如果不符合条件直接pass掉。显然这和时间是没有关系的,因此数据源是否是无序,无穷或是变动的skew就显得不重要了。
Inner-joins: 另一个时间无关性的例子就是内连接(inner-joins)。当连接两个无穷数据源时倘若我们只在乎连接的结果,则处理逻辑就不需要考虑时间的因素。一旦看到某股输入源中出现一个值,那么我们就把这个值缓存起来。当值出现在第二股输入源时,只需要发送合并的消息即可了,如下图所示:
如果切换到外连接将引入数据完备性的问题:一旦看到了join的一边,那么如何才能确定另一边也到达了呢?老实说,我们没法得知,因此我们就必须引入某种超时机制——而这必然会引入时间因素。时间因素本质上就是时间窗口的形式。
近似算法
第二大类方法就是近似算法,比如近似的TopN,streaming K-means等。这些算法接收无穷数据源作为输入,而输出结果只能算是基本上满足我们的预期。近似算法的好处在于开销很低并且天生就是用于处理无穷数据集的,而缺点在于算法通常是很复杂的,而且它们的近似特性限制了它们的应用。值得注意的是,这些算法在设计上通常都引入了时间的元素。算法在处理事件时,时间因素通常都是基于processing time的,这对于提供了某类可控错误边界的算法而言是极其重要的。近似算法本身也可以被视为是与时间无关性处理的另一个例子。
时间窗口
剩下的两类方法都是时间窗口的变种,首先讨论下时间窗口的具体含义。时间窗口本质上就是将数据源沿着时间线划分成有限的数据块。下图表明不同的窗口范型:
- 固定窗口———— 固定窗口把时间划分成固定大小的段。具体还可以细分为对齐窗口和未对齐窗口
- 滑动窗口———— 固定窗口的一种广义形式,滑动窗口也是有固定的长度以及固定的间隔。如果间隔长度<窗口长度,那么窗口必然会造成重叠。如果间隔长度=窗口长度,那么就是固定窗口。如果间隔长度>窗口长度,这就被称为“取样窗口”,它只会查询一部分数据。滑动窗口通常是对齐的。
- 会话———— 属于动态窗口,会话就是一组事件序列,通常被用于分析用户行为。既然是用户操作事件序列,我们无法提前为session定义窗口长度,而且由于在实际中不同的用户其session也是不同的,因此它们属于经典的未对齐窗口
对于processing time和event time而言,时间窗口都是适用的,当然还是有区别的。我们首先来看基于processing time的时间窗口:
根据processing time创建时间窗口时,系统会缓存输入数据到窗口中直至超过了某段时间。比方说对于5分钟的固定时间窗口,系统会缓存之前5分钟的所有数据并封装进一个窗口中,之后发送给下游系统用于处理。这种窗口的特点如下:
- 简单
- 极易检验完备性:系统完全知道所有输入数据是否已经到来,无需处理延时数据
- 适用于数据被观测被产生价值的使用场景,比如通过计算每秒请求数的变化来判断是否出现服务中断
不过这种基于processing time的窗口有一个非常大的缺陷:必须要求数据按照event time顺序到达,否则无法真实再现事件发生场景,但是按照event time顺序的输入数据几乎不存在。。。。举个简单的例子,假设手机上的一个app收集用户统计信息。当手机未连上网络时,这段时间内收集到的数据就无法上传。这就意味着数据可能比真实的发生时间晚几分钟、几个小时、几周甚至更长。在处理时间窗口时,期望从这样的数据集中获取任何有用的结论都是不可能的。另一个例子,假设有一个全球服务处理从各个大洲收集上来的数据。如果网络问题导致带宽受阻,那么此时必然造成数据的skew。如果对这种数据基于processing time做窗口,那么这种窗口就无法表达包含在它们之下数据的真实发生情况。相反地,它们表示的是事件到达时的情况,必然是新旧数据相互混合的。这两个例子其实都应该以event time进行时间窗口的划分——即所谓的基于event time的时间窗口。
基于event time的时间窗口:反映事件发生时间的时间窗口。下图展示了一个基于event time的1小时固定时间窗口:
两条白线表明了两个特殊的数据:这两个数据点上的数据对应的processing time时间窗口与基于event time的时间窗口是错配的。因此如果使用基于processing time的时间窗口必然造成结果的不准确。由此可见,能够提供正确性是基于event time时间窗口的一大优势。
基于event time时间窗口的另一个优势在于它的大小可以动态变更,比如session,再不会有跨batch或跨窗口的情形发生,如下图所示:
天下没有免费的午餐。这种时间窗口有2个缺陷:
- 需要额外缓存:因为时间窗口的时间周期来拉长,需要缓存更多的数据。当然现在存储的成本不断下降,此缺陷便显得不是那么重要了
- 完备性:因为无法明确得知某个窗口下的所有数据都已经到来,因此便无法确认何时才能开始处理这个窗口下的数据。在实践过程中,系统通常会给一个经验值来定义窗口的完备性,但如果从绝对正确性的角度来考虑,唯一的解决办法就是提供一种方式能够让窗口中的数据可以被重新处理从而不断修正计算结果
以上就是我们对于streaming以及streaming系统的一些初级讨论,第二篇中将会讨论实际场景中streaming系统的解决之法。