Mapreduce运行过程分析(基于Hadoop2.4)——(三)

4.4 Reduce类

4.4.1 Reduce介绍

整完了Map,接下来就是Reduce了。YarnChild.main()—>ReduceTask.run()。ReduceTask.run方法開始和MapTask类似,包含initialize()初始化,依据情况看是否调用runJobCleanupTask(),runTaskCleanupTask()等。之后进入正式的工作,主要有这么三个步骤:Copy、Sort、Reduce。

4.4.2 Copy

Copy就是从运行各个Map任务的节点获取map的输出文件。这是由ReduceTask.ReduceCopier 类来负责。ReduceCopier对象负责将Map函数的输出拷贝至Reduce所在机器。假设大小超过一定阈值就写到磁盘,否则放入内存,在远程拷贝数据的同一时候,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多和磁盘文件过多。

Step1:

首先在ReduceTask的run方法中,通过例如以下配置来mapreduce.job.reduce.shuffle.consumer.plugin.class装配shuffle的plugin。默认的实现是Shuffle类:

1     Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
7     shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
9     LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

Step2:

初始化上述的plugin后,运行其run方法,得到RawKeyValueIterator的实例。

run方法的运行过程例如以下:

Step2.1:

量化Reduce的事件数目:

1     int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH, MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
3     int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

Step2.2:

生成map的完毕状态获取线程,并启动此线程:

 final EventFetcher<K,V> eventFetcher = new EventFetcher<K,V>(reduceId, umbilical, scheduler, this, maxEventsToFetch);

  eventFetcher.start(); 

获取已经完毕的Map信息,如Map的host、mapId等放入ShuffleSchedulerImpl中的Set<MapHost>中便于以下进行数据的拷贝传输。

1       URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
3       addKnownMapOutput(u.getHost() + ":" + u.getPort(),
5           u.toString(),
7           event.getTaskAttemptId());
9       maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());

Step2.3:

在Shuffle类中启动初始化Fetcher线程组,并启动:

 1     boolean isLocal = localMapFiles != null;
 2
 3     final int numFetchers = isLocal ? 1 :
 4
 5       jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
 6
 7     Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
 8
 9     if (isLocal) {
10
11       fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
12
13           merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
14
15           localMapFiles);
16
17       fetchers[0].start();
18
19     } else {
20
21       for (int i=0; i < numFetchers; ++i) {
22
23         fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
24
25                                        reporter, metrics, this,
26
27                                        reduceTask.getShuffleSecret());
28
29         fetchers[i].start();
30
31       }
32
33     }

线程的run方法就是进行数据的远程拷贝:

 1     try {
 3           // If merge is on, block
 5           merger.waitForResource();
 8
 9           // Get a host to shuffle from
11           host = scheduler.getHost();
13           metrics.threadBusy();
17           // Shuffle
19           copyFromHost(host);
21         } finally {
23           if (host != null) {
25             scheduler.freeHost(host);
27             metrics.threadFree();
29           }
31         }

Step2.4:

来看下这个copyFromHost方法。主要是就是使用HttpURLConnection,实现远程数据的传输。

建立连接之后,从接收到的Stream流中读取数据。每次读取一个map文件。

1     TaskAttemptID[] failedTasks = null;
2
3       while (!remaining.isEmpty() && failedTasks == null) {
4
5         failedTasks = copyMapOutput(host, input, remaining);
6
7       }

上面的copyMapOutput方法中,每次读取一个mapid,依据MergeManagerImpl中的reserve函数,检查map的输出是否超过了mapreduce.reduce.memory.totalbytes配置的大小,此配置的默认值

是当前Runtime的maxMemory*mapreduce.reduce.shuffle.input.buffer.percent配置的值,Buffer.percent的默认值为0.90。

假设mapoutput超过了此配置的大小时,生成一个OnDiskMapOutput实例。在接下来的操作中,map的输出写入到local暂时文件里。

假设没有超过此大小,生成一个InMemoryMapOutput实例。在接下来操作中,直接把map输出写入到内存。

最后,运行ShuffleScheduler.copySucceeded完毕文件的copy,调用mapout.commit函数,更新状态或者触发merge操作。

Step2.5:

等待上面全部的拷贝完毕之后,关闭相关的线程。

 1    eventFetcher.shutDown();
 2
 3     // Stop the map-output fetcher threads
 4     for (Fetcher<K,V> fetcher : fetchers) {
 5       fetcher.shutDown();
 6     }
 7
 8     // stop the scheduler
 9     scheduler.close();
