spark shuffle 内幕彻底解密课程

一:到底什么是Shuffle?

Shuffle中文翻译为“洗牌”,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。

二:Shuffle可能面临的问题?运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了)。

1, 数据量非常大;

2, 数据如何分类,即如何Partition,Hash、Sort、钨丝计算;

3, 负载均衡(数据倾斜);

4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考虑的问题;

说明:具体的Task进行计算的时候尽一切最大可能使得数据具备Process Locality的特性;退而求次是增加数据分片,减少每个Task处理的数据量。

三:Hash Shuffle

1, key不能是Array;

2, Hash Shuffle不需要排序,此时从理论上讲就节省了Hadoop MapReduce中进行Shuffle需要排序时候的时间浪费,因为实际生产环境有大量的不需要排序的Shuffle类型;

思考:不需要排序的Hash Shuffle是否一定比需要排序的Sorted Shuffle速度更快?不一定!如果数据规模比较小的情形下,Hash
Shuffle会比Sorted Shuffle速度快(很多)!但是如果数据量大,此时Sorted Shuffle一般都会比Hash
Shuffle快(很多)

3,每个ShuffleMapTask会根据key的哈希值计算出当前的key需要写入的Partition,然后把决定后的结果写入当单独的文件,此时会导致每个Task产生R(指下一个Stage的并行度)个文件,如果当前的Stage中有M个ShuffleMapTask,则会M*R个文件!!!

注意:Shuffle操作绝大多数情况下都要通过网络,如果Mapper和Reducer在同一台机器上,此时只需要读取本地磁盘即可。

Hash Shuffle的两大死穴:第一:Shuffle前会产生海量的小文件于磁盘之上,此时会产生大量耗时低效的IO操作;第二:内存不共用!!!由于内存中需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较庞大的话,内存不可承受,出现OOM等问题!

三:Sorted Shuffle:

为了改善上述的问题(同时打开过多文件导致Writer Handler内存使用过大以及产生过度文件导致大量的随机读写带来的效率极为低下的磁盘IO操作),Spark后来推出了Consalidate机制,来把小文件合并,此时Shuffle时文件产生的数量为cores*R,对于ShuffleMapTask的数量明显多于同时可用的并行Cores的数量的情况下,Shuffle产生的文件会大幅度减少,会极大降低OOM的可能;

为此Spark推出了Shuffle Pluggable开放框架,方便系统升级的时候定制Shuffle功能模块,也方便第三方系统改造人员根据实际的业务场景来开放具体最佳的Shuffle模块;核心接口ShuffleManager,具体默认实现有HashShuffleManager、SortShuffleManager等,Spark
1.6.0中具体的配置如下:

一:为什么需要Sort-Based Shuffle?

1,Shuffle一般包含两阶段任务:第一部分,产生Shuffle数据的阶段(Map阶段,额外补充,需要实现ShuffleManager中getWriter来写数据(数据可以BlockManager写到Memory、Disk、Tachyon等,例如像非常快的Shuffle,此时可以考虑把数据写在内存中,但是内存不稳定,建议采用MEMORY_AND_DISK方式));第二部分,使用Shuffle数据的阶段(Reduce阶段,额外的补充,需要实现ShuffleManager的getReader,Reader会向Driver去获取上一下Stage产生的Shuffle数据);

2,Spark的Job会被划分成很多Stage:

如果只有一个Stage,则这个Job就相当于只有一个Mapper阶段,当然不会产生产生Shuffle,适合于简单的ETL;

如果不止一个Stage,则最后一个Stage就是最终的Reducer,最左侧的第一个Stage就仅仅是整个Job的Mapper,中间所有的任意一个Stage是其父Stage的Reducer且是其子Stage的Mapper;

3,Spark Shuffle在最开始的时候只支持Hash-based Shuffle:默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task中要使用的数据,但是在一些情况下(例如数据量非常大的情况)会造成大量文件(M*R,其中M代表Mapper中的所有的并行任务数量,R代表Reducer中所有的并行任务数量)的随机磁盘I/O操作且会性能大量的Memory消耗(极易造成OOM),这是致命的问题,因为第一不能够处理大规模的数据,第二Spark不能够运行在大规模的分布式集群上!后来的改善方式是加入了Shuffle
Consolidate机制来将Shuffle时候产生的文件数量减少到C*R个(C代表在Mapper端同时能够使用的Cores的数量,R代表Reducer中所有的并行任务数量),但是此时如果Reducer端的并行数据分片过多的话则C*R可能已经过大,此时依旧没有逃脱文件打开过多的厄运!!!

 

