Spark Streaming实践和优化

发表于:《程序员》杂志2016年2月刊。链接:http://geek.csdn.net/news/detail/54500

作者:徐鑫,董西成

在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎。其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎。如图1所示,Spark Streaming支持的数据源有很多,如Kafka、Flume、TCP等。Spark Streaming的内部数据表示形式为DStream(Discretized Stream,离散的数据流),其接口设计与RDD非常相似,这使得它对Spark用户非常友好。Spark Streaming的核心思想是把流式处理转化为“微批处理”,即以时间为单位切分数据流,每个切片内的数据对应一个RDD,进而可以采用Spark引擎进行快速计算。由于Spark Streaming采用了微批处理方式,因此严格来说只是一个近实时的处理系统,而不是真正的流式处理系统。

图1:Spark Streaming数据流

Storm是这个领域另一个著名的开源流式计算引擎,这是一个真正的流式处理系统,它每次从数据源读一条数据,然后单独处理。相比于Spark Streaming,Storm有更快速的响应时间(小于一秒),更适合低延迟的应用场景,比如信用卡欺诈系统,广告系统等。但是对比Storm,Spark Streaming的优势是吞吐量大,响应时间也可以接受(秒级),并且兼容Spark系统中的其他工具库,如MLlib和GraphX。从而,对于时间不敏感且流量很大的系统,Spark Streaming是更优的选择。

Spark Streaming在Hulu应用

Hulu是美国的专业在线视频网站,每天会有大量用户在线观看视频,进而产生大量用户观看的行为数据。这些数据通过收集系统进入Hulu的大数据平台,存储并做进一步处理。在大数据平台之上,各个团队会根据需要设计相应的算法对数据进行分析和挖掘以便产生商业价值:推荐团队从这些数据里挖掘出用户感兴趣的内容并做精准推荐,广告团队根据用户的历史行为推送最合适的广告,数据团队从数据的各个维度进行分析从而为公司的策略制定提供可靠依据。

Hulu大数据平台的实现依循Lambda架构。Lambda架构是一个通用的大数据处理框架,包含离线的批处理层、在线的加速层和服务层三部分,具体如图2所示。服务层一般使用HTTP服务或自定制的客户端对外提供数据访问,离线的批处理层一般使用批处理计算框架Spark和MapReduce进行数据分析,在线的加速层一般使用流式实时计算框架Spark Streaming和Storm进行数据分析。

图2:lambda架构原理图

对于实时计算部分,Hulu内部使用了Kafka、Codis和Spark Streaming。下面按照数据流的过程,介绍我们的项目。

  1. 1.     收集数据

从服务器日志中收集数据,主要包括两个部分:

q  来自网页、手机App、机顶盒等设备用户产生的视频观看、广告点击等行为,这些行为数据记录在各自的Nginx服务的日志中。

q   使用Flume将用户行为数据同时导入HDFS和Kafka,其中HDFS中的数据用于离线分析,而Kafka中数据则用于流式实时分析。

图3:Hulu数据收集流程

  1. 2.     存储标签数据

Hulu使用HBase存储用户标签数据,包括基本信息如性别、年龄、是否付费,以及其他模型推测出来的偏好属性。这些属性需要作为计算模型的输入,同时HBase随机读取的速度比较慢,需要将数据同步到缓存服务器中以加快数据读取速度。Redis是一个应用广泛的开源缓存服务器,但其本身是个单机系统,不能很好地支持大量数据的缓存。为解决Redis扩展性差的问题,豌豆荚开源了Codis,一个分布式Redis解决方案。Hulu将Codis打成Docker镜像,并实现一键式构建缓存系统,附带自动监控和修复功能。为了更精细的监控,Hulu构建了多个Codis缓存,分别是:

q  codis-profile,同步HBase中的用户属性;

q  codis-action,缓存来自Kafka的用户行为;

q  codis-result,记录计算结果。

  1. 3.     实时处理数据

在一切准备就绪,启动Spark Streaming程序:

1)      Spark Streaming启动Kafka Receiver,持续地从Kafka服务器拉取数据;

2)      每隔两秒,Kafka的数据被整理成一个RDD,交给Spark引擎处理;

