Spark--Shuffle

理解reduceByKey操作,有助于理解Shuffle

reduceByKey

reduceByKey操作将map中的有相同key的value值进行合并,但是map中的数据键值对,并不一定分布在相同的partition中,甚至相同的机器中。

所以需要将数据取到相同的主机进行计算-同地协作。

单一task操作在单一partition上,为了组织所有数据进行单一的redueceByKey reduce 任务执行,Spark需要完成all-to-all(多对多)操作,所以必须在所有partitions中寻找所有values为了所有keys。

然后将每一个key对应的值从不同的partitions中放到一起进行最终的计算。这就是Shuffle.

Shuffle

1、数据完整性

2、网络IO消耗

3、磁盘IO消耗

回顾MapReduce的shuffle

MapReduce的shuffle操作

Shuffle阶段在map函数的输出到reduce函数的输入,都是shuffle阶段,

Split与block的对应关系可能是多对一,默认是一对一。每个map任务会处理一个split,如果block大和split相同,有多少个block就有多少个map任务,hadoop的2.*版本中一个block默认128M。

Map阶段的shuffle操作:

得到map的输出,然后进行分区,默认的分区规则:key值计算hash然后对应reduce个数取模;分区个数与reduce个数一致

map分区后的结果会放入到内存的环形缓冲区,它的默认大小是100M,配置信息是mapreduce.task.io.sort.mb,当缓冲区的大小使用超过一定的阀值(mapred-site.xml:mapreduce.map.sort.spill.percent,默认80%),一个后台的线程就会启动把缓冲区中的数据溢写(spill)到本地磁盘中(mapred-site.xml:mapreduce.cluster.local.dir),与此同时Mapper继续向环形缓冲区中写入数据。

环形缓冲区将数据溢写到磁盘,在溢写过程中对数据进行sort和Combiner,排序默认是针对key进行排序,combiner如果指定是优化的一种,类似将reduce的操作在map端进行,避免多余数据的传输,比如在分区中有3个("Hadoop",1),可将数据进行合并("Hadoop",3)。溢写到磁盘小文件大小为80M。

然后将多个小文件合并成一个大文件,在这个过程中,还是会进行sort和combiner,因为即使小文件的内容是已经排序的,合并以后数据也还是需要排序的。不然数据还是无序的。

Reduce阶段的shuffle操作:

Reduce从Task Tracker中取数据,使用http协议取数据,copy过来的数据放入到内存缓存区中,这里的内存缓冲区的大小为JVM的heap大小。

然后对数据进行merge,这里的merge也会进行sort和combiner,如果设置了combiner。merge也会进行默认的分组,将key进行分组。

Spark Shuffle

HashBaseShuffle

缺点:小文件过多,数量为task*reduce的数量

数据到内存buffer是进行partition操作,对key求hash然后根据reduce数量取模。buffer的大小不大32k,不是很大,buffer的数据会随时写到block file,这个过程没有sort。

reduce端通过netty传输来取数据,然后将数据放到内存。通过hashmap存储。

优化:使用spark.shuffle.consolidateFiles机制,修改值为true,默认为false,没有启用。

文件数量为:reduce*core

在一个core里面并行运行的task其中生成的文件数为reduce的个数。一个core里面并行运行的task,将数据都追加到一起。

SortBaseShuffle

现在默认的shuflle为SortBaseShuffle

自带consolidateFiles机制

自带sort

不用sort排序可以通过配置实现

1、spark.shuffle.sort.bypassMergeThreshold 默认值为200 ,如果shuffle read task的数量小于这个阀值200,则不会进行排序。

2、或者使用hashbasedshuffle + consolidateFiles 机制

修改shuffle方法:

spark.shuffle.manager 默认值:sort

有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。tungsten-sort慎用,存在bug.

参考:http://langyu.iteye.com/blog/992916

时间: 2024-08-06 16:01:48

Spark--Shuffle的相关文章

spark shuffle过程分析

