Spark Job调优(Part 1)

原文链接:https://wongxingjun.github.io/2016/05/11/Spark-Job%E8%B0%83%E4%BC%98-Part-1/

Spark应用的执行效率是所有程序员需要关心的问题,单纯从代码层面去了解和优化明显是不够的,本文介绍Spark的底层执行模式,并给出了一些经验性的调优建议。本文是对Cloudera一篇博文的译文。

学习调优你的Spark Job获得最优的效率

当你通过公共API写Spark代码的时候,你会遇到诸如transformation,action和RDD这些字眼。在这个层面理解Spark对写Spark程序是很重要的。类似的,当情况变得糟糕时,或者你纠结于Web UI去搞清楚你的应用为什么跑得这么慢的时候,你就会遇到一些新的字眼,比如job,stage和task。在这个层面理解Spark对写好的Spark程序是很重要的,这个好当然指的就是快。为了写出执行效率高的Spark程序,理解Spark底层的执行模式是很有帮助的。
本文中你会学习到Spark程序到底如何在集群上执行的。然后有几点涉及Spark执行模式的如何写高效程序的经验性建议。

Spark怎么执行程序

一个Spark应用包含一个driver进程和一系列分布在集群节点上的executor进程。driver是上层负责控制工作流的进程。executor进程负责以task形式执行工作,同时也存储一些用户cache的数据。driver和executor在整个应用的生命周期中都保持运行,即使随后会有动态资源分配使之改变。单个executor有多个slot来执行task,并在其生命周期中并发执行。如何部署这些进程到集群中取决于使用的集群管理器(YARN,Mesos或者Standalone),但每个Spark应用中都有driver和executor。

执行模式最上层是job。在Spark应用中调用一个action就会触发开始一个Spark Job来完成这个action对应的操作。为了解析这个job的全貌,Spark会检查action的RDD依赖关系图并由此生成一个执行计划。执行计划从后面最远的RDD,也就是那些不依赖其他RDD,或者引用一些已经缓存的数据的RDD,直到最后产生出action结果的RDD。

执行计划包括将job的transformations划分成stage。一个stage对应着执行相同代码的task集合,每个task负责不同的数据子集。每个stage包括一系列不需要shuffle数据就可以完成的transformations。

那么什么决定数据是否需要被shuffle呢?回忆一下,RDD是由固定数量的partition分区组成,每个分区又由一些record记录组成。对那些由被称作窄transformation(比如map和filter)返回的RDD,用来计算单个分区里的record所需的record都在父RDD的单个分区中,每个对象只依赖父RDD中的单个对象。coalesce之类的操作能够产生处理多个分区的task,但是这种transformation依旧被视作是窄transformation,因为用来计算任何一个单个的输出record的输入record依旧只能在有限的分区集合中。

但是Spark也支持诸如groupByKey和reduceByKey这种宽的transformation。在这些依赖中,用来计算单个分区中record的数据可能分布在父RDD的多个分区中。一个task处理的分区里所有相同key的tuple元组最后必须最终分到同一个分区中。为了满足这些操作,Spark就得shuffle了,shuffle操作可以将分散于集群中的数据和结果传进一个新的stage,这个新的stage由新的分区组成(这些分区就是shuffle的结果)。

比如说,看下面的代码:


1

2

3

4

5


sc.textFile("someFile.txt").

map(mapFunc).

flatMap(flatMapFunc).

filter(filterFunc).

count()

这段代码只执行一个action,这个action依赖于一个文本文件产生的RDD上的一系列操作。然后这个action可以在一个stage内完成,因为三个操作的输出所依赖的计算数据没有来自输入RDD的不同分区的。

对比一下,下面这段代码是用来找出在一个文本文件中出现次数超过1000次的所有单词中所有字母出现的次数


1

2

3

4

5

6


val tokenized = sc.textFile(args(0)).flatMap(_.split(‘ ‘))

val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

val filtered = wordCounts.filter(_._2 >= 1000)

val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).

reduceByKey(_ + _)

charCounts.collect()