10
11     copyPhase.complete(); // copy is already complete
12     taskStatus.setPhase(TaskStatus.Phase.SORT);
13     reduceTask.statusUpdate(umbilical);

Step2.6:

运行终于的merge操作,由Shuffle中的MergeManager完毕:

 1 public RawKeyValueIterator close() throws Throwable {
 2
 3     // Wait for on-going merges to complete
 4
 5     if (memToMemMerger != null) {
 6
 7       memToMemMerger.close();
 8
 9     }
10
11     inMemoryMerger.close();
12
13     onDiskMerger.close();
14
15
16
17     List<InMemoryMapOutput<K, V>> memory =
18
19       new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
20
21     inMemoryMergedMapOutputs.clear();
22
23     memory.addAll(inMemoryMapOutputs);
24
25     inMemoryMapOutputs.clear();
26
27     List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
28
29     onDiskMapOutputs.clear();
30
31     return finalMerge(jobConf, rfs, memory, disk);
32
33   }

Step3:

释放资源。

mapOutputFilesOnDisk.clear();

Copy完成。

4.4.3 Sort

Sort(事实上相当于合并)就相当于排序工作的一个延续,它会在全部的文件都拷贝完成后进行。使用工具类Merger归并全部的文件。经过此过程后,会产生一个合并了全部(全部并不准确)Map任务输出文件的新文件,而那些从其它各个server搞过来的 Map任务输出文件会删除。依据hadoop是否分布式来决定调用哪种排序方式。

在上面的4.3.2节中的Step2.4结束之后就会触发此操作。

4.4.4 Reduce

经过上面的步骤之后,回到ReduceTask中的run方法继续往下运行,调用runNewReducer。创建reducer:

1 org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
2
3       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
4
5         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);

并运行其run方法,此run方法就是我们的org.apache.hadoop.mapreduce.Reducer中的run方法。

 1 public void run(Context context) throws IOException, InterruptedException {
 2
 3     setup(context);
 4
 5     try {
 6
 7       while (context.nextKey()) {
 8
 9         reduce(context.getCurrentKey(), context.getValues(), context);
10
11         // If a back up store is used, reset it
12
13         Iterator<VALUEIN> iter = context.getValues().iterator();
14
15         if(iter instanceof ReduceContext.ValueIterator) {
16
17           ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
18
19         }
20
21       }
22
23     } finally {
24
25       cleanup(context);
26
27     }
28
29   }
30
31 }

while的循环条件是ReduceContext.nextKey()为真,这种方法就在ReduceContext中实现的,这种方法的目的就是处理下一个唯一的key,由于reduce方法的输入数据是分组的,所以每次都会处理一个key及这个key相应的全部value,又由于已经将全部的Map Task的输出拷贝过来并且做了排序,所以key同样的KV对都是挨着的。

    nextKey方法中,又会调用nextKeyValue方法来尝试去获取下一个key值,而且假设没数据了就会返回false,假设还有数据就返回true。防止获取反复的数据就在这里做的处理。

接下来就是调用用户自己定义的reduce方法了。

 1 public void reduce(Text key, Iterable<IntWritable> values,
 2
 3                        Context context
 4
 5                        ) throws IOException, InterruptedException {
 6
 7       int sum = 0;
 8
 9       for (IntWritable val : values) {
10
11         sum += val.get();
12
13       }
14
15       result.set(sum);
16
17       context.write(key, result);
18
19     }

-------------------------------------------------------------------------------

假设您看了本篇博客,认为对您有所收获,请点击右下角的 [推荐]

假设您想转载本博客,请注明出处

假设您对本文有意见或者建议,欢迎留言

感谢您的阅读,请关注我的兴许博客

时间: 2024-10-28 10:36:16

Mapreduce运行过程分析(基于Hadoop2.4)——(三)的相关文章

Mapreduce执行过程分析(基于Hadoop2.4)——(三)

4.4 Reduce类 4.4.1 Reduce介绍 整完了Map,接下来就是Reduce了.YarnChild.main()—>ReduceTask.run().ReduceTask.run方法开始和MapTask类似,包括initialize()初始化,根据情况看是否调用runJobCleanupTask(),runTaskCleanupTask()等.之后进入正式的工作,主要有这么三个步骤:Copy.Sort.Reduce. 4.4.2 Copy Copy就是从执行各个Map任务的节点获取

