不懂Hadoop心脏Shuffle的原理这一篇就够了(含讲解视频))

学习Hadoop搞明白Shuffle的原理是非常重要的,然而相信很多人看了《Hadoop权威指南4》好几遍,也没有真正搞明白它真正的原理。看完这篇文章,相信会对你理解Shuffle有很大的帮助。



官方给的定义:系统执行排序、将map输出作为输入传给reducer的过程称为Shuffle。(看完是不是一脸懵逼)
通俗来讲,就是从map产生输出开始到reduce消化输入的整个过程称为Shuffle。如下图用黑线框出的部分:

圆形缓冲区介绍:

每一个map任务都会有一个圆形缓冲区。默认大小100MB(io.sort.mb属性)阈值0.8也就是80MB(io.sort.spill.percent属性指定) ,

一旦达到阈值一个后台线程开始把内容写到(spill)磁盘的指定目录mapred.local.dir下的新建的一个溢出写文件。写入磁盘前先partition、sort、[combiner]。一个map task任务可能产生N个磁盘文件。
map task运算完之后,产生了N个文件,然后将这些文件merge合成一个文件。
如果N < 3,合成的新文件写入磁盘前只经过patition(分区)和sort(排序)过程,不会执行combiner合并(无论是否指定combiner类),如下图所示:

如果N>=3,合成的新文件写入磁盘前经过patition(分区)、sort(排序)过和combiner合并(前提是指定了combiner类),如下图所示:

思考:为什么只有当N>=3时,合成文件才会执行combiner呢?
这是因为如果N< 3时,执行combiner虽然减少了文件的大小,但是同时产生了一定的系统开销。由于减少的文件大小不大,权衡利弊后,确定N< 2时不在执行combiner操作。
当该map task全部执行完之后,对应的reduce task将会拷贝对应分区的数据(该过程称为fetch),如下图所示:


其它的map task任务完成后,对应的reduce task也同样执行fetch操作,如下图所示:

每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。该阶段被称为reduce的复制阶段。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,但这个默认值可以通过设置mapred.reduce.parallel.copies属性改变。

复制完所有map输出后,reduce任务进入合并阶段,该阶段将合并map输出,并维持其顺序排序(相当于执行了sort),如果指定了combiner,在写入磁盘前还会执行combiner操作。

那么具体是如何合并的呢?
合并因子默认是10,可以通过io.sort.factor属性设置。合并过程是循环进行了,可能叫经过多趟合并。目标是合并最小数量的文件以便满足最后一趟的合并系数。假设有40个文件,我们不会在四趟中每趟合并10个文件从而得到4个文件。相反,第一趟只合并4个文件,随后的三趟分别合并10个文件。再最后一趟中4个已合并的文件和余下的6个(未合并的)文件合计10个文件。具体流程如下图所示:


注意:这并没有改变合并次数,它只是一个优化措施,目的是尽量减少写到磁盘的数据量,因为最后一趟总是直接合并到reduce。
看到这里您是否理解了Shuffle的具体原理呢,如果没有,也没有关系,接下来我们通过一个wordcount案例再将整个流程梳理一遍。
首先map任务的代码如下:

public class WCMapper extends Mapper< LongWritable, Text, Text, LongWritable> {
  public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
    String line = ivalue.toString();
    String words[] = line.split(" ");
    for (String word : words) {
      context.write(new Text(word), new LongWritable(1));
    }
  }
}

在分区(分区规则:按首字母分四个区,分别为a-i,j-q,r-z,其它)的过程中,会将相同的单词合并到一起,将出现次数用逗号隔开,如上图所示。注意此时还没有排序。分区代码如下:

package cn.geekmooc;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class WCPatitioner extends Partitioner<Text, LongWritable> {
  @Override
  public int getPartition(Text key, LongWritable value, int numPartitions) {
    int first_char = key.charAt(0);
    if(first_char>=97&&first_char<=105){
      return 0;
    }else if(first_char>=106&&first_char<=113){
      return 1;
    }else if(first_char>=114&&first_char<=122){
      return 2;
    }else{
      return 3;
    }
  }
}

接着执行排序操作,默认排序规则是按照key的字典升序排序,当然你也可以指定排序规则,排序后如下图所示:

接下来执行combiner操作,将每个单词后续的1求和,WCCombiner类代码如下:

package cn.geekmooc;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values,
      Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    System.out.println(key.toString()+":"+values.toString());
    long count = 0;
    Iterator<LongWritable> iter = values.iterator();
    while(iter.hasNext()){
      count += iter.next().get();
    }
    context.write(key, new LongWritable(count));
  }
}

combiner的结果如下图所示


map任务执行完,产生N个spill文件,接着对N个文件进行合并,分以下两种情况:
1.N < 3,无论是否指定combiner类,合并文件时都不会执行combiner

