第37课:Spark中Shuffle详解及作业

1、什么是Spark的Shuffle

图1

Spark有很多算子,比如:groupByKey、join等等都会产生shuffle。

产生shuffle的时候,首先会产生Stage划分。

上一个Stage会把

计算结果放在LocalSystemFile中,并汇报给Driver;

下一个Stage的运行由Driver触发,Executor向Driver请求,把上一个Stage的计算结果抓取过来。



2、Hadoop的Shuffle过程

图2

该图表达了Hadoop的map和reduce两个阶段,通过Shuffle怎样把map task的输出结果有效地传送到reduce端,描述着数据从map task输出到reduce task输入的这段过程

map的计算为reduce产生不同的文件,在Hadoop集群环境中,大部分map task与reduce task的执行是在不同的节点上,reduce执行时需要跨节点去拉取其它节点上的map task结果,那么对集群内部的网络资源消耗会很严重。我们希望最大化地减少不必要的消耗, 于是对Shuffle过程的期望有:

- 完整地从map task端拉取数据到reduce 端。

- 在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。

- 减少磁盘IO对task执行的影响。

可优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。

map端的Shuffle细节:

整个map流程,简单些可以这样说:

1)input, 根据split输入数据,运行map任务;

2)patition, 每个map task都有一个内存缓冲区,存储着map的输出结果;

3)spill, 当缓冲区快满的时候需要将缓冲区的数据以临时文件的方式存放到磁盘;

4)merge, 当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

下面对map流程的细节进行说明:

1)输入数据:在Map Reduce中,map task只读取split,Split与block的对应关系可能是多对一,默认是一对一;

2)mapper运行后,通过Partitioner接口,根据key或value及reduce的数量来决定当前map的输出数据最终应该交由哪个reduce task处理。然后将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组;

3)内存缓冲区有大小限制,默认是100MB。需要在一定条件下将缓冲区中的数据临时写入磁盘,从内存往磁盘写数据的过程被称为Spill(溢写);

splill是由单独线程来完成,不影响往缓冲区写map结果的线程,splill的过程会涉及到Sort和Combiner,当splill线程启动后,需要对锁定内存块空间内的key做排序,是对序列化的字节做排序。 如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录,非正式地合并数据叫做combine了, Combiner会优化MapReduce的中间结果。

4)每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果很大,就会有多个溢写文件存在。当map task完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。

Merge是怎样的?比如WordCount示例中,某个单词“aaa”从某个map task读取过来时值是5,从另外一个map task 读取时值是8,因为它们有相同的key,所以就是像这样:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。

因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。

至此,map端的所有工作都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果获知TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。

下面讲解reduce 端的Shuffle细节:

reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。

1) Copy过程,简单地拉取数据。

Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。

2)Merge阶段。

这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达阈值,就启动内存到磁盘的merge。

与map 端类似,这也是溢写的过程,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

3)Reducer的输入文件。

不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,hadoop是把这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。



3、Hadoop的MapReduce Shuffle数据流动过程

图3

这张图非常有意思,形象地描述了整个数据流动的过程。

图上map阶段,有4个map;Reduce端,有3个reduce。

4个map 也就是4个JVM,每个JVM处理一个数据分片(split1~split4),每个map产生一个map输出文件,但是每个map都为后面的reduce产生了3部分数据(分别用红1、绿2、蓝3标识),也就是说每个输出的map文件都包含了3部分数据。正如前面第二节所述:

mapper运行后,通过Partitioner接口,根据key或value及reduce的数量来决定当前map的输出数据最终应该交由哪个reduce task处理

Reduce端一共有3个reduce,去前面的4个map的输出结果中抓取属于自己的数据。

在构建算法时,Shuffle是最重要的思考点。



4、Spark Shuffle

图4

该图描述了最简单的Spark 0.X版本的Spark Shuffle过程。

与Hadoop Map Reduce的区别在于输出文件个数的变化。

每个ShuffleMapTask产生与Ruducer个数相同的Shuffle blockFile文件,图中有3个reducer,那么每个ShuffleMapTask就产生3个Shuffle blockFile文件,4个ShuffleMapTask,那么一共产生12个Shuffle blockFile文件。

在内存中每个Shuffle blockFile文件都会存在一个句柄从而消耗一定内存,又因为物理内存的限制,就不能有很多并发,这样就限制了Spark集群的规模。

该图描绘的只是Spark 0.X版本而已,让人误以为Spark不支持大规模的集群计算,当时这只是Hash Based Shuffle。Spark后来做了改进,引入了Sort Based Shuffle之后,就再也没有人说Spark只支持小规模的集群运算了。

4.1 Hash based shuffle

Hash based shuffle的每个mapper都需要为每个reducer写一个文件,供reducer读取,即需要产生M*R个数量的文件,如果mapper和reducer的数量比较大,产生的文件数会非常多。

Hadoop Map Reduce被人诟病的地方,很多不需要sort的地方的sort导致了不必要的开销,于是Spark的Hash based shuffle设计的目标之一就是避免不需要的排序,

