hadoop运行原理之shuffle

  hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是从Map结束到Reduce开始之间的过程。首先看下这张图,就能了解shuffle所处的位置。图中的partitions、copy phase、sort phase所代表的就是shuffle的不同阶段。

  

  shuffle阶段又可以分为Map端的shuffle和Reduce端的shuffle。

  一、Map端的shuffle

  Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。

  在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除)。

  最后,每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件。至此,Map的shuffle过程就结束了。

  二、Reduce端的shuffle

  Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。首先要将Map端产生的输出文件拷贝到Reduce端,每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。也就是说,每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。

阿发书法

时间: 2024-10-13 22:17:25

hadoop运行原理之shuffle的相关文章

hadoop运行原理之Job运行(四) JobTracker端心跳机制分析

接着上篇来说,TaskTracker端的transmitHeartBeat()方法通过RPC调用JobTracker端的heartbeat()方法来接收心跳并返回心跳应答.还是先看看这张图,对它的大概流程有个了解. 下面来一段一段的分析该方法. 1 public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 2 boolean restarted, 3 boolean initialContact, 4 bo

hadoop运行原理之Job运行(三) TaskTracker的启动及初始化

与JobTracker一样,TaskTracker也有main()方法,然后以线程的方式启动(继承了Runnable接口).main()方法中主要包含两步:一是创建一个TaskTracker对象:二是启动TaskTracker线程. 1 public static void main(String argv[]) throws Exception { 2 ... 3 try { 4 JobConf conf=new JobConf(); 5 ... 6 TaskTracker tt = new

hadoop运行原理之Job运行(二) Job提交及初始化

本篇主要介绍Job从客户端提交到JobTracker及其被初始化的过程. 以WordCount为例,以前的程序都是通过JobClient.runJob()方法来提交Job,但是现在大多用Job.waitForCompletion(true)方法来提交(true表示打印出运行过程),但其本质都是一样的,最终都是通过JobClient的submitJobInternal()方法来提交Job.

hadoop运行原理之Job运行(五) 任务调度

接着上篇来说.hadoop首先调度辅助型task(job-cleanup task.task-cleanup task和job-setup task),这是由JobTracker来完成的:但对于计算型task,则是由作业调度器TaskScheduler来分配的,其默认实现为JobQueueTaskScheduler.具体过程在assignTasks()方法中完成,下面来一段一段的分析该方法.

hadoop运行原理之作业提交(一)

这部分的计划是这样的,首先解释JobTracker的启动过程和作业从JobClient提交到JobTracker上:然后:最后将整个流程debug一遍来加深映象. 在看JobTracker源代码的时候就会发现,它里边有main()方法,这就说明了它是一个独立的java进程.在hadoop根目录下的bin文件夹中的hadoop脚本中可以看到,它指定了JobTracker类.如下图所示: JobTracker的main()方法中最主要的是以下两条语句: 1 public static void ma

Hadoop伪分布安装详解+MapReduce运行原理+基于MapReduce的KNN算法实现

本篇博客将围绕Hadoop伪分布安装+MapReduce运行原理+基于MapReduce的KNN算法实现这三个方面进行叙述. (一)Hadoop伪分布安装 1.简述Hadoop的安装模式中–伪分布模式与集群模式的区别与联系. Hadoop的安装方式有三种:本地模式,伪分布模式,集群(分布)模式,其中后两种模式为重点,有意义 伪分布:如果Hadoop对应的Java进程都运行在一个物理机器上,称为伪分布 分布:如果Hadoop对应的Java进程运行在多台物理机器上,称为分布.[集群就是有主有从] 伪

【转载】Spark系列之运行原理和架构

参考 http://www.cnblogs.com/shishanyuan/p/4721326.html 1. Spark运行架构 1.1 术语定义 lApplication:Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver 功能的代码和分布在集群中多个节点上运行的Executor代码: lDriver:Spark中的Driver即运行上述Application的main()函数并且创建SparkCon

[Linux][Hadoop] 运行WordCount例子

紧接上篇,完成Hadoop的安装并跑起来之后,是该运行相关例子的时候了,而最简单最直接的例子就是HelloWorld式的WordCount例子.   参照博客进行运行:http://xiejianglei163.blog.163.com/blog/static/1247276201443152533684/   首先创建一个文件夹,并创建两个文件,目录随意,为以下文件结构: examples --file1.txt --file2.txt 文件内容随意填写,我是从新闻copy下来的一段英文: 执

Spark Shuffle原理、Shuffle操作问题解决和参数调优

摘要: 1 shuffle原理 1.1 mapreduce的shuffle原理 1.1.1 map task端操作 1.1.2 reduce task端操作 1.2 spark现在的SortShuffleManager 2 Shuffle操作问题解决 2.1 数据倾斜原理 2.2 数据倾斜问题发现与解决 2.3 数据倾斜解决方案 3 spark RDD中的shuffle算子 3.1 去重 3.2 聚合 3.3 排序 3.4 重分区 3.5 集合操作和表操作 4 spark shuffle参数调优