Hadoop :map+shuffle+reduce和YARN笔记分享

  今天在公司做了一个hadoop分享,包括mapreduce,及shuffle深度讲解,还有YARN框架的详细说明等。

v\:* {behavior:url(#default#VML);}
o\:* {behavior:url(#default#VML);}
w\:* {behavior:url(#default#VML);}
.shape {behavior:url(#default#VML);}

Normal
0
false

7.8 磅
0
2

false
false
false

EN-US
ZH-CN
X-NONE

/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.5pt;
mso-bidi-font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-font-kerning:1.0pt;}
table.MsoTableGrid
{mso-style-name:网格型;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-priority:39;
mso-style-unhide:no;
border:solid windowtext 1.0pt;
mso-border-alt:solid windowtext .5pt;
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-border-insideh:.5pt solid windowtext;
mso-border-insidev:.5pt solid windowtext;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.5pt;
mso-bidi-font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-font-kerning:1.0pt;}

 一 引言

  1、海量日志数据,提取出某日访问百度次数最多的那个IP

  算法思想:分而治之+Hash

  1.IP地址最多有2^32=4G种取值情况,所以不能完全加载到内存中处理; 
  2.可以考虑采用“分而治之”的思想,按照IP地址的Hash(IP)%1024值,把海量IP日志分别存储到1024个小文件中。这样,每个小文件最多包含4MB  个IP地址; 
  3.对于每一个小文件,可以构建一个IP为key,出现次数为value的Hash map,同时记录当前出现次数最多的那个IP地址;
  4.可以得到1024个小文件中的出现次数最多的IP,再依据常规的排序算法得到总体上出现次数最多的IP;

  2、Top K:统计最热门的10个查询串

  假设目前有一千万个记录,请你统计最热门的10个查询串,要求使用的内存不能超过1G。

  第一步、维护一个Key为Query字串,Value为该Query出现次数的HashTable,每次读取一个Query,如果该字串不在Table中,那么加入该字串,并且将Value值设为  1;如果该字串在Table中,那么将该字串的计数加一即可。采用Hash Table的原因:查询速度非常的快,几乎是O(1)的时间复杂度。

  第二步、堆顶存放的是整个堆中最小的数,现在遍历N个数,把最先遍历到的k个数存放到最小堆中,并假设它们就是我们要找的最大的k个数,X1>X2...Xmin(堆顶),  而后遍历后续的N-K个数,一一与堆顶元素进行比较,如果遍历到的Xi大于堆顶元素Xmin,则把Xi放入堆中,而后更新整个堆,更新的时间复杂度为logK,如果      Xi<Xmin,则不更新堆

  最终的时间复杂度是:ON + N‘*OlogK

  下面介绍另一种处理大数据的编程模式:mapreduce

 二 Mapreduce

  Hadoop 起源于 google’s 三篇papers:

  1 mapreduce;

  2 The google file system;

  3 big table;

 2.1 Mapreduce机制 –
from google paper

 Mapreduce 是一种处理大数据的编程模式。

  1执行概述图Execution
Overview

  2Master的数据结构Master
Data Structures

  对于每个map和reduce任务,master中保存着三个状态(idle, in-progress, or completed)和非空闲任务的机器id;master是map 到 reduce的管道;

  3容错机制Fault
Tolerance

  Master会周期性的ping每个worker,如果超时没有收到回应,则认为这个worker死了。

  已完成和死掉的worker都会被master置成空闲状态,为了下一次调度

  已经完成的map tasks不能重新执行,因为他们的输出保存在本地硬盘里;

  已经完成的reduce task不需要重新执行,因为他们的输出已经保存在了全局的HDFS里面;

  例如,Map task在worker A 执行失败了,稍后会在worker B再次执行,这时所有的reduce都需要重新执行,以为所有的reducer没有准备好从worker A读出改为从  worker B读入;

  Map Reduce 是对大范围的worker失败有复原能力的,例如,有80台机器持续是unreachable状态,master为了保证进度的向前推进,简单的使unreachable机器  再次执行任务;

  4 存储Locality

  网络带宽是一个稀缺资源,所以我们利用GFS把文件保存在本地磁盘以节省带宽;

  GFS把每个文件划分成64M的block,并且有3个副本在不同的机器上;所以master把map任务分配到有输入文件对应副本的机器上;若这个map任务失败,会转移到具有这个副本的同一网络的其它机器上再次执行;

  5 任务粒度Task
Granularity

  我们把map阶段划分长M pieces,把 reduce阶段划分成Rpieces,理想的M和R均比工作的机器数大很多。M和R应该多大好呢?master需要O(M+R)个调度决策和    O(M*R)个状态保存在内存;推荐M=200,000 and R=5,000, using 2,000 工作机器;

  6备份任务Backup
Tasks

  造成总的执行时间过长的原因主要是由某个任务的执行相对其它所有任务过慢,称为掉队者;造成这个任务掉队的可能原因是硬盘读写慢,或者这台机器在调度其它任   务,cpu,内存,硬盘或网络带宽资源被占用;

  解决办法是当mapreduce任务将要执行完成时,master调度in-progress状态掉队任务的备份执行任务;我们调整这种机制,使占用的计算资源不高于一定的百分比并且实验表明不采用这种机制,时间会增长44%。

 2.2 MapReduce 例子  –  from yahoo

  “Hello World” of MapReduce :
the Word Count algorithm

  流程介绍:

  Multiple Input files -> Map (Discarded
the offset and Split the text) -> Sort
and Shuffle(
sorts the key-value pairs by key and it “shuffles” all
pairs with the same key to the same Reducer) -> Reduce (add up all counts)-> Output to R files

  Java中Key-value pair格式:

  Map’s input pattern  (offset,text) -> (1: ‘Hadoop
uses’),(2: ‘ MapReduce‘)

  Reduce’s input pattern (word,count) ->
(word1:1),(word2,[1,1])

  Coding:见ssh

 2.3 mapreduce 与机器学习

  机器学习算法要是适用于hadoop必须重构算法,改写成mapreduce的分布式程序;有些机器学习算法很好改进,比如朴素贝叶斯就是天然的适用于mapreduce模式,因为先验概率和条件概率可以很好的分布式统计,然而有些机器学习算法却不容易改成分布式。

  例子,k-means的分布式程序:

  代码:
见ssh

 2.4 详解shaffle过程

  参考:http://langyu.iteye.com/blog/992916

shuffle总体概述图如上(来自官方文档);shuffle又叫洗牌或打乱,把map结果输入到reduce中,具体过程如下;左半部分细节图如下(来自博客)。

   1 一个输入文件会被划分成很多64M的block,split分布式的输入到各个map中;

  2 partition的作用是map之后的键值对需要确定分配给哪个reduce,partition会把hash(key)%R的reduce序号追加到map处理后的键值对尾,保存到中间缓存中;

  3 spill:sort和combiner组成,网络带宽是十分宝贵的资源,为了节省带宽,排序并合并缓存中的map生成的键值对,例如(a,1)(a,1)合并(a,2);

  4 merge:缓存快满的时候,会写入磁盘,上一步spill只是合并了缓存中的键值对,硬盘中有来自不同文件的键值对,需要再次合并相同key的value,也是为了减少带宽;例如 (a,2)(a,3)合并为(a,[2],[3]),再调用combine方法,形成(a,5),最后根据partition的hash地址发送到目标reduce中。

右半部分细节图:

1
copy:通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件,保存在缓存中(map后的结果存储在磁盘,由TaskTracker管理);

2
merge:跟map过程一样,缓存内容溢写到了磁盘,merge磁盘来自不同map的输出,合并相同key的value,输出到reduce中。

3
input:把全部缓存和磁盘的文件形成最终文件,输入到reduce中。

备注:至此两个步骤合起来才是整个shuffle的完整过程。我认为整个mapreduce过程应该是:
map + Shuffle+reduce才贴切。

 3 hadoop1 和
hadoop2架构对比

  由于MRv1 在扩展性、可靠性、资源利用率和多框架等方面存在明显不足,Apache开始尝试对MapReduce 进行升级改造,于是诞生了更加先进的下一代MapReduce 计算框架MRv2。由于MRv2 将资源管理模块构建成了一个独立的通用系统YARN,这直接使得MRv2 的核心从计算框架MapReduce 转移为资源管理系统YARN。

  3.1 编程模型对比

v\:* {behavior:url(#default#VML);}
o\:* {behavior:url(#default#VML);}
w\:* {behavior:url(#default#VML);}
.shape {behavior:url(#default#VML);}

Normal
0

7.8 磅
0
2

false
false
false

EN-US
ZH-CN
X-NONE

/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.5pt;
mso-bidi-font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-font-kerning:1.0pt;}

v\:* {behavior:url(#default#VML);}
o\:* {behavior:url(#default#VML);}
w\:* {behavior:url(#default#VML);}
.shape {behavior:url(#default#VML);}

Normal
0

7.8 磅
0
2

false
false
false

EN-US
ZH-CN
X-NONE

/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.5pt;
mso-bidi-font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-font-kerning:1.0pt;}

Map Reduce 1 架构


Map Reduce 2 架构

v\:* {behavior:url(#default#VML);}
o\:* {behavior:url(#default#VML);}
w\:* {behavior:url(#default#VML);}
.shape {behavior:url(#default#VML);}

Normal
0

7.8 磅
0
2

false
false
false

EN-US
ZH-CN
X-NONE

/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.5pt;
mso-bidi-font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-font-kerning:1.0pt;}

YARN 负责资源管理和调度,而ApplicationMaster
仅负责一个作业的管理;MRv1 仅是一个独立的离线计算框架,而MRv2 则是运行于YARN 之上的MapReduce

YARN上可以支持运行出mapreduce外的多种计算框架,并能够统一管理和调度。

3.2 Hadoop1架构和Hadoop2架构对比


Hadoop1架构


Hadoop2架构

通过将原有JobTracker 中与应用程序相关和无关的模块分开,不仅减轻了JobTracker 负载,也使得Hadoop 支持更多的计算框架。

  3.3  YARN基本组成结构

  YARN 主要由ResourceManager、NodeManager、ApplicationMaster和Container 等几个组件构成。

  1 Resource Manager主要由两个组件构成:调度器(Scheduler)和(Applications Manager,ASM);调度器:将系统中的资源分配给各个正在运行的应用程序;应用程序管理器:应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster 运行状态并在失败时重新启动它等。

  2 ApplicationMaster

  1)与RM 调度器协商以获取资源(用 Container 表示);

  2)将得到的任务进一步分配给内部的任务;

  3)与 NM 通信以启动 / 停止任务;

  4)监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

  3 NodeManagerNM):NM 是每个节点上的资源和任务管理器;

  (NM与ApplicationMaster的区别是NM是控制整台机器资源和任务管理,而AM是控制每个应用的资源和任务管理)

  4 Container:是YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。

  总结:hadoop2统称为YARN,YARN负责资源管理和任务调度,MR2只是上面的一个计算框架,YARN还可以支持Spark和Hbase等框架。

Normal
0
false

7.8 磅
0
2

false
false
false

EN-US
ZH-CN
X-NONE

/* Style Definitions */
table.MsoNormalTable
{mso-style-name:普通表格;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-noshow:yes;
mso-style-priority:99;
mso-style-parent:"";
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.5pt;
mso-bidi-font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-font-kerning:1.0pt;}
table.MsoTableGrid
{mso-style-name:网格型;
mso-tstyle-rowband-size:0;
mso-tstyle-colband-size:0;
mso-style-priority:39;
mso-style-unhide:no;
border:solid windowtext 1.0pt;
mso-border-alt:solid windowtext .5pt;
mso-padding-alt:0cm 5.4pt 0cm 5.4pt;
mso-border-insideh:.5pt solid windowtext;
mso-border-insidev:.5pt solid windowtext;
mso-para-margin:0cm;
mso-para-margin-bottom:.0001pt;
mso-pagination:widow-orphan;
font-size:10.5pt;
mso-bidi-font-size:11.0pt;
font-family:"Calibri",sans-serif;
mso-ascii-font-family:Calibri;
mso-ascii-theme-font:minor-latin;
mso-hansi-font-family:Calibri;
mso-hansi-theme-font:minor-latin;
mso-font-kerning:1.0pt;}

时间: 2024-10-11 15:56:14

Hadoop :map+shuffle+reduce和YARN笔记分享的相关文章

Hadoop Map/Reduce

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集.一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们.框架会对map的输出先进行排序, 然后把结果输入给reduce任务.通常作业的输入和输出都会被存储在文件系统中. 整个框架负责任务的调度和监控,以及重新执行已经失败的任务.通常,Map/R

Yarn中的Map和Reduce的优化

通过Hive执行的批次任务处理失败,Spark中报的错误日志如下: [plain] view plain copyERROR : Failed to monitor Job[ 3] with exception 'java.lang.IllegalStateException(RPC channel is closed.)' java.lang.IllegalStateException: RPC channel is closed. at com.google.common.base.Prec

MapReduce剖析笔记之七:Child子进程处理Map和Reduce任务的主要流程

在上一节我们分析了TaskTracker如何对JobTracker分配过来的任务进行初始化,并创建各类JVM启动所需的信息,最终创建JVM的整个过程,本节我们继续来看,JVM启动后,执行的是Child类中的Main方法,这个方法是如何执行的. 1,从命令参数中解析相应参数,获取JVMID.建立RPC连接.启动日志线程等初始化操作: 父进程(即TaskTracker)在启动子进程时,会加入一些参数,如本机的IP.端口.TaskAttemptID等等,通过解析可以得到JVMID. String ho

MapReduce剖析笔记之五:Map与Reduce任务分配过程

在上一节分析了TaskTracker和JobTracker之间通过周期的心跳消息获取任务分配结果的过程.中间留了一个问题,就是任务到底是怎么分配的.任务的分配自然是由JobTracker做出来的,具体来说,存在一个抽象类:TaskScheduler,主要负责分配任务,继承该类的有几个类: CapacityTaskScheduler.FairScheduler.JobQueueTaskScheduler(LimitTasksPerJobTaskScheduler又继承于该类). 从名字大致可以看出

一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)

Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解Mapper和Reducer接口,应用通常须要提供map和reduce方法以实现他们. 接着我们须要对JobConf, JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat,OutputCommitter等进行讨

shuffle 过程map与reduce交换数据过程的关键

Shuffle描述着数据从map task输出到reduce task输入的这段过程. 个人理解: map执行的结果会保存为本地的一个文件中: 只要map执行 完成,内存中的map数据就一定会保存到本地文件,保存这个文件有个过程 叫做spilll(溢写),如果需要对map的执行结果做 combine  也是在这个时候(溢写执行的时候,写入磁盘之前)做的 reduce怎么接受数据: 在shuffle过程中,shuffle分为两部分,前半部分是map过程, 如四中写的问题,后半部分是reduce过程

hadoop异常之 reduce拉取数据失败 &#160;(error in shuffle in fetcher)

主要错误信息:Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in fetcher#43 解决办法:限制reduce的shuffle内存使用 hive:set mapreduce.reduce.shuffle.memory.limit.percent=0.1; MR:job.getConfiguration().setStrings("mapreduce.reduce.sh

Map、Reduce任务中Shuffle和排序的过程

Map.Reduce任务中Shuffle和排序的过程 流程分析: Map端: 1.每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,当然我们也可以设置块的大小.map输出 的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的 80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件.

【Hadoop】Map和Reduce个数问题

在hadoop中当一个任务没有设置的时候,该任务的执行的map的个数是由任务本身的数据量决定的,具体计算方法会在下文说明:而reduce的个数hadoop是默认设置为1的.为何设置为1那,因为一个任务的输出的文件个数是由reduce的个数来决定的.一般一个任务的结果默认是输出到一个文件中,所以reduce的数目设置为1.那如果我们为了提高任务的执行速度如何对map与reduce的个数来进行调整那. 在讲解之前首先,看一下hadoop官方文档是如何说明的. Number of MapsThe nu