这个过程会被分解成3个stage。reduceByKey操作产生stage分界,因为计算它的输出需要根据key重新对数据分区。

粉红色框中表示用于执行的stage图

在每个stage边界,父stage将数据写入磁盘,然后子stage的task通过网络将之拉取。因为会产生严重的磁盘和网络IO开销,所以stage边界的代价是很高昂的,我们应该尽可能避免。父stage的数据分区数是可能和子stage分区数不同的。触发stage边界的transformation可以接受一个numPartitions参数来决定在子stage中将数据分成多少个分区。

和MapReduce Job中reducer个数是个很重要的参数一样,调整stage边界的分区数常常对应用性能起决定性作用。接下来将会深入介绍如何调优这个参数。

选择正确的操作符

当开发者试图用Spark来完成一些工作时总是倾向于选择一些产生同样结果的action编排方式和transformation,然而并不是所有的编排都有同样的性能:避免常见的陷阱和选择正确的编排可以对一个应用的性能产生很大的影响。有一些规则和建议可以帮你如何抉择。

最近在Spark-5097的工作开始增加了SchemaRDD来用Spark的核心API向开发者打开Spark的Catalyst optimizer,允许Spark在使用那些操作上做一些上层选择。当SchemaRDD成为一个稳定的组件时,用户可以避免再做一些这类决定了。

选择操作的编排方式根本目的就是为了减少shuffle数和shuffle的数据量。这是因为shuffle是代价相当高的操作,所有的shuffle数据必须被写到磁盘然后通过网络传输。repartition,join,cogroup和任何带有By,ByKey的转换操作都会产生shuffle。并非所有的操作代价都是相同的,但是一些Spark开发菜鸟们遇到的性能陷阱都是因为做了错误的选择:

  • 进行关联的规约操作时避免使用groupByKey

比如说rdd.groupByKey().mapValues(_.sum)就可以和rdd.reduceByKey(+)产生出同样的计算结果。但是前者却需要将整个数据集通过网络传输,而后者只需要对每个分区中的每个key求本地和,然后在shuffle之后将之合并成一个更大的和。

  • 当输入输出值类型不同时避免使用reduceByKey

比如说写一个操作找出每个key对应的不同字符串,一种方式就是使用map将每个元素转换成一个Set之后再通过reduceByKey来合并:


1

2


rdd.map(kv => (kv._1, new Set[String]() + kv._2))

.reduceByKey(_ ++ _)

这段代码会造成大量不必要的对象创建,因为每个record必须分配一个新的set。在这种情况下最好用aggregateByKey操作,它可以更高效地做map-side的聚合:


1

2

3

4


val zero = new collection.mutable.Set[String]()

rdd.aggregateByKey(zero)(

(set, v) => set += v,

(set1, set2) => set1 ++= set2)

  • 避免使用flatMap-join-groupBy模式

当两个数据集已经通过key分组了然后你想保持他们分组的情况下将之连接起来,你可以就使用cogroup,这就可以避免所有group的拆解和重组造成的开销。

何时没有shuffle

知道上面的操作何时不会产生shuffle也是很有用的,Spark知道当前面的转换已经根据相同的partitioner分区器分好区的时候如何避免shuffle。看下面的代码:


1

2

3


rdd1 = someRdd.reduceByKey(...)

rdd2 = someOtherRdd.reduceByKey(...)

rdd3 = rdd1.join(rdd2)

因为没有分区器传到reduceByKey,就会使用默认的分区器,最后导致rdd1和rdd2都是hash-partitioned(hash分区规则),这两个reduceByKey会产生两个shuffle。如果RDD有相同数目的分区,join操作不需要额外的shuffle操作。因为RDD是相同分区的,rdd1中任何一个分区的key集合都只能出现在rdd2中的单个分区中。因此rdd3中任何一个输出分区的内容仅仅依赖rdd1和rdd2中的单个分区,第三次shuffle就没有必要了。

比如说,如果somRdd有4个分区,someOtherRdd有两个分区,两个reduceByKey都用3个分区,那么执行的task集合就会像下面这样:

