MapReduce工作机制

1 剖析MapReduce作业运行机制

1.1 作业的提交

客户端通过JobClient.runJob()来提交一个作业到jobtracker,JobClient程序逻辑如下:

a) 向Jobtracker请求一个新的job id (JobTracker.getNewJobId());

b) 检查作业的输出说明,如已存在抛错误给客户端;计算作业的输入分片;

c) 将运行作业所需要的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中以job id命名的目录下。作业jar副本较多(mapred.submit.replication = 10);

d) 告知jobtracker作业准备执行 (submit job)。

1.2 作业的初始化

job tracker接收到对其submitJob()方法的调用后,将其放入内部队列,交由job scheduler进行调度,并对其进行初始化,包括创建一个正在运行作业的对象(封装任务和记录信息)。

为了创建任务运行列表,job scheduler首先从共享文件系统中获取JobClient已计算好的输入分片信息,然后为每个分片创建一个map任务;创建的reduce任务数量由JobConf的mapred.reduce.task属性决定,schedule创建相应数量的reduce任务。任务此时被执行ID。

1.3 任务的分配

jobtacker应该先选择哪个job来运行?这个由job scheduler来决定,下面会详细讲到。

jobtracker如何选择tasktracker来运行选中作业的任务呢?

每个tasktracker定期发送心跳给jobtracker,告知自己还活着,是否可以接受新的任务。jobtracker以此来决定将任务分配给谁(仍然使用心跳的返回值与tasktracker通信)。每个tasktracker会有固定数量的任务槽来处理map和reduce(比如2,表示tasktracker可以同时运行两个map和reduce),由机器内核的数量和内存大小来决定。job tracker会先将tasktracker的map槽填满,然后分配reduce任务到tasktracker。

jobtracker选择哪个tasktracker来运行map任务需要考虑网络位置,它会选择一个离输入分片较近的tasktracker,优先级是数据本地化(data-local)–>机架本地化(rack-local)。

对于reduce任务,没有什么标准来选择哪个tasktracker,因为无法考虑数据的本地化。map的输出始终是需要经过整理(切分排序合并)后通过网络传输到reduce的,可能多个map的输出会切分出一部分送给一个reduce,所以reduce任务没有必要选择和map相同或最近的机器上。

1.4 任务的执行

1. tasktracker分配到一个任务后,首先从HDFS中把作业的jar文件复制到tasktracker所在的本地文件系统(jar本地化用来启动JVM)。同时将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。

2. 接下来tasktracker为任务新建一个本地工作目录work,并把jar文件的内容解压到这个文件夹下。

3. tasktracker新建一个taskRunner实例来运行该任务。TaskRunner启动一个新的JVM来运行每个任务,以便客户的map/reduce不会影响tasktracker守护进程。但在不同任务之间重用JVM还是可能的。子进程通过umbilical接口(?什么含义,暂时未知)与父进程进行通信。任务的子进程每隔几秒便告知父进程的进度,直到任务完成。

Streaming和Pipes是用来运行其它语言编写的map和reduce。Streaming任务特指任务使用标准输入输出steaming与进程通信,可以是任何语言编写的。pipes特指C++语言编写的任务,其通过socket来通信(persistent socket connection)。

1.5 进度和状态的更新

一个作业和每个任务都有一个状态信息,包括:作业或任务的运行状态(running, successful, failed),map和reduce的进度,计数器值,状态消息或描述。

这些信息通过一定的时间间隔由child JVM –> task tracker –> job tracker汇聚。job tracker将产生一个表明所有运行作业及其任务状态的全局试图。你可以通过Web UI查看。同时JobClient通过每秒查询jobtracker来获得最新状态。

1.6 作业的完成

1.7 作业的失败

2. 作业的调度

默认调度器 – 基于队列的FIFO调度器

公平调度器(Fair Scheduler)- 每个用户都有自己的作业池,用map和reduce的任务槽数来定制作业池的最小容量,也可以设置每个池的权重。Fair Scheduler支持抢占,如果一个池在特定的一段时间内未得到公平的资源共享,它会中止运行池得到过多资源的任务,以便把任务槽让给运行资源不足的池。启动步骤:

1) 拷贝contrib/fairscheduler下的jar复制到lib下;

2) mapred.jobtracker.taskScheduler = org.apache.hadoop.mapred.FairScheduler

3) 重启节点hadoop

能力调度器(Capacity Scheduler)-

3. shuffle和排序

