Spark Shuffle的技术演进

在Spark或Hadoop MapReduce的分布式计算框架中,数据被按照key分成一块一块的分区,打散分布在集群中各个节点的物理存储或内存空间中,每个计算任务一次处理一个分区,但map端和reduce端的计算任务并非按照一种方式对相同的分区进行计算,例如,当需要对数据进行排序时,就需要将key相同的数据分布到同一个分区中,原分区的数据需要被打乱重组,这个按照一定的规则对数据重新分区的过程就是Shuffle(洗牌)

Spark Shuffle的两阶段

对于Spark来讲,一些Transformation或Action算子会让RDD产生宽依赖,即parent RDD中的每个Partition被child RDD中的多个Partition使用,这时便需要进行Shuffle,根据Record的key对parent RDD进行重新分区。如果对这些概念还有一些疑问,可以参考我的另一篇文章《Spark基本概念快速入门》

以Shuffle为边界,Spark将一个Job划分为不同的Stage,这些Stage构成了一个大粒度的DAG。Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步。如下图所示:

执行Shuffle的主体是Stage中的并发任务,这些任务分ShuffleMapTask和ResultTask两种,ShuffleMapTask要进行Shuffle,ResultTask负责返回计算结果,一个Job中只有最后的Stage采用ResultTask,其他的均为ShuffleMapTask。如果要按照map端和reduce端来分析的话,ShuffleMapTask可以即是map端任务,又是reduce端任务,因为Spark中的Shuffle是可以串行的;ResultTask则只能充当reduce端任务的角色。

我把Spark Shuffle的流程简单抽象为以下几步以便于理解:

  • Shuffle Write

    1. Map side combine (if needed)
    2. Write to local output file
  • Shuffle Read
    1. Block fetch
    2. Reduce side combine
    3. Sort (if needed)

Write阶段发生于ShuffleMapTask对该Stage的最后一个RDD完成了map端的计算之后,首先会判断是否需要对计算结果进行聚合,然后将最终结果按照不同的reduce端进行区分,写入当前节点的本地磁盘。
Read阶段开始于reduce端的任务读取ShuffledRDD之时,首先通过远程或本地数据拉取获得Write阶段各个节点中属于当前任务的数据,根据数据的Key进行聚合,然后判断是否需要排序,最后生成新的RDD。

Spark Shuffle具体实现的演进

