Spark技术内幕:Shuffle的性能调优

通过上面的架构和源码实现的分析,不难得出Shuffle是Spark Core比较复杂的模块的结论。它也是非常影响性能的操作之一。因此,在这里整理了会影响Shuffle性能的各项配置。尽管大部分的配置项在前文已经解释过它的含义,由于这些参数的确是非常重要,这里算是做一个详细的总结。

1.1.1  spark.shuffle.manager

前文也多次提到过,Spark1.2.0官方支持两种方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0之前仅支持Hash Based Shuffle。Spark 1.1的时候引入了Sort Based Shuffle。Spark 1.2的默认Shuffle机制从Hash变成了Sort。如果需要Hash Based Shuffle,可以将spark.shuffle.manager设置成“hash”即可。

如果对性能有比较苛刻的要求,那么就要理解这两种不同的Shuffle机制的原理,结合具体的应用场景进行选择。

Hash Based Shuffle,就是将数据根据Hash的结果,将各个Reducer partition的数据写到单独的文件中去,写数据时不会有排序的操作。这个问题就是如果Reducer的partition比较多的时候,会产生大量的磁盘文件。这会带来两个问题:

1)       同时打开的文件比较多,那么大量的文件句柄和写操作分配的临时内存会非常大,对于内存的使用和GC带来很多的压力。尤其是在Sparkon YARN的模式下,Executor分配的内存普遍比较小的时候,这个问题会更严重。

2)       从整体来看,这些文件带来大量的随机读,读性能可能会遇到瓶颈。

更加细节的讨论可以参见7.1节和7.6.6(尝试去解决写的文件太多的问题)。

Sort Based Shuffle会根据实际情况对数据采用不同的方式进行Sort。这个排序可能仅仅是按照Reducer的partition进行排序,保证同一个Shuffle Map Task的对应于不同的Reducer的partition的数据都可以写到同一个数据文件,通过一个Offset来标记不同的Reducer partition的分界。因此一个Shuffle Map Task仅仅会生成一个数据文件(还有一个index索引文件),从而避免了Hash Based Shuffle文件数量过多的问题。

选择Hash还是Sort,取决于内存,排序和文件操作等因素的综合影响。

对于不需要进行排序的Shuffle而且Shuffle产生的文件数量不是特别多,Hash Based Shuffle可能是个更好的选择;毕竟Sort Based Shuffle至少会按照Reducer的partition进行排序。

而Sort BasedShuffle的优势就在于Scalability,它的出现实际上很大程度上是解决Hash Based Shuffle的Scalability的问题。由于Sort Based Shuffle还在不断的演进中,因此Sort Based Shuffle的性能会得到不断的改善。

对选择那种Shuffle,如果对于性能要求苛刻,最好还是通过实际的场景中测试后再决定。不过选择默认的Sort,可以满足大部分的场景需要。

1.1.2  spark.shuffle.spill

这个参数的默认值是true,用于指定Shuffle过程中如果内存中的数据超过阈值(参考spark.shuffle.memoryFraction的设置),那么是否需要将部分数据临时写入外部存储。如果设置为false,那么这个过程就会一直使用内存,会有Out Of Memory的风险。因此只有在确定内存足够使用时,才可以将这个选项设置为false。

对于Hash BasedShuffle的Shuffle Write过程中使用的org.apache.spark.util.collection.AppendOnlyMap就是全内存的方式,而org.apache.spark.util.collection.ExternalAppendOnlyMap对org.apache.spark.util.collection.AppendOnlyMap有了进一步的封装,在内存使用超过阈值时会将它spill到外部存储,在最后的时候会对这些临时文件进行Merge。

而Sort BasedShuffle Write使用到的org.apache.spark.util.collection.ExternalSorter也会有类似的spill。

而对于ShuffleRead,如果需要做aggregate,也可能在aggregate的过程中将数据spill的外部存储。

1.1.3  spark.shuffle.memoryFraction和spark.shuffle.safetyFraction

在启用spark.shuffle.spill的情况下,spark.shuffle.memoryFraction决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始Spill。在Spark 1.2.0里,这个值是0.2。通过这个参数可以设置Shuffle过程占用内存的大小,它直接影响了Spill的频率和GC。

如果Spill的频率太高,那么可以适当的增加spark.shuffle.memoryFraction来增加Shuffle过程的可用内存数,进而减少Spill的频率。当然为了避免OOM(内存溢出),可能就需要减少RDD cache所用的内存,即需要减少spark.storage.memoryFraction的值;但是减少RDD cache所用的内存有可能会带来其他的影响,因此需要综合考量。