2.N>=3,如果指定了combiner类将执行combiner操作,如下图:

接下来进入fetch(或copy)阶段

然后在reduce端进行合并

然后执行最后一趟合并,并将结果直接传给reduce

reduce类代码如下:

package cn.geekmooc;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  @Override
  protected void reduce(Text key, Iterable<LongWritable> values,
      Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    System.out.println(key.toString()+":"+values.toString());
    long count = 0;
    for (LongWritable val : values) {
      count += val.get();
    }
    context.write(key, new LongWritable(count));
  }
}

reduce task执行后,输出结果:

看讲解视频

原文地址:https://blog.51cto.com/14032352/2449831

时间: 2024-10-05 03:04:40

不懂Hadoop心脏Shuffle的原理这一篇就够了(含讲解视频))的相关文章

Python 速学!不懂怎么入门python的小白看这篇就够了!

Python是一种非常流行的脚本语言,而且功能非常强大,几乎可以做任何事情,比如爬虫.网络工具.科学计算.树莓派.Web开发.游戏等各方面都可以派上用场.同时无论在哪种平台上,都可以用 Python 进行系统编程. 机器学习可以用一些 Python 库来实现,比如人工智能常用的TensorFlow.也可以用像 NLTK 这样的 Python 库进行自然语言处理(NLP). 本文讨论基本的 Python 编程,后续会写一些 Python 编程的实际案例. 大家在学python的时候肯定会遇到很多难

Hadoop 4、Hadoop MapReduce的工作原理

一.MapReduce的概念 MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一部是分布式计算框就是mapreduce,两者缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程. 1.MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapRed

Hadoop中shuffle阶段流程分析

Hadoop中shuffle阶段流程分析 MapReduce  longteng  9个月前 (12-23)  399浏览  0评论 宏观上,Hadoop每个作业要经历两个阶段:Map phase和reduce phase.对于Map phase,又主要包含四个子阶段:从磁盘上读数据->执行map函数->combine结果->将结果写到本地磁盘上:对于reduce phase,同样包含四个子阶段:从各个map task上读相应的数据(shuffle)->sort->执行red

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理)

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理) 问题描述 Spark-1.6.0已经在一月份release,为了验证一下它的性能,我使用了一些大的SQL验证其性能,其中部分SQL出现了Shuffle失败问题,详细的堆栈信息如下所示: 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in connection from /10.196.134.220:7337 java.lang.Out

spark shuffle内在原理说明

在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量.Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑. Shuffle Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每

Hadoop数据管理介绍及原理分析

Hadoop数据管理介绍及原理分析 最近2014大数据会议正如火如荼的进行着,Hadoop之父Doug Cutting也被邀参加,我有幸听了他的演讲并获得亲笔签名书一本,发现他竟然是左手写字,当然这个他解释为个人习惯问题,也是,外国人左手写字的为数不少,在中国,左撇子在小时候的父母眼中就是“异类”,早早的被矫正过来.废话不多说了,接下来介绍Hadoop的数据管理. Hadoop的数据管理,主要包括Hadoop的分布式文件系统HDFS.分布式数据库HBase和数据仓库工具Hive. HDFS的数据

零基础学习hadoop到上手工作线路指导初级篇:hive及mapreduce

此篇是在零基础学习hadoop到上手工作线路指导(初级篇)的基础,一个继续总结.五一假期:在写点内容,也算是总结.上面我们会了基本的编程,我们需要对hadoop有一个更深的理解:hadoop分为hadoop1.X.hadoop2.X,并且还有hadoop生态系统.这里只能慢慢介绍了.一口也吃不成胖子. hadoop 1.x分为mapreduce与hdfs 其中mapreduce是很多人都需要迈过去的槛,它比较难以理解,我们有时候即使写出了mapreduce程序,但是还是摸不着头脑.我们不知道ke

零基础学习hadoop到上手工作线路指导初级篇:hive及mapreduce(转)

零基础学习hadoop到上手工作线路指导初级篇:hive及mapreduce:http://www.aboutyun.com/thread-7567-1-1.html mapreduce学习目录总结 MapReduce学习指导及疑难解惑汇总:http://www.aboutyun.com/thread-7091-1-1.html 什么是Map/Reduce:http://www.aboutyun.com/thread-5541-1-1.html Mapreduce 整个工作机制图:http://

高可用Hadoop平台-Flume NG实战图解篇

1.概述 今天补充一篇关于Flume的博客,前面在讲解高可用的Hadoop平台的时候遗漏了这篇,本篇博客为大家讲述以下内容: Flume NG简述 单点Flume NG搭建.运行 高可用Flume NG搭建 Failover测试 截图预览 下面开始今天的博客介绍. 2.Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.Fl