shuffle特指map输出后到reduce运行前得到输入的整个过程,它是MapReduce的心脏,属于不断被优化和改进的代码库的一部分,下面主要针对0.20版本。

Map端

1)Map输出首先放在内存缓冲区(io.sort.mb属性定义,默认100MB);

2)守护进程会将缓冲区的数据按照目标reducer划分成不同的分区(partition),同时按键进行内排序;如果客户端定义了combiner,则combiner会在排序后运行,继续压缩缓存区的数据;

3)缓冲区上定义了一个阈值(io.sort.spill.percent,默认为0.8),当存储内容达到这个值时,缓冲区的值会被写到本地文件中(mapred.local.dir定义,可以是一个或多个目录);这种文件会有多个,每个的内容都是按照reducer分区且局部排序的。这个过程简称spill to disk;

4)Map输出完毕前,这些中间的输出文件会合并成一个已分区且已排序的输出文件中,合并会分多次,每次合并的中间文件个数有io.sort.factor来定义,默认是10;这个过程也会伴随着combiner的运行,min.num.spills.for.combine定义了运行combiner之前溢出写的次数;

5)写磁盘时可以压缩文件。mapred.compress.map.output设置为true,mapred.map.output.compression.codec指定压缩实现类;

map任务完成后,会通知父tasktracker状态已更新,然后tasktracker通过心跳通知jobtracker。下面的reduce所在的tasktracker有一个线程定期询问jobtracker以便获得map输出的位置,直到它获得所有输出的位置。

Reduce端

1)每个map任务的完成时间可能不同,但只要有一个任务完成,reduce任务得知后就开始复制对应它的输出,复制线程数由mapred.reduce.parallel.copies定义,默认为5;

2)如果map输出相当小,则不用复制到文件中,而是reduce tasktracker的内存中。缓冲区大小由mapred.job.shuffle.input.buffer.percent定义用于此用途的堆空间的百分比,默认0.7;一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent,默认值为0.66)或达到reduce输出阈值(mapred.inmem.merge.threshold,默认值为1000),则合并后溢出写到磁盘中;

3)随着磁盘上副本的增多,后台线程会将它们合并为更大的排好序的文件。为了合并,压缩的map输出必须在内存中被解压缩;

4) 复制完所有的map输出后,reduce任务进入合并阶段(sort phase,合并多个文件,并按键排序)。io.sort.factor定义了每次合并数,默认为10,即每10个map输出合并一次。会有很多个合并后的中间文件。

5)最后直接把中间文件数据输入给reduce函数,对已排序输出中的每个键都要调用reduce函数,此阶段的输出直接写到HDFS中。

配置的调优

总原则:给shuffle过程尽量多提供内存空间,但也要确保map函数和reduce函数能得到足够的内存。

运行map和reduce任务的JVM内存大小有mapred.child.java.opts属性设置。

在map端,避免多次溢出写磁盘来获得最佳性能。计数器spilled.records计算在作业运行整个阶段中溢出写磁盘的记录数,大则表明写磁盘太频繁;

在reduce端,中间数据全部驻留在内存中就能得到最佳性能。如果reduce函数的内存需求不大,那么把mapred.inmem.merg.threshold设置为0,把mapred.job.reduce.input.buffer.percent设置为1会带来性能的提升。

4. 任务的执行

Hadoop发现一个任务运行比预期慢的时候,它会尽量检测,并启动另一个相同的任务作为备份,即“推测执行”(speculative execution)。

推测执行是一种优化措施,并不能使作业运行更可靠。默认启用,但可以单独为map/reduce任务设置,mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution。开启此功能会减少整个吞吐量,在集群中倾向于关闭此选项,而让用户根据个别作业需要开启该功能。

Hadoop为每个任务启动一个新JVM需要耗时1秒,对于大量超短任务如果重用JVM会提升性能。当启用JVM重用后,JVM不会同时运行多个任务,而是顺序执行。tasktracker可以一次启动多个JVM然后同时运行,接着重用这些JVM。控制任务重用JVM的属性是mapred.job.reuse.jvm.num.tasks,它指定给定作业每个JVM运行的任务的最大数,默认为1,即无重用;-1表示无限制即该作业的所有的任务都是有一个JVM。

在map/reduce程序中,可以通过某些环境属性(Configuration)得知作业和任务的信息。

mapred.job.id              作业ID,如job_201104121233_0001

mapred.tip.id               任务ID,如task_201104121233_0001_m_000003

