spark(四):shuffle

shuflle write

  1. 上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。
  2. 在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i‘,每个 ShuffleBlock 被称为 FileSegment。

shuflle read

  1. 在什么时候 fetch 数据?当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。
  2. 边 fetch 边处理还是一次性 fetch 完再处理?边 fetch 边处理。使用可以 aggregate 的数据结构,比如 HashMap,每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 <Key, Value> record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value)
  3. fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。
  4. 怎么获得要 fetch 的数据的存放位置?reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给MapOutputTrackerMaster。

Shuffle read 中的 HashMap

ashMap 是 Spark shuffle read 过程中频繁使用的、用于 aggregate 的数据结构。Spark 设计了两种:一种是全内存的 AppendOnlyMap,另一种是内存+磁盘的 ExternalAppendOnlyMap。

  1. 类似 HashMap,但没有remove(key)方法。其实现原理很简单,开一个大 Object 数组,蓝色部分存储 Key,白色部分存储 Value。
  2. 如果 Array 的利用率达到 70%,那么就扩张一倍,并对所有 key 进行 rehash 后,重新排列每个 key 的位置。
  3. ExternalAppendOnlyMap 持有一个 AppendOnlyMap,shuffle 来的一个个 (K, V) record 先 insert 到 AppendOnlyMap 中,insert 过程与原始的 AppendOnlyMap 一模一样。
  4. 如果 AppendOnlyMap 快被装满时检查一下内存剩余空间是否可以够扩展,够就直接在内存中扩展,不够就 sort 一下 AppendOnlyMap,将其内部所有 records 都 spill 到磁盘上。
  5. 每次 spill 完在磁盘上生成一个 spilledMap 文件,然后重新 new 出来一个 AppendOnlyMap。
  6. 最后一个 (K, V) record insert 到 AppendOnlyMap 后,表示所有 shuffle 来的 records 都被放到了 ExternalAppendOnlyMap 中,但不表示 records 已经被处理完,因为每次 insert 的时候,新来的 record 只与 AppendOnlyMap 中的 records 进行 aggregate,并不是与所有的 records 进行 aggregate(一些 records 已经被 spill 到磁盘上了)。因此当需要 aggregate 的最终结果时,需要对 AppendOnlyMap 和所有的 spilledMaps 进行全局 merge-aggregate。
  7. 全局 merge-aggregate 的流程:先将 AppendOnlyMap 中的 records 进行 sort,形成 sortedMap。
  8. 然后分别从 sortedMap 和各个 spilledMap 读出一部分数据(StreamBuffer)放到 mergeHeap 里面。StreamBuffer 里面包含的 records 需要具有相同的 hash(key)
  9. mergeHeap 顾名思义就是使用堆排序不断提取出 hash(firstRecord.Key) 相同的 StreamBuffer,并将其一个个放入 mergeBuffers 中,放入的时候与已经存在于 mergeBuffers 中的 StreamBuffer 进行 merge-combine


在Sort Based Shuffle的Shuffle Write阶段,map端的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,reduce端的Task可以通过该索引文件获取相关的数据。

原文地址:https://blog.51cto.com/4876017/2383558

时间: 2024-08-07 04:14:12

spark(四):shuffle的相关文章

MapReduce和spark的shuffle过程详解

面试常见问题,必备答案. 参考:https://blog.csdn.net/u010697988/article/details/70173104 mapReducehe和Spark之间的最大区别是前者较偏向于离线处理,而后者重视实效性,下面主要介绍mapReducehe和Spark两者的shuffle过程. MapReduce的Shuffle过程 MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发:Reduce是规约,负责数据的计算归并.Reduce的数据来源于

hadoop的mapReduce和Spark的shuffle过程的详解与对比及优化

https://blog.csdn.net/u010697988/article/details/70173104 大数据的分布式计算框架目前使用的最多的就是hadoop的mapReduce和Spark,mapReducehe和Spark之间的最大区别是前者较偏向于离线处理,而后者重视实现性,下面主要介绍mapReducehe和Spark两者的shuffle过程. MapReduce的Shuffle过程介绍 Shuffle的本义是洗牌.混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随