3)      对一条用户行为,Spark会从codis-action缓存中拿到该用户的行为记录,然后把新的行为追加进去;

4)      Spark从codis-action和codis-profile中获得该用户的所有相关属性,然后执行广告和推荐的计算模型,最后把结果写入codis-result,进而供服务层实时读取这些结果。

Spark Streaming优化经验

实践中,业务逻辑首先保证完成,使得在Kafka输入数据量较小的情况下系统稳定运行,且输入输出满足项目需求。然后开始调优,修改Spark Streaming的参数,比如Executor的数量,Core的数量,Receiver的流量等。最后发现仅调参数无法完全满足本项目的业务场景,所以有更进一步的优化方案,总结如下:

  1. Executor初始化

很多机器学习的模型在第一次运行时,需要执行初始化方法,还会连接外部的数据库,常常需要5-10分钟,这会成为潜在的不稳定因素。在Spark Streaming应用中,当Receiver完成初始化,它就开始源源不断地接收数据,并且由Driver定期调度任务消耗这些数据。如果刚启动时Executor需要几分钟做准备,会导致第一个作业一直没有完成,这段时间内 Driver不会调度新的作业。这时候在Kafka Receiver端会有数据积压,随着积压的数据量越来越大,大部分数据会撑过新生代进入老年代,进而给Java GC带来严重的压力,容易引发应用程序崩溃。

本项目的解决方案是,修改Spark内核,在每个Executor接收任务之前先执行一个用户自定义的初始化函数,初始化函数中可以执行一些独立的用户逻辑。示例代码如下:


// sc:是SparkContext, setupEnvironment是Hulu扩展的API

sc.setupEnvironment(() => {

application.initialize() // 用户应用程序初始化,需执行几分钟

})

该方案需要更改Spark的任务调度器,首先将每个Executor设置为未初始化状态。此时,调度器只会给未初始化状态的Executor分配初始化任务(执行前面提到的初始化函数)。等初始化任务完毕,调度器更新Executor的状态为已初始化,这样的Executor才可以分配正常的计算任务。

  1. 异步处理Task中的业务逻辑

本项目中,模型的输入参数均来自Codis,甚至模型内部也可能访问外部存储,直接导致模型计算时长不稳定,很多时间消耗在网络等待上。

为提高系统吞吐量,增大并行度是常用的优化方案,但在本项目的场景中并不适用。Spark作业的调度策略是,等待上一个作业的所有Task执行完毕,然后调度下一个作业。如果单个Task的运行时间不稳定,易发生个别Task拖慢整个作业的情况,以至于资源利用率不高;甚至并行度越大,该问题越严重。一种常用解决Task不稳定的方案是增大Spark Streaming的micro batch的时间间隔,该方案会使整个实时系统的延迟变长,并不推荐。

因此这里通过异步处理Task中的业务逻辑来解决。如下文的代码所示,同步方案中,Task内执行业务逻辑,处理时间不定;异步方案中,Task把业务逻辑嵌入线程,交给线程池执行,Task立刻结束, Executor向Driver报告执行完毕,异步处理的时间非常短,在100ms以内。另外,当线程池中积压的线程数量太大时(代码中qsize>100的情况),会暂时使用同步处理,配合反压机制(见下文的参数spark.streaming.backpressure.enabled),可以保证不会因为数据积压过多而导致系统崩溃。经实验验证,该方案大大提高了系统的吞吐量。


// 同步处理

// 函数 runBusinessLogic是 Task 中的业务逻辑,执行时间不定

rdd.foreachPartition(partition => runBusinessLogic (partition))

// 异步处理,threadPool是线程池

rdd.foreachPartition(partition => {

val qsize = threadPool.getQueue.size // 线程池中积压的线程数

if (qsize > 100) {

runBusinessLogic(partition) // 暂时同步处理

}

threadPool.execute(new Runnable {

override def run() = runBusinessLogic(partition)

})

})

异步化Task也存在缺点:如果Executor发生异常,存放在线程池中的业务逻辑无法重新计算,会导致部分数据丢失。经实验验证,仅当Executor异常崩溃时有数据丢失,且不常见,在本项目的场景中可以接受。

  1. Kafka Receiver的稳定性