在具体的实现上,Shuffle经历了Hash、Sort、Tungsten-Sort三阶段:

  • Spark 0.8及以前 Hash Based Shuffle
    在Shuffle Write过程按照Hash的方式重组Partition的数据,不进行排序。每个map端的任务为每个reduce端的Task生成一个文件,通常会产生大量的文件(即对应为M*R个中间文件,其中M表示map端的Task个数,R表示reduce端的Task个数),伴随大量的随机磁盘IO操作与大量的内存开销。
    Shuffle Read过程如果有combiner操作,那么它会把拉到的数据保存在一个Spark封装的哈希表(AppendOnlyMap)中进行合并。
    在代码结构上:

    • org.apache.spark.storage.ShuffleBlockManager负责Shuffle Write
    • org.apache.spark.BlockStoreShuffleFetcher负责Shuffle Read
    • org.apache.spark.Aggregator负责combine,依赖于AppendOnlyMap
  • Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
    通过文件合并,中间文件的生成方式修改为每个执行单位(一个Executor中的执行单位等于Core的个数除以每个Task所需的Core数)为每个reduce端的任务生成一个文件。最终可以将文件个数从M*R修改为E*C/T*R,其中,E表示Executor的个数,C表示每个Executor中可用Core的个数,T表示Task所分配的Core的个数。
    是否采用Consolidate机制,需要配置spark.shuffle.consolidateFiles参数
  • Spark 0.9 引入ExternalAppendOnlyMap
    在combine的时候,可以将数据spill到磁盘,然后通过堆排序merge(可以参考这篇文章,了解其具体实现)
  • Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
    在Sort Based Shuffle的Shuffle Write阶段,map端的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,reduce端的Task可以通过该索引文件获取相关的数据。
    在代码结构上:
    • 从以前的ShuffleBlockManager中分离出ShuffleManager来专门管理Shuffle Writer和Shuffle Reader。两种Shuffle方式分别对应
      org.apache.spark.shuffle.hash.HashShuffleManager和
      org.apache.spark.shuffle.sort.SortShuffleManager,
      可通过spark.shuffle.manager参数配置。两种Shuffle方式有各自的ShuffleWriter:org.apache.spark.shuffle.hash.HashShuffle和org.apache.spark.shuffle.sort.SortShuffleWriter;但共用一个ShuffleReader,即org.apache.spark.shuffle.hash.HashShuffleReader。
    • org.apache.spark.util.collection.ExternalSorter实现排序功能。可通过对spark.shuffle.spill参数配置,决定是否可以在排序时将临时数据Spill到磁盘。
  • Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
  • Spark 1.4 引入Tungsten-Sort Based Shuffle
    将数据记录用序列化的二进制方式存储,把排序转化成指针数组的排序,引入堆外内存空间和新的内存管理模型,这些技术决定了使用Tungsten-Sort要符合一些严格的限制,比如Shuffle dependency不能带有aggregation、输出不能排序等。由于堆外内存的管理基于JDK Sun Unsafe API,故Tungsten-Sort Based Shuffle也被称为Unsafe Shuffle。
    在代码层面:
    • 新增org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
    • 新增org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter(用java实现)
    • ShuffleReader复用HashShuffleReader
  • Spark 1.6 Tungsten-sort并入Sort Based Shuffle
    由SortShuffleManager自动判断选择最佳Shuffle方式,如果检测到满足Tungsten-sort条件会自动采用Tungsten-sort Based Shuffle,否则采用Sort Based Shuffle。
    在代码方面:
    • UnsafeShuffleManager合并到SortShuffleManager
    • HashShuffleReader 重命名为BlockStoreShuffleReader,Sort Based Shuffle和Hash Based Shuffle仍共用ShuffleReader。
  • Spark 2.0 Hash Based Shuffle退出历史舞台
    从此Spark只有Sort Based Shuffle。

Spark Shuffle源码结构

这里以最新的Spark 2.1为例简单介绍一下Spark Shuffle相关部分的代码结构

  • Shuffle Write

    • ShuffleWriter的入口链路

      org.apache.spark.scheduler.ShuffleMapTask#runTask
        ---> org.apache.spark.shuffle.sort.SortShuffleManager#getWriter
            ---> org.apache.spark.shuffle.sort.SortShuffleWriter#write(如果是普通sort)
            ---> org.apache.spark.shuffle.sort.UnsafeShuffleWriter#write (如果是Tungsten-sort)
    • SortShuffleWriter的主要依赖
      org.apache.spark.util.collection.ExternalSorter 负责按照(partition id, key)排序,如果需要Map side combine,需要提供aggregator
        ---> org.apache.spark.util.collection.PartitionedAppendOnlyMap
    • UnsafeShuffleWriter的主要依赖
      org.apache.spark.shuffle.sort.ShuffleExternalSorter (Java实现)
  • Shuffle Read
    • ShuffleReader的入口链路

      org.apache.spark.rdd.ShuffledRDD#compute
        ---> org.apache.spark.shuffle.sort.SortShuffleManager#getReader
            ---> org.apache.spark.shuffle.BlockStoreShuffleReader#read
    • ShuffleReader主要依赖
      org.apache.spark.Aggregator 负责combine
        ---> org.apache.spark.util.collection.ExternalAppendOnlyMap
      org.apache.spark.util.collection.ExternalSorter 取决于是否需要对最终结果进行排序

参考资料及推荐阅读

  1. Spark 1.0之前Hash Based Shuffle的原理

  2. Spark 1.1时Sort Based Shuffle的资料
  3. Spark 1.2之前两种Shuffle方式的分析和对比
  4. Spark 1.6之前三种Shuffle方式的分析和对比
  5. Spark 1.6之前Sort Based Shuffle的源码和原理
  6. Spark 1.6之前Tungsten-sort Based Shuffle的原理
时间: 2024-10-18 02:05:27

Spark Shuffle的技术演进的相关文章

Spark Shuffle原理、Shuffle操作问题解决和参数调优

摘要: 1 shuffle原理 1.1 mapreduce的shuffle原理 1.1.1 map task端操作 1.1.2 reduce task端操作 1.2 spark现在的SortShuffleManager 2 Shuffle操作问题解决 2.1 数据倾斜原理 2.2 数据倾斜问题发现与解决 2.3 数据倾斜解决方案 3 spark RDD中的shuffle算子 3.1 去重 3.2 聚合 3.3 排序 3.4 重分区 3.5 集合操作和表操作 4 spark shuffle参数调优

物联网核心协议—消息推送技术演进

消息触达能力是物联网(internet ofthings, IOT)的重要支撑,而物联网很多技术都源于移动互联网.本文阐述移动互联网消息推送技术在物联网中的应用和演进. 一.物联网架构和关键技术 从开发的角度,无线接入是物联网设备端的核心技术,身份设备管理和消息推送技术是物联网云端的核心技术.而从场景体验的角度,除了前者,还要包括手机的前端开发技术. 在上一篇<一张图读懂基于微信硬件平台的物联网架构>博文中,笔者曾用一张大图详细描述了基于微信硬件平台的物联网架构的组成要素.关键场景.和通信协议

关于大型网站技术演进的思考(一)--存储的瓶颈(上)

前不久公司请来了位互联网界的技术大牛跟我们做了一次大型网站架构的培训,两天12个小时信息量非常大,知识的广度和难度也非常大,培训完后我很难完整理出全部听到的知识,今天我换了个思路是回味这次培训,这个思路就是通过本人目前的经验和技术水平来思考下大型网站技术演进的过程. 首先我们要思考一个问题,什么样的网站才是大型网站,从网站的技术指标角度考虑这个问题人们很容易犯一个毛病就是认为网站的访问量是衡量的指标,懂点行的人也许会认为是网站在单位时间里的并发量的大小来作为指标,如果按这些标准那么像hao123

通过京东技术演进和淘宝技术演进,探察未来技术和架构

通过京东技术演进和淘宝技术演进,探察未来技术和架构 我们从京东和淘宝技术架构演进,可以看出电商在发展过程中的必经之路:Mysql->Oracle->分布式计算和分布式存储->???未来 引用下<京东技术解密>书中的内容: 基于Hadoop,以Mapreduce作为计算引擎的的分布式数据仓库可以说是大数据处理的"标配",2012年8月,由40台机器搭建的第一版集群上线,相较于Oracle小型机,性能明显提升,让我们第一次体会到分布式的威力.2012年12月,

spark shuffle过程分析

spark shuffle流程分析 回到ShuffleMapTask.runTask函数 现在回到ShuffleMapTask.runTask函数中: overridedef runTask(context:TaskContext): MapStatus = { 首先得到要reduce的task的个数. valnumOutputSplits= dep.partitioner.numPartitions metrics= Some(context.taskMetrics) valblockMana

spark shuffle 内幕彻底解密课程

一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算. 二:Shuffle可能面临的问题?运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了). 1, 数据量非常大: 2, 数据如何分类,即如何Partition,Hash.Sort.钨丝计算: 3, 负载均衡(数据倾斜): 4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考

关于大型网站技术演进的思考(一)--存储的瓶颈

关于大型网站技术演进的思考(一)--存储的瓶颈(1)  http://www.cnblogs.com/sharpxiajun/p/4237704.html#!comments

Spark Shuffle过程详细分析

在MapReduce中shuffle和Spark的shuffle的过程有一些区别.这里做一下具体的介绍. Mapreduce的shuffle过程图解 Spark shuffle过程图解 注意:spark shuffle过程中没有分区和排序的过程,而且存储结果存储在内存中,所以速度要比mapreduce要快很多. 先就到这里吧,图解的说明应该比较清晰了.有问题欢迎留言

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理)

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理) 问题描述 Spark-1.6.0已经在一月份release,为了验证一下它的性能,我使用了一些大的SQL验证其性能,其中部分SQL出现了Shuffle失败问题,详细的堆栈信息如下所示: 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in connection from /10.196.134.220:7337 java.lang.Out