大数据开发:剖析Hadoop和Spark的Shuffle过程差异

一.前言 对于基于MapReduce编程范式的分布式计算来说,本质上而言,就是在计算数据的交.并.差.聚合.排序等过程.而分布式计算分而治之的思想,让每个节点只计算部分数据,也就是只处理一个分片,那么要想求得某个key对应的全量数据,那就必须把相同key的数据汇集到同一个Reduce任务节点来处理,那么Mapreduce范式定义了一个叫做Shuffle的过程来实现这个效果. 二.编写本文的目的 本文旨在剖析Hadoop和Spark的Shuffle过程,并对比两者Shuffle的差异. 三.Had

详细探究Spark的shuffle实现

Background 在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量.Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑,本文就深入研究Spark的shuffle是如何实现的,有什么优缺点,与Hadoop MapReduce的shuffle有什么不同. Shuffle Shuffle是MapReduce框架中的一个

Spark 的 Shuffle过程介绍`

Spark的Shuffle过程介绍 Shuffle Writer Spark丰富了任务类型,有些任务之间数据流转不需要通过Shuffle,但是有些任务之间还是需要通过Shuffle来传递数据,比如wide dependency的group by key. Spark中需要Shuffle输出的Map任务会为每个Reduce创建对应的bucket,Map产生的结果会根据设置的partitioner得到对应的bucketId,然后填充到相应的bucket中去.每个Map的输出结果可能包含所有的Redu

Spark(四): Spark-sql 读hbase

SparkSQL是指整合了Hive的spark-sql cli, 本质上就是通过Hive访问HBase表,具体就是通过hive-hbase-handler, 具体配置参见:Hive(五):hive与hbase整合 目录: SparkSql 访问 hbase配置 测试验证 SparkSql 访问 hbase配置:  拷贝HBase的相关jar包到Spark节点上的$SPARK_HOME/lib目录下,清单如下: guava-14.0.1.jar htrace-core-3.1.0-incubati

Spark的Shuffle过程介绍

Spark的Shuffle过程介绍 Shuffle Writer Spark丰富了任务类型,有些任务之间数据流转不需要通过Shuffle,但是有些任务之间还是需要通过Shuffle来传递数据,比如wide dependency的group by key. Spark中需要Shuffle输出的Map任务会为每个Reduce创建对应的bucket,Map产生的结果会根据设置的partitioner得到对应的bucketId,然后填充到相应的bucket中去.每个Map的输出结果可能包含所有的Redu

【Spark】Spark的Shuffle机制

MapReduce中的Shuffle 在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量. Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时.输出结果须要按key哈希.而且分发到每个Reducer上去.这个过程就是shuffle.因为shu

第37课:Spark中Shuffle详解及作业

1.什么是Spark的Shuffle 图1 Spark有很多算子,比如:groupByKey.join等等都会产生shuffle. 产生shuffle的时候,首先会产生Stage划分. 上一个Stage会把 计算结果放在LocalSystemFile中,并汇报给Driver: 下一个Stage的运行由Driver触发,Executor向Driver请求,把上一个Stage的计算结果抓取过来. 2.Hadoop的Shuffle过程 图2 该图表达了Hadoop的map和reduce两个阶段,通过S

我为什么要区分MR和Spark的shuffle

MR的shuffle阶段,用一张图就可以说明了: map阶段的输出结果会放在缓冲区中,另有一个较小的缓冲区维护了这个缓冲区中键值对+分区号的索引.当该缓冲区快满时,会对其索引进行排序,然后spill到磁盘上.当所有数据都spill到磁盘上后,会对这些碎片文件进行合并,这个过程中同样会发生排序和归并,以便减小传输到reducer上的数据量.reducer通过http连接从mapper上拉去最终的结果,注意是按照分区拉去所需的部分.对于一个reducer由于数据可能来自多个上游,所以仍然要继续排一次