本项目使用了Spark Streaming中的Kafka Receiver,本质上调用Kafka官方的客户端ZookeeperConsumerConnector。其策略是每个客户端在Zookeeper的固定路径下把自己注册为临时节点,于是所有客户端都知道其他客户端的存在,然后自动协调和分配Kafka的数据资源。该策略存在一个弊端,当一个客户端与Zookeeper的连接状态发生改变(断开或者连上),所有的客户端都会通过Zookeeper协调, 重新分配Kafka的数据资源;在此期间所有客户端都断开与Kafka的连接,系统接收不到Kafka的数据,直到重新分配成功。如果网络质量不佳,并且Receiver的个数较多,这种策略会造成数据输入不稳定,很多Spark Streaming用户遇到这样的问题。在我们的系统中,该策略并没有产生明显的负面影响。值得注意的是,Kafka 客户端与Zookeeper有个默认的参数zookeeper.session.timeout.ms=6000,表示客户端与Zookeeper连接的session有效时间为6秒,我们的客户端多次出现因为Full GC超过6秒而与Zookeeper断开连接,之后再次连接上,期间所有客户端都受到影响,系统表现不稳定。所以项目中设置参数zookeeper.session.timeout.ms=30000。

  1. YARN资源抢占问题

在Hulu内部,Spark Streaming这样的长时服务与MapRedue、Spark、Hive等批处理应用共享YARN集群资源。在共享环境中,经常因一个批处理应用占用大量网络资源或者CPU资源导致Spark Streaming服务不稳定(尽管我们采用了CGroup进行资源隔离,但效果不佳)。更严重的问题是,如果个别Container崩溃Driver需要向YARN申请新的Container,或者如果整个应用崩溃需要重启,Spark Streaming不能保证很快申请到足够的资源,也就无法保证线上服务的质量。为解决该问题,Hulu使用label-based scheduling的调度策略,从YARN集群中隔离出若干节点专门运行Spark Streaming和其他长时服务,避免与批处理程序竞争资源。

  1. 完善监控信息

监控反映系统运行的性能状态,也是一切优化的基础。 Hulu使用Graphite和Grafana作为第三方监控系统,本项目把系统中关键的性能参数(如计算时长和次数)发送给Graphite服务器,就能够在Grafana网页上看到直观的统计图。

图4:Graphite监控信息,展示了Kafka中日志的剩余数量,一条线对应于一个partition的历史余量

图4是统计Kafka中日志的剩余数量,一条线对应于一个partition的历史余量,大部分情况下余量接近零,符合预期。图中09:55左右日志余量开始出现很明显的尖峰,之后又迅速逼近零。事后经过多种数据核对,证实Kafka的数据一直稳定,而当时Spark Streaming执行作业突然变慢,反压机制生效,于是Kafka Receiver减小读取日志的速率,造成Kafka数据积压;一段时间之后Spark Streaming又恢复正常,快速消耗了Kafka中的数据余量。

直观的监控系统能有效地暴露问题,进而理解和强化系统。 在我们的实践中,主要的监控指标有:

q   Kafka的剩余数据量

q  Spark的作业运行时间和调度时间

q  每个Task的计算时间

q  Codis的访问次数、时间、命中率

另外,有脚本定期分析这些统计数据,出现异常则发邮件报警。比如图4中 Kafka 的日志余量过大时,会有连续的报警邮件。我们的经验是,监控越细致,之后的优化工作越轻松。

  1. 参数优化

下表列出本项目中比较关键的几个参数:


spark.yarn.max.executor.failures


Executor允许的失败上限;如果超过该上限,整个Spark Streaming会失败,需要设置比较大


spark.yarn.executor.memoryOverhead


Executor中JVM的开销,与堆内存不一样,设置太小会导致内存溢出异常


spark.receivers.num


Kafka Receiver的个数


spark.streaming.receiver.maxRate


每个Receiver能够接受数据的最大速率;这个值超过峰值约50%


spark.streaming.backpressure.enabled


反压机制;如果目前系统的延迟较长,Receiver端会自动减小接受数据的速率,避免系统因数据积压过多而崩溃


spark.locality.wait


系统调度Task会尽量考虑数据的局部性,如果超过spark.locality.wait设置时间的上限,就放弃局部性;该参数直接影响Task的调度时间


spark.cleaner.ttl