那如果rdd1和rdd2使用不同的分区器,或者使用默认的hash分区器但配置不同的分区数呢?那样的话,仅仅只有一个rdd(较少分区的RDD)需要重新shuffle后再join。

一样的转换,一样的输入但不一样的分区数,task执行如下:

有一种当join两个数据集时避免shuffle操作的方法就是使用broadcast variables广播变量的优点。当一个数据集足够小,小到可以在单个executor的内存中存放时,可以将它在driver上载入一个hash table中然后广播给每个executor。然后map转换就可以参照hash table来查询。

何时shuffle越多越好

在尽力减少shuffle数的规则外有一个例外。额外的shuffle操作可以有利于增加并行度。比如说你的数据到了一些不可分割的文件里,InputFormat指定的分区方式可能将大量的record放进每个分区中,这样就不能产生足够多的分区来充分利用可用的core。在这种情况下在载入数据后通过重新分区来将数据分成更多的分区(这就会带来一次shuffle操作)就可以让之后的操作可以利用更多集群的CPU。

还有一个例外就是当用reduce或者aggregate action来聚合数据到driver时。当对大量分区进行巨厚的时候,在driver上单线程合并所有的结果很快就会变成计算的瓶颈。为了减少driver上的负载,可以先用reduceByKeyaggregateByKey来进行一次分布式的合并,这样可以将数据集分成更少的分区。在将结果发送到driver上进行最后的聚合之前分区的value之间两两并行合并。可以看一下treeReducetreeAggregate是如何做的。(注意在1.2版本中,大多数最近的版本都被标记为开发者API,SPARK-5430开始试图将其作为稳定API加入core中)

当聚合数据已经根据一个key分组了的时候这个小窍门分外有效。比如说,一个应用想要统计一篇文章中所有单词出现的次数,然后将结果以map形式推送到driver上。一种方式可以通过aggregate action实现,具体就是每个分区进行一次本地的map,最后在driver上将所有的map合并。另一种方式可以通过aggregateByKey来实现,具体就是分布式进行count,然后简单地collectAsMap结果到driver上。

二次排序

另一个需要知道的重要的机制就是repartitionAndSortWithinPartitions转换,这个转换听起来挺神秘,但是出现所有排序的奇怪情况。它将排序push到shuffle的机器上,在那里大量数据可以被高效地溢写并且排序可以和其他操作结合起来。

比如Apache Hive on Spark在它的join实现中就使用了这种操作,这个转换还在二次排序模式中扮演了重要的模块,在二次排序中你希望将records按key分组然后当迭代到对应key的value上的时候将他们按照特定的顺序表现出来。这种情况需要根据user将events分组然后按照他们出现的时间顺序为每个用户分析events。充分利用repartitionAndSortWithinPartitions转换来做二次排序对用户来说有点难度,但是SPARK-3655即将大大简化一过程。

结论

现在你应该对影响Spark程序性能效率的基本因素有了较好的理解,在Part2中将会介绍怎么对资源请求、并行度和数据结构进行调优。

参考文献

[1]How-to: Tune Your Apache Spark Jobs (Part 1)

时间: 2024-08-07 03:27:29

Spark Job调优(Part 1)的相关文章

spark监控调优

一.Spark运行时架构: Spark分布式结构采取 主/从 结构模式.主是驱动器(Driver)节点,这个节点负责中央协调,调度各个工作(执行器executor)节点. 从是执行器(executor)节点. Spark驱动器节点和执行器节点统称为Spark应用.Spark应用通过集群管理器在集群的机器上启动. 二.驱动器和执行器的任务: 驱动器任务:负责运行组成Spark作业的任务: 执行器任务:为要求缓存的RDD提供内存式存储. 三.集群管理器 Cluster Manager可以用来启动驱动

Spark&Spark性能调优实战