Spark在引入Sort-based Shuffle(Spark 1.1版本以前)以前比较适用于中小规模的大数据处理!

4,为了让Spark在更大规模的集群上更高性能处理更大规模的数据,于是就引入了Sort-based Shuffle!
从此以后(Spark 1.1版本开始),Spark可以胜任任意规模(包含PB级别及PB以上的级别)的大数据的处理,尤其是随着钨丝计划的引入和优化,把Spark更快速的在更大规模的集群处理更海量的数据的能力推向了一个新的巅峰!

5,Spark 1.6版本支持至少三种类型Shuffle:

// Let the user specify short names for shuffle managers

val shortShuffleMgrNames =
Map(

"hash"
-> "org.apache.spark.shuffle.hash.HashShuffleManager",

"sort"
-> "org.apache.spark.shuffle.sort.SortShuffleManager",

"tungsten-sort"
-> "org.apache.spark.shuffle.sort.SortShuffleManager")

val shuffleMgrName = conf.get("spark.shuffle.manager",
"sort")

val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase,
shuffleMgrName)

val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

实现ShuffleManager接口可以更具自己的业务实际需要最优化的使用自定义的Shuffle实现;

6,Spark 1.6默认采用的就是Sort-based Shuffle的方式:

val shuffleMgrName =
conf.get("spark.shuffle.manager",
"sort")

上述的源码说明,你可以在Spark的配置文件中配置Spark框架运行时要使用的具体的ShuffleManager的实现

修改conf/spark-default.conf, 加入如下内容:

spark.shuffle.manager SORT

Sort-based Shuffle不会为每个Reducer中的Task生成一个单独的文件,相反,Sort-based
Shuffle会把Mapper中每个ShuffleMapTask所有的输出数据Data只写到一个文件中,因为每个ShuffleMapTask的中的数据会被分类,所以Sort-based
Shuffle使用了index文件存储具体ShuffleMapTask输出数据在同一个Data文件中是如何分类的信息!!!所以说基于Sort-base的
Shuffle会在Mapper中的每一个ShuffleMapTask中产生两个文件:Data文件和Index文件,其中Data文件是存储当前Task的Shuffle输出的,而Index文件中则存储数据了Data文件中的数据通过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所要抓取的上一个Stage中的ShuffleMapTask产生的数据的;

Sort-based Shuffle会产生2M(M代表Mapper阶段中并行的Partition的总数量,其实就是Mapper端ShuffleMapTask的总数量)个Shuffle临时文件!!!

回顾整个Shuffle的历史,Shuffle产生的临时文件的数量的变化一次为:

Basic Hash Shuffle:M*R;

Consalidate方式的Hash Shuffle:C*R;

Sort-based Shuffle:2M;

二:在集群中动手实战Sort-based Shuffle

通过动手实践确实证明了Sort-based Shuffle产生了2M个文件!!!

shuffle_0_0_0.data

shuffle_0_3_0.index

在Sort-based Shuffle的中Reducer是如何获取自己需要的数据的呢?具体而言,Reducer首先找Driver去获取父Stage中每个ShuffleMapTask输出的位置信息,根据位置信息获取index文件,解析index文件,从解析的index文件中获取Data文件中属于自己的那部分内容;

三:默认Sort-based Shuffle的几个缺陷:

1, 如果Mapper中Task的数量过大,依旧会产生很多小文件;此时在Shuffle传递数据的过程中到Reducer端,reduce会需要同时大量的记录来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃!

2, 如果需要在分片内也进行排序的话,此时需要进行Mapper端和Reducer端的两次排序!!!

val
shortShuffleMgrNames =
Map(

"hash"
-> "org.apache.spark.shuffle.hash.HashShuffleManager",

"sort"
-> "org.apache.spark.shuffle.sort.SortShuffleManager",

"tungsten-sort"
-> "org.apache.spark.shuffle.sort.SortShuffleManager")

时间: 2024-10-17 02:22:05

spark shuffle 内幕彻底解密课程的相关文章

Spark Sort-Based Shuffle内幕彻底解密(DT大数据梦工厂)

内容: 1.为什么使用Sorted-Based Shuffle: 2.Sorted-Based Shuffle实战: 3.Sorted-Based Shuffle内幕: 4.Sorted-Based Shuffle的不足: 最常用的Shuffle方式,Sorted-Based Shuffle涉及了大规模Spark开发.运维时核心问题,以及答案的要害所在. 必须掌握这一讲内容. 本课是从Spark初级人才成功升级为Spark中级人才的通道. 稍有水平的大公司,面试内容本讲肯定会涉及. ======

Spark Shuffle内幕解密(24)

一.到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个     计算节点上进行计算. 二.Shuffle可能面临的问题? 1, 数据量非常大: 2, 数据如何分类,即如何Partition,Hash.Sort.钨丝计算: 3, 负载均衡(数据倾斜): 4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考虑的问题: 说明:具体的Task进行计算的时候尽一切最大可能使得数据具备Proc

Spark Executor内幕彻底解密(DT大数据梦工厂)

内容: 1.Spark Executor工作原理图: 2.ExecutorBackend注册源码解密: 3.Executor实例化内幕: 4.Executor具体是如何工作的? 1.Master发指令给Worker启动Executor: 2.Worker接受到Master发送来的指令,通过ExecutorRunner启动另外一个进程来运行Executor: 3.此时会启动粗粒度的ExecutorBackend(CoarseGrainedExecutorBackend): 4.CoarseGrai

Spark Executor内幕彻底解密:Executor工作原理图、ExecutorBackend注册源码解密、Executor实例化内幕、Executor具体工作内幕

本课主题 Spark Executor 工作原理图 ExecutorBackend 注册源码鉴赏和 Executor 实例化内幕 Executor 具体是如何工作的 Spark Executor 工作原理图 第一步:Master 发指令给 Worker 启动 Executor: 第二步:Worker 接收到 Master 发送过来的指令通过 ExecutorRunner 远程启动另外一个线程来运行 Executor: 第三步:通过发送 RegisterExecutor 向 Driver 注册 E

王家林谈Spark性能优化第八季之Spark Tungsten-sort Based Shuffle 内幕解密

内容: 1."钨丝计划"Shuffle实例: 2."钨丝计划"下的Shuffle解密: 现在SparkSQL就是用的"钨丝计划"来处理Shuffle的 ==========使用Tubsten功能============ 1.如果想要你的程序使用Tungsten功能,可以设置 spark.shuffle.manager=tungsten-sort 2.DataFrame中自动开启了Tungsten功能: ==========Tungsten-bas

spark性能调优(四) spark shuffle中JVM内存使用及配置内幕详情

转载:http://www.cnblogs.com/jcchoiling/p/6494652.html 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对 JVM 是怎么使用,你怎么可以很有信心地或者是完全确定地掌握和控制数据的缓存空间呢,所以掌握Spark对JVM的内存使用内幕是至关重要的.很多人对 Spark 的印象是:它是基于内存的,而且可以缓存一大堆数据

[Spark性能调优] 第四章 : Spark Shuffle 中 JVM 内存使用及配置内幕详情

本课主题 JVM 內存使用架构剖析 Spark 1.6.x 和 Spark 2.x 的 JVM 剖析 Spark 1.6.x 以前 on Yarn 计算内存使用案例 Spark Unified Memory 的运行原理和机制 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对 JVM 是怎么使用,你怎么可以很有信心地或者是完全确定地掌握和控制数据的缓存空间呢,所

Spark Shuffle 中 JVM 内存使用及配置内幕详情

本课主题 JVM 內存使用架构剖析 Spark 1.6.x 和 Spark 2.x 的 JVM 剖析 Spark 1.6.x 以前 on Yarn 计算内存使用案例 Spark Unified Memory 的运行原理和机制 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对 JVM 是怎么使用,你怎么可以很有信心地或者是完全确定地掌握和控制数据的缓存空间呢,所

[Spark內核] 第42课:Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

本课主题 Broadcast 运行原理图 Broadcast 源码解析 Broadcast 运行原理图 Broadcast 就是将数据从一个节点发送到其他的节点上; 例如 Driver 上有一张表,而 Executor 中的每个并行执行的Task (100万个Task) 都要查询这张表的话,那我们通过 Broadcast 的方式就只需要往每个Executor 把这张表发送一次就行了,Executor 中的每个运行的 Task 查询这张唯一的表,而不是每次执行的时候都从 Driver 中获得这张表