在Shuffle过程中,Shuffle占用的内存数是估计出来的,并不是每次新增的数据项都会计算一次占用的内存大小,这样做是为了降低时间开销。但是估计也会有误差,因此存在实际使用的内存数比估算值要大的情况,因此参数 spark.shuffle.safetyFraction作为一个保险系数降低实际Shuffle过程所需要的内存值,降低实际内存超出用户配置值的风险。

1.1.4  spark.shuffle.sort.bypassMergeThreshold

这个配置的默认值是200,用于设置在Reducer的partition数目少于多少的时候,Sort Based Shuffle内部不使用Merge Sort的方式处理数据,而是直接将每个partition写入单独的文件。这个方式和Hash Based的方式是类似的,区别就是在最后这些文件还是会合并成一个单独的文件,并通过一个index索引文件来标记不同partition的位置信息。从Reducer看来,数据文件和索引文件的格式和内部是否做过Merge Sort是完全相同的。

这个可以看做SortBased Shuffle在Shuffle量比较小的时候对于Hash Based Shuffle的一种折衷。当然了它和Hash Based Shuffle一样,也存在同时打开文件过多导致内存占用增加的问题。因此如果GC比较严重或者内存比较紧张,可以适当的降低这个值。

1.1.5  spark.shuffle.blockTransferService

在Spark 1.2.0,这个配置的默认值是netty,而之前是nio。这个主要是用于在各个Executor之间传输Shuffle数据。Netty的实现更加简洁,但实际上用户不用太关心这个选项。除非是有特殊的需求,否则采用默认配置就可以。

1.1.6  spark.shuffle.consolidateFiles

这个配置的默认配置是false。主要是为了解决在Hash Based Shuffle过程中产生过多文件的问题。如果配置选项为true,那么对于同一个Core上运行的Shuffle Map Task不会新产生一个Shuffle文件而是重用原来的。但是每个Shuffle Map Task还是需要产生下游Task数量的文件,因此它并没有减少同时打开文件的数量。如果需要了解更加详细的细节,可以阅读7.1节。

但是consolidateFiles的机制在Spark 0.8.1就引入了,到Spark 1.2.0还是没有稳定下来。从源码实现的角度看,实现源码是非常简单的,但是由于涉及本地的文件系统等限制,这个策略可能会带来各种各样的问题。由于它并没有减少同时打开文件的数量,因此不能减少由文件句柄带来的内存消耗。如果面临Shuffle的文件数量非常大,那么是否打开这个选项最好还是通过实际测试后再决定。

1.1.7  spark.shuffle.service.enabled

(false)

1.1.8  spark.shuffle.compress和 spark.shuffle.spill.compress

这两个参数的默认配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用来设置Shuffle过程中是否对Shuffle数据进行压缩;其中前者针对最终写入本地文件系统的输出文件,后者针对在处理过程需要spill到外部存储的中间数据,后者针对最终的shuffle输出文件。

如何设置spark.shuffle.compress?

如果下游的Task通过网络获取上游Shuffle Map Task的结果的网络IO成为瓶颈,那么就需要考虑将它设置为true:通过压缩数据来减少网络IO。由于上游Shuffle Map Task和下游的Task现阶段是不会并行处理的,即上游Shuffle Map Task处理完成,然后下游的Task才会开始执行。因此如果需要压缩的时间消耗就是Shuffle MapTask压缩数据的时间 + 网络传输的时间 + 下游Task解压的时间;而不需要压缩的时间消耗仅仅是网络传输的时间。因此需要评估压缩解压时间带来的时间消耗和因为数据压缩带来的时间节省。如果网络成为瓶颈,比如集群普遍使用的是千兆网络,那么可能将这个选项设置为true是合理的;如果计算是CPU密集型的,那么可能将这个选项设置为false才更好。

如何设置spark.shuffle.spill.compress?

如果设置为true,代表处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回进行merge的时候,要进行解压。因此要综合考虑CPU由于引入压缩解压的消耗时间和Disk IO因为压缩带来的节省时间的比较。在Disk IO成为瓶颈的场景下,这个被设置为true可能比较合适;如果本地硬盘是SSD,那么这个设置为false可能比较合适。

1.1.9  spark.reducer.maxMbInFlight

这个参数用于限制一个ReducerTask向其他的Executor请求Shuffle数据时所占用的最大内存数,尤其是如果网卡是千兆和千兆以下的网卡时。默认值是48MB。设置这个值需要中和考虑网卡带宽和内存。

如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。

点我投票

时间: 2024-10-21 20:08:51

Spark技术内幕:Shuffle的性能调优的相关文章

Spark数据本地化-->如何达到性能调优的目的