但是它在处理超大规模数据集的时候,产生了大量的磁盘IO和内存的消耗,很影响性能。

Hash based shuffle不断优化,Spark 0.8.1引入的file consolidation在一定程度上解决了这个问题。

4.2 Sort based shuffle

为了解决hash based shuffle性能差的问题,Spark 1.1 引入了Sort based shuffle,完全借鉴mapreduce实现,每个map产生一个文件,每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;

相反,它会将所有的结果写到一个文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理的数据。

避免产生大量的文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。

而减少文件的数量可以避免同时写多个文件对系统带来的压力。

Sort based shuffle在速度和内存使用方面优于Hash based shuffle。

以上逻辑可以使用下图来描述:

时间: 2024-10-10 21:32:58

第37课:Spark中Shuffle详解及作业的相关文章

Hadoop Mapreduce中shuffle 详解

MapReduce 里面的shuffle:描述者数据从map task 输出到reduce task 输入的这段过程 Shuffle 过程: 首先,map 输出的<key,value > 会放在内存中,内存有一定的大小,超过之后,会将内存里的东西溢写(spill) 到磁盘(disk)中 .在从内存溢写到磁盘的过程中,会有两个操作:分区(parttition),排序(sort).map结束之后,磁盘中会有很多文件 . 有很多小文件,需要将文件进行文件的合并,并且排序.map 中的一些map任务可

Android中Context详解 ---- 你所不知道的Context

转载至 :http://blog.csdn.net/qinjuning 前言:本文是我读<Android内核剖析>第7章 后形成的读书笔记 ,在此向欲了解Android框架的书籍推荐此书. 大家好,  今天给大家介绍下我们在应用开发中最熟悉而陌生的朋友-----Context类 ,说它熟悉,是应为我们在开发中 时刻的在与它打交道,例如:Service.BroadcastReceiver.Activity等都会利用到Context的相关方法 : 说它陌生,完全是 因为我们真正的不懂Context

Apache Spark源码走读之16 -- spark repl实现详解

欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码的实时交互式执行,这是为什么呢? 既然scala已经提供了repl,为什么spark还要自己单独搞一套spark repl,这其中的缘由到底何在? 显然,这些都是问题,要解开这些谜团,只有再次开启一段源码分析之旅了. 全局视图 上图显示了java源文件从编译到加载执行的全局视图,整个过程中最主要的步

Android中Animation详解

Animation从总体来说可以分为两类: Tweened Animations:该类提供了旋转,移动,伸展,淡入淡出等效果 Frame-By-Frame Animations:该类可以创建一个Drawable序列,这些Drawable可以按照指定的事件间隔一个一个显示,和动画片差不多 一.Tweened Animations Tweened Animations也有四种类型: Alpha:淡入淡出效果Scale:缩放效果Rotate:旋转效果Translate:移动效果 设置动画效果可以在XM

Android中Context详解 ---- 你所不知道的Context (转载)

Android中Context详解 ---- 你所不知道的Context (转载) http://blog.csdn.net/qinjuning 大家好,  今天给大家介绍下我们在应用开发中最熟悉而陌生的朋友-----Context类 ,说它熟悉,是应为我们在开发中 时刻的在与它打交道,例如:Service.BroadcastReceiver.Activity等都会利用到Context的相关方法 : 说它陌生,完全是 因为我们真正的不懂Context的原理.类结构关系.一个简单的问题是,一个应用

MySQL中EXPLAIN详解

MySQL中EXPLAIN详解 explain显示了mysql如何使用索引来处理select语句以及连接表.可以帮助选择更好的索引和写出更优化的查询语句. 使用方法,在select语句前加上explain就可以了: 如:explain select username,first_name form hx,itlearner where a.id=b.id EXPLAIN列的解释: id:本次 select 的标识符.在查询中每个 select都有一个顺序的数值. select_type :查询类

Python中dict详解

yangyzh Python中dict详解 python3.0以上,print函数应为print(),不存在dict.iteritems()这个函数. 在python中写中文注释会报错,这时只要在头部加上# coding=gbk即可 #字典的添加.删除.修改操作dict = {"a" : "apple", "b" : "banana", "g" : "grape", "o&qu

winxp计算机管理中服务详解

winxp计算机管理中服务详解01 http://blog.sina.com.cn/s/blog_60f923b50100efy9.html http://blog.sina.com.cn/s/blog_b08c76100102vijm.html winxp计算机管理中服务详解02 http://blog.sina.com.cn/s/blog_60f923b50100efz3.html http://blog.sina.com.cn/s/blog_b08c76100102vijn.html

【转】 java中HashMap详解

原文网址:http://blog.csdn.net/caihaijiang/article/details/6280251 java中HashMap详解 HashMap 和 HashSet 是 Java Collection Framework 的两个重要成员,其中 HashMap 是 Map 接口的常用实现类,HashSet 是 Set 接口的常用实现类.虽然 HashMap 和 HashSet 实现的接口规范不同,但它们底层的 Hash 存储机制完全一样,甚至 HashSet 本身就采用 H