Mapreduce执行过程分析(基于Hadoop2.4)——(一)

1 概述 该瞅瞅MapReduce的内部运行原理了,以前只知道个皮毛,再不搞搞,不然怎么死的都不晓得.下文会以2.4版本中的WordCount这个经典例子作为分析的切入点,一步步来看里面到底是个什么情况. 2 为什么要使用MapReduce Map/Reduce,是一种模式,适合解决并行计算的问题,比如TopN.贝叶斯分类等.注意,是并行计算,而非迭代计算,像涉及到层次聚类的问题就不太适合了. 从名字可以看出,这种模式有两个步骤,Map和Reduce.Map即数据的映射,用于把一组键值对映射成另

Mapreduce执行过程分析(基于Hadoop2.4)——(二)

4.3 Map类 创建Map类和map函数,map函数是org.apache.hadoop.mapreduce.Mapper类中的定义的,当处理每一个键值对的时候,都要调用一次map方法,用户需要覆写此方法.此外还有setup方法和cleanup方法.map方法是当map任务开始运行的时候调用一次,cleanup方法是整个map任务结束的时候运行一次. 4.3.1 Map介绍 Mapper类是一个泛型类,带有4个参数(输入的键,输入的值,输出的键,输出的值).在这里输入的键为Object(默认是

Hadoop伪分布安装详解+MapReduce运行原理+基于MapReduce的KNN算法实现

本篇博客将围绕Hadoop伪分布安装+MapReduce运行原理+基于MapReduce的KNN算法实现这三个方面进行叙述. (一)Hadoop伪分布安装 1.简述Hadoop的安装模式中–伪分布模式与集群模式的区别与联系. Hadoop的安装方式有三种:本地模式,伪分布模式,集群(分布)模式,其中后两种模式为重点,有意义 伪分布:如果Hadoop对应的Java进程都运行在一个物理机器上,称为伪分布 分布:如果Hadoop对应的Java进程运行在多台物理机器上,称为分布.[集群就是有主有从] 伪

mapreduce运行流程总结

先上图,下图描绘了一个mapreduce程序的的一般运行过程和需要经过的几个阶段 大体上我们可以将mapreduce程序划分为inputformat ,map ,shuffle,reduce,outputformat五个阶段,下面我们会详细介绍各个阶段的具体的运行细节 以最简单的wordcount程序为例,本例使用基于hadoop2.6的环境,一般的api都使用mapreudce下的,注意不要使用mapred下的api可能会引起未知错误  惯例hello word程序 driver类,负责构建m

Task运行过程分析3——Map Task内部实现

Map Task内部实现 在Task运行过程分析2中提到,MapTask分为4种,分别是Job-setup Task.Job-cleanup Task.Task-cleanup Task和Map Task.其中,Job-setup Task和Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录:而Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务.本文

【转】mapreduce运行机制

转自http://langyu.iteye.com/blog/992916 写的相当好! 谈 mapreduce运行机制,可以从很多不同的角度来描述,比如说从mapreduce运行流程来讲解,也可以从计算模型的逻辑流程来进行讲解,也许有些 深入理解了mapreduce运行机制还会从更好的角度来描述,但是将mapreduce运行机制有些东西是避免不了的,就是一个个参入的实例对象,一个 就是计算模型的逻辑定义阶段,我这里讲解不从什么流程出发,就从这些一个个牵涉的对象,不管是物理实体还是逻辑实体. 首

MFC的运行过程分析

MFC程序的运行细节剖析 MFC程序也是Windows程序,所以它应该也有一个WinMain,但是在程序中看不到它的踪影.其实在程序进入点之前,还有一个(而且仅有一个)全局对象(theApp),这就是所谓的Application object,当操作系统将程序加载并激活时,这个全局对象获得配置,其构造函数会先执行,比WinMain更早. 一 CWinApp取代WinMain CWinApp的派生对象被称为application object,可以想见,CWinApp本身就代表一个程序本体.所谓程

MapReduce教程(一)基于MapReduce框架开发&lt;转&gt;

1 MapReduce编程 1.1 MapReduce简介 MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,用于解决海量数据的计算问题. MapReduce分成了两个部分: 1.映射(Mapping)对集合里的每个目标应用同一个操作.即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping. 2.化简(Reducing)遍历集合中的元素来返回一个综合的结果.即,输出表单里一列数字的和这个任务属于reducing. 你向Ma