Spark数据本地化-->如何达到性能调优的目的 1.Spark数据的本地化:移动计算,而不是移动数据 2.Spark中的数据本地化级别: TaskSetManager 的 Locality Levels 分为以下五个级别: PROCESS_LOCAL  NODE_LOCAL NO_PREF    RACK_LOCAL ANY PROCESS_LOCAL   进程本地化:task要计算的数据在同一个Executor中     NODE_LOCAL    节点本地化:速度比 PROCESS_LOC

Spark技术内幕: Shuffle详解(三)

前两篇文章写了Shuffle Read的一些实现细节.但是要想彻底理清楚这里边的实现逻辑,还是需要更多篇幅的:本篇开始,将按照Job的执行顺序,来讲解Shuffle.即,结果数据(ShuffleMapTask的结果和ResultTask的结果)是如何产生的:结果是如何处理的:结果是如何读取的. 在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend.它在接收到LaunchTask的命令后,通过在Driv

spark性能调优(二) 彻底解密spark的Hash Shuffle

装载:http://www.cnblogs.com/jcchoiling/p/6431969.html 引言 Spark HashShuffle 是它以前的版本,现在1.6x 版本默应是 Sort-Based Shuffle,那为什么要讲 HashShuffle 呢,因为有分布式就一定会有 Shuffle,而且 HashShuffle 是 Spark以前的版本,亦即是 Sort-Based Shuffle 的前身,因为有 HashShuffle 的不足,才会有后续的 Sorted-Based S

[大数据性能调优] 第二章:彻底解密Spark的HashShuffle

本課主題 Shuffle 是分布式系统的天敌 Spark HashShuffle介绍 Spark Consolidated HashShuffle介绍 Shuffle 是如何成为 Spark 性能杀手 Shuffle 性能调优思考 Spark HashShuffle 源码鉴赏 引言 Spark HashShuffle 是它以前的版本,现在1.6x 版本默应是Sort-Based Shuffle,那为什么要讲 HashShuffle 呢,因为有分布式就一定会有 Shuffle,而且 HashShu

Spark日志分析项目Demo(9)--常规性能调优

一 分配更多资源 分配更多资源:性能调优的王道,就是增加和分配更多的资源,性能和速度上的提升,是显而易见的:基本上,在一定范围之内,增加资源与性能的提升,是成正比的:写完了一个复杂的spark作业之后,进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置:在这个基础之上,如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了,公司资源有限:那么才是考虑去做后面的这些性能调优的点. 问题: 1.分配哪些资源? 2.在哪里分配这些资源? 3.为什么

[大数据性能调优] 第一章:性能调优的本质、Spark资源使用原理和调优要点分析

本課主題 大数据性能调优的本质 Spark 性能调优要点分析 Spark 资源使用原理流程 Spark 资源调优最佳实战 Spark 更高性能的算子 引言 我们谈大数据性能调优,到底在谈什么,它的本质是什么,以及 Spark 在性能调优部份的要点,这两点让直式进入性能调优都是一个至关重要的问题,它的本质限制了我们调优到底要达到一个什么样的目标或者说我们是从什么本源上进行调优.希望这篇文章能为读者带出以下的启发: 了解大数据性能调优的本质 了解 Spark 性能调优要点分析 了解 Spark 在资

Spark性能调优之资源分配

性能优化王道就是给更多资源!机器更多了,CPU更多了,内存更多了,性能和速度上的提升,是显而易见的.基本上,在一定范围之内,增加资源与性能的提升,是成正比的:写完了一个复杂的spark作业之后, 进行性能调优的时候,首先第一步,我觉得,就是要来调节最优的资源配置:在这个基础之上, 如果说你的spark作业,能够分配的资源达到了你的能力范围的顶端之后,无法再分配更多的资源了, 公司资源有限:那么才是考虑去做后面的这些性能调优的点. 大体上这两个方面:core    mem 问题: 1.分配哪些资源

[Spark性能调优] 第四章 : Spark Shuffle 中 JVM 内存使用及配置内幕详情

本课主题 JVM 內存使用架构剖析 Spark 1.6.x 和 Spark 2.x 的 JVM 剖析 Spark 1.6.x 以前 on Yarn 计算内存使用案例 Spark Unified Memory 的运行原理和机制 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对 JVM 是怎么使用,你怎么可以很有信心地或者是完全确定地掌握和控制数据的缓存空间呢,所

[Spark性能调优] Spark Shuffle 中 JVM 内存使用及配置详情

[Spark性能调优]  Spark Shuffle 中 JVM 内存使用及配置详情 本课主题 JVM 內存使用架构剖析 Spark 1.6.x 和 Spark 2.x 的 JVM 剖析 Spark 1.6.x 以前 on Yarn 计算内存使用案例 Spark Unified Memory 的运行原理和机制 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对