mapred.task.id             任务尝试ID,如attempt_201104121233_0001_m_000003_0

mapred.task.partition    作业中任务的ID,如3

mapred.task.is.map       此任务是否为map任务,如true

时间: 2024-10-14 02:59:51

MapReduce工作机制的相关文章

MapReduce 工作机制剖析

MapReduce工作机制剖析: 1.     在集群中的任意一个节点提交MapReduce程序: 2.     JobClient收到作业后,JobClient向JobTracker请求获取一个Job ID: 3.     将运行作业所需要的资源文件复制到HDFS上(包括MapReduce程序打包的JAR文件.配置文件和客户端计算所得的输入划分信息),这些文件都存放在JobTracker专门为该作业创建的文件夹中,文件夹名为该作业的Job ID: 4.     获得作业ID后,提交作业: 5.

大数据学习之八——MapReduce工作机制

1.MapReduce的特点 软件框架.并行处理.可靠且容错.大规模集群.海量数据集 2.mapper和reducer mapper负责"分":把复杂的任务分解为若干个"简单的任务"来处理.简单的任务包含三层含义: (1)数据或计算的规模相对原任务要大大缩小: (2)就近计算原则,任务会分配到存放着所需数据的节点上进行计算: (3)这些小任务可以并行计算,彼此间几乎没有依赖关系. reducer负责对map阶段的结果进行汇总. 3.MapReduce的工作机制 (1

MapReduce工作机制详解

1.MapTask工作机制整个Map阶段流程大体如上图所示.简单概述:input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对

Hadoop Mapreduce 工作机制

一.Mapreduce 中的Combiner package com.gec.demo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WcCombiner extends Reducer<Text, IntWritable,

MapReduce工作原理图文详解 (炼数成金)

MapReduce工作原理图文详解 1.Map-Reduce 工作机制剖析图: 1.首先,第一步,我们先编写好我们的map-reduce程序,然后在一个client 节点里面进行提交.(一般来说可以在Hadoop集群里里面的任意一个节点进行,只要该节点装了Hadoop并且连入了Hadoop集群) 2.job client 在收到这个请求以后呢,会找到JobTracker并且请求一个作业ID(Job ID).(根据我们的核心配置文件,可以很轻易的找到JobTracker) 3.通过HDFS 系统把

大数据技术之_05_Hadoop学习_02_MapReduce_MapReduce框架原理+InputFormat数据输入+MapReduce工作流程(面试重点)+Shuffle机制(面试重点)

第3章 MapReduce框架原理3.1 InputFormat数据输入3.1.1 切片与MapTask并行度决定机制3.1.2 Job提交流程源码和切片源码详解3.1.3 FileInputFormat切片机制3.1.4 CombineTextInputFormat切片机制3.1.5 CombineTextInputFormat案例实操3.1.6 FileInputFormat实现类3.1.7 KeyValueTextInputFormat使用案例3.1.8 NLineInputFormat使

【转】mapreduce运行机制

转自http://langyu.iteye.com/blog/992916 写的相当好! 谈 mapreduce运行机制,可以从很多不同的角度来描述,比如说从mapreduce运行流程来讲解,也可以从计算模型的逻辑流程来进行讲解,也许有些 深入理解了mapreduce运行机制还会从更好的角度来描述,但是将mapreduce运行机制有些东西是避免不了的,就是一个个参入的实例对象,一个 就是计算模型的逻辑定义阶段,我这里讲解不从什么流程出发,就从这些一个个牵涉的对象,不管是物理实体还是逻辑实体. 首

MapReduce工作原理讲解

有时候我们在用,但是却不知道为什么.就像苹果砸到我们头上,这或许已经是很自然的事情了,但是牛顿却发现了地球的万有引力.ok了,希望通过了解MapReduce,我们能够写出更好的MapReduce例子.第一部分:MapReduce工作原理<ignore_js_op> <ignore_js_op> MapReduce 角色•Client :作业提交发起者.•JobTracker: 初始化作业,分配作业,与TaskTracker通信,协调整个作业.•TaskTracker:保持JobTr

MapReduce 运行机制

 Hadoop 中的MapReduce是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集. 一个MapReduce作业(job)通常会把输入的数据集切分为若干独立的数据块,由Map任务以完成并行的方式处理它们.框架会对map函数的输出先进行排序,然后把结果输入 给Reduce任务.通常作业的输入和输出都会被存储在文件系统中.整个框架负责任务的调度和和监控,以及重新执行已经失败的任务. 通常,MapReduce框架