spark shuffle流程分析 回到ShuffleMapTask.runTask函数 现在回到ShuffleMapTask.runTask函数中: overridedef runTask(context:TaskContext): MapStatus = { 首先得到要reduce的task的个数. valnumOutputSplits= dep.partitioner.numPartitions metrics= Some(context.taskMetrics) valblockMana

spark shuffle 内幕彻底解密课程

一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算. 二:Shuffle可能面临的问题?运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了). 1, 数据量非常大: 2, 数据如何分类,即如何Partition,Hash.Sort.钨丝计算: 3, 负载均衡(数据倾斜): 4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考

Spark Shuffle过程详细分析

在MapReduce中shuffle和Spark的shuffle的过程有一些区别.这里做一下具体的介绍. Mapreduce的shuffle过程图解 Spark shuffle过程图解 注意:spark shuffle过程中没有分区和排序的过程,而且存储结果存储在内存中,所以速度要比mapreduce要快很多. 先就到这里吧,图解的说明应该比较清晰了.有问题欢迎留言

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理)

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理) 问题描述 Spark-1.6.0已经在一月份release,为了验证一下它的性能,我使用了一些大的SQL验证其性能,其中部分SQL出现了Shuffle失败问题,详细的堆栈信息如下所示: 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in connection from /10.196.134.220:7337 java.lang.Out

spark性能调优(四) spark shuffle中JVM内存使用及配置内幕详情

转载:http://www.cnblogs.com/jcchoiling/p/6494652.html 引言 Spark 从1.6.x 开始对 JVM 的内存使用作出了一种全新的改变,Spark 1.6.x 以前是基于静态固定的JVM内存使用架构和运行机制,如果你不知道 Spark 到底对 JVM 是怎么使用,你怎么可以很有信心地或者是完全确定地掌握和控制数据的缓存空间呢,所以掌握Spark对JVM的内存使用内幕是至关重要的.很多人对 Spark 的印象是:它是基于内存的,而且可以缓存一大堆数据

Spark Shuffle数据处理过程与部分调优(源码阅读七)

shuffle...相当重要,为什么咩,因为shuffle的性能优劣直接决定了整个计算引擎的性能和吞吐量.相比于Hadoop的MapReduce,可以看到Spark提供多种计算结果处理方式,对shuffle过程进行了优化. 那么我们从RDD的iterator方法开始: 我们可以看到,它调用了cacheManager的getOrCompute方法,如果分区任务第一次执行还没有缓存,那么会调用computeOrReadCheckpoint.如果某个partition任务执行失败,可以利用DAG重新调

spark shuffle内在原理说明

在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量.Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑. Shuffle Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每

Spark Shuffle的技术演进

在Spark或Hadoop MapReduce的分布式计算框架中,数据被按照key分成一块一块的分区,打散分布在集群中各个节点的物理存储或内存空间中,每个计算任务一次处理一个分区,但map端和reduce端的计算任务并非按照一种方式对相同的分区进行计算,例如,当需要对数据进行排序时,就需要将key相同的数据分布到同一个分区中,原分区的数据需要被打乱重组,这个按照一定的规则对数据重新分区的过程就是Shuffle(洗牌). Spark Shuffle的两阶段 对于Spark来讲,一些Transfor

Spark Shuffle Write 阶段函数调用分析

Shuffle Write阶段函数调用如下: org.apache.spark.executor.run() --> org.apache.spark.scheduler.Task.run() --> org.apache.spark.scheduler.runTask() --> org.apache.spark.shuffle.hash.HashShuffleWriter.write() --> org.apache.spark.storage.DiskBlockObjectW

Spark Shuffle原理、Shuffle操作问题解决和参数调优

摘要: 1 shuffle原理 1.1 mapreduce的shuffle原理 1.1.1 map task端操作 1.1.2 reduce task端操作 1.2 spark现在的SortShuffleManager 2 Shuffle操作问题解决 2.1 数据倾斜原理 2.2 数据倾斜问题发现与解决 2.3 数据倾斜解决方案 3 spark RDD中的shuffle算子 3.1 去重 3.2 聚合 3.3 排序 3.4 重分区 3.5 集合操作和表操作 4 spark shuffle参数调优