Spark系统内部的元信息的超时时间;Streaming长期运行,元信息累积太多会影响性能

总结

Spark Streaming的产品上线运行一年多,期间进行了多次Spark版本升级,从最早期的0.8版本到最近的 1.5.x版本。总体上Spark Streaming是一款优秀的实时计算框架,可以在线上使用 。但仍然存在一些不足,包括:Spark同时使用堆内和堆外的内存,缺乏有效的监控,遇到OOM时分析和调试比较困难;缺少Executor初始化接口; 新版本的Spark有一些异常,如Shuffle过程中Block丢失、内存溢出。

时间: 2024-08-05 14:45:59

Spark Streaming实践和优化的相关文章

5分钟spark streaming实践之 与kafka联姻

你:kafka是什么? 我:嗯,这个嘛..看官网. Apache Kafka? is a distributed streaming platform Kafka is generally used for two broad classes of applications: Building real-time streaming data pipelines that reliably get data between systems or applications,Building rea

Spark Streaming高级特性在NDCG计算实践

从storm到spark streaming,再到flink,流式计算得到长足发展, 依托于spark平台的spark streaming走出了一条自己的路,其借鉴了spark批处理架构,通过批处理方式实现了实时处理框架.为进一步了解spark streaming的相关内容,飞马网于3月20日晚邀请到历任百度大数据的高级工程师-王富平,在线上直播中,王老师针对spark streaming高级特性以及ndcg计算实践进行了分享. 以下是本次直播的主要内容: 一.Spark Streaming简介

Spark 以及 spark streaming 核心原理及实践

导语 spark 已经成为广告.报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,对spark技术的使用有一些自己的经验积累以及心得体会,在此分享给大家. 本文依次从spark生态,原理,基本概念,spark streaming原理及实践,还有spark调优以及环境搭建等方面进行介绍,希望对大家有所帮助. spark 生态及运行原理 Spark 特点 运行速度快 => Spark拥有DA

【原创 Hadoop&Spark 动手实践 11】Spark Streaming 应用与动手实践

[原创 Hadoop&Spark 动手实践 11]Spark Streaming 应用与动手实践 目标: 1. 掌握Spark Streaming的基本原理 2. 完成Spark Streaming最简单的演练和动手实验 3. 完成一个完整的Spark Streaming的实际案例(用户手机信息实时分析系统)

某人视频中提到的 Spark Streaming 优化的几点事项

某人,并未提他的名字,是因为看的视频是1年前的,视频里他吹得厉害.我看视频时,查了一下他在视频里说的要做到的东西,结果上网一查,就看到了很多人说他骗了钱后,就不管交了学费的人了.真假无从查起.但是无风不起浪.也真没查到他说的要做出来的东西发布出来.所以这里不那人的名字了.只把他说的知识拿过来,做些笔记. 一.Batch中Task处理时间大 Spark Streaming 的处理模式是按照 Batch Duration 进行 Micro Batch Computation 的,且如果上一批数据没有

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费.在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase

Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变

1.为什么引入Backpressure 默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔.这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置

Spark Streaming性能优化系列-如何获得和持续使用足够的集群计算资源?

一:数据峰值的巨大影响 1. 数据确实不稳定,例如晚上的时候访问流量特别大 2. 在处理的时候例如GC的时候耽误时间会产生delay延迟 二:Backpressure:数据的反压机制 基本思想:根据上一次计算的Job的一些信息评估来决定下一个Job数据接收的速度. 如何限制Spark接收数据的速度? Spark Streaming在接收数据的时候必须把当前的数据接收完毕才能接收下一条数据. 源码解析 RateController: 1. RateController是监听器,继承自Streami

Dataflow编程模型和spark streaming结合

Dataflow编程模型和spark streaming结合 主要介绍一下Dataflow编程模型的基本思想,后面再简单比较一下Spark  streaming的编程模型 == 是什么 == 为用户提供以流式或批量模式处理海量数据的能力,该服务的编程接口模型(或者说计算框架)也就是下面要讨论的dataflow model 流式计算框架处理框架很多,也有大量的模型/框架号称能较好的处理流式和批量计算场景,比如Lambda模型,比如Spark等等,那么dataflow模型有什么特别的呢? 这就要要从