Spark特别适用于多次操作特定的数据,分mem-only和mem & disk.其中mem-only:效率高,但占用大量的内存,成本很高;mem & disk:内存用完后,会自动向磁盘迁移,解决了内存不足的问题,却带来了数据的置换的消费.Spark常见的调优工具有nman.Jmeter和Jprofile,以下是Spark调优的一个实例分析: 1.场景:精确客户群 对一个容量为300g的客户信息表在spark上进行查询优化,该大宽表有1800多列,有效使用的有20列. 2.优化达到的效果:

Spark Job调优(Part 2)

原文链接:https://wongxingjun.github.io/2016/05/11/Spark-Job%E8%B0%83%E4%BC%98-Part-2/ 这篇文章将会完成Part 1中留下的部分,我会尽力介绍更多的你关心的能加速Spark程序的东西.特别是你将会学习资源调优或者配置Spark来充分利用集群提供的所有资源.然后我们会转向并行度调优,job性能中最难的也是最重要的参数.最后你会学习如何表示数据本身,Spark能读取的磁盘存储形式(用Apache Avro或者Apache P

数据倾斜是多么痛?spark作业调优秘籍

目录视图 摘要视图 订阅 [观点]物联网与大数据将助推工业应用的崛起,你认同么?      CSDN日报20170703--<从高考到程序员--我一直在寻找答案>      [直播]探究Linux的总线.设备.驱动模型! 数据倾斜是多么痛?spark作业调优秘籍 2017-06-27 13:28 39人阅读 评论(0) 收藏 举报  分类: Spark(124)  原文:https://mp.weixin.qq.com/s?__biz=MzI5OTAwMTM1MQ==&mid=2456

Spark性能调优之JVM调优

Spark性能调优之JVM调优 通过一张图让你明白以下四个问题 1.JVM GC机制,堆内存的组成                2.Spark的调优为什么会和JVM的调优会有关联?--因为Scala也是基于JVM运行的语言                3.Spark中OOM产生的原因                4.如何在JVM这个层面上来对Spark进行调优 补充:                Spark程序运行时--JVM堆内存分配比例 RDD缓存的数据(0.6)    默认 对象_

spark性能调优 数据倾斜 内存不足 oom解决办法

[重要] Spark性能调优--扩展篇 : http://blog.csdn.net/zdy0_2004/article/details/51705043

Spark性能调优之合理设置并行度

Spark性能调优之合理设置并行度 1.Spark的并行度指的是什么?  spark作业中,各个stage的task的数量,也就代表了spark作业在各个阶段stage的并行度! 当分配完所能分配的最大资源了,然后对应资源去调节程序的并行度,如果并行度没有与资源相匹配,那么导致你分配下去的资源都浪费掉了.同时并行运行,还可以让每个task要处理的数量变少(很简单的原理.合理设置并行度,可以充分利用集群资源,减少每个task处理数据量,而增加性能加快运行速度.)   举例: 假如, 现在已经在sp

Spark性能调优之代码方面的优化

Spark性能调优之代码方面的优化 1.避免创建重复的RDD 对性能没有问题,但会造成代码混乱 2.尽可能复用同一个RDD,减少产生RDD的个数   3.对多次使用的RDD进行持久化(cache,persist,checkpoint) 如何选择一种最合适的持久化策略? 默认MEMORY_ONLY, 性能很高, 而且不需要复制一份数据的副本,远程传送到其他节点上(BlockManager中的BlockTransferService),但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种

Spark性能调优之解决数据倾斜

Spark性能调优之解决数据倾斜 数据倾斜七种解决方案 shuffle的过程最容易引起数据倾斜 1.使用Hive ETL预处理数据    ? 方案适用场景:如果导致数据倾斜的是Hive表.如果该Hive表中的数据本身很不均匀(比如某个 key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表 执行某个分析操作,那么比较适合使用这种技术方案.    ? 方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对

[Spark性能调优] Spark Shuffle 中 JVM 内存使用及配置详情

[Spark性能调优]  Spark Shuffle 中 JVM 内存使用及配置详情 本课主题 JVM 內存使用架构剖析 Spark 1.6.x 和 Spark 2.x 的 JVM 剖析 Spark 1.6.x 以前 on Yarn 计算内存使用案例 Spark Unified Memory 的运行原理和机制 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对