hadoop中OutputFormat 接口的设计与实现

OutputFormat 主要用于描述输出数据的格式,它能够将用户提供的 key/value 对写入特定格式的文件中。 本文将介绍 Hadoop 如何设计 OutputFormat 接口 , 以及一些常用的OutputFormat 实现。

1.旧版 API 的 OutputFormat 解析

如图所示, 在旧版 API 中,OutputFormat 是一个接口,它包含两个方法:

RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                     String name, Progressable progress)
                                             throws IOException;
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

checkOutputSpecs 方法一般在用户作业被提交到 JobTracker 之前, 由 JobClient 自动调用,以检查输出目录是否合法。

getRecordWriter 方法返回一个 RecordWriter 类对象。 该类中的方法 write 接收一个key/value 对, 并将之写入文件。在 Task 执行过程中, MapReduce 框架会将 map() 或者reduce() 函数产生的结果传入 write 方法, 主要代码(经过简化)如下。假设用户编写的 map() 函数如下:

public void map(Text key, Text value,
         OutputCollector<Text, Text> output,
         Reporter reporter) throws IOException {
         // 根据当前 key/value 产生新的输出 <newKey, newValue>, 并输出
         ……
         output.collect(newKey, newValue);
}

则函数 output.collect(newKey, newValue) 内部执行代码如下:

RecordWriter<K, V> out = job.getOutputFormat().getRecordWriter(...);
out.write(newKey, newValue);

Hadoop 自带了很多OutputFormat 实现, 它们与 InputFormat 实现相对应,具体如图所示。所有基于文件的 OutputFormat 实现的基类为 FileOutputFormat, 并由此派生出一些基于文本文件格式、 二进制文件格式的或者多输出的实现。

为了深入分析OutputFormat的实现方法,选取比较有代表性的FileOutputFormat类进行分析。同介绍 InputFormat 实现的思路一样,我们先介绍基类FileOutputFormat,再介绍其派生类 TextOutputFormat。基类 FileOutputFormat 需要提供所有基于文件的 OutputFormat 实现的公共功能,总结起来,主要有以下两个:
(1) 实现 checkOutputSpecs 接口
该接口 在作业运行之前被调用, 默认功能是检查用户配置的输出目 录是否存在,如果存在则抛出异常,以防止之前的数据被覆盖。
(2) 处理 side-effect file
任务的 side-effect file 并不是任务的最终输出文件,而是具有特殊用途的任务专属文件。 它的典型应用是执行推测式任务。 在 Hadoop 中,因为硬件老化、网络故障等原因,同一个作业的某些任务执行速度可能明显慢于其他任务,这种任务会拖慢整个作业的执行速度。为了对这种“ 慢任务” 进行优化, Hadoop 会为之在另外一个节点上启动一个相同的任务,该任务便被称为推测式任务,最先完成任务的计算结果便是这块数据对应的处理结果。为防止这两个任务同 时往一个输出 文件中 写入数据时发生写冲突, FileOutputFormat会为每个 Task 的数据创建一个 side-effect file,并将产生的数据临时写入该文件,待 Task完成后,再移动到最终输出目 录中。 这些文件的相关操作, 比如创建、删除、移动等,均由 OutputCommitter 完成。它是一个接口,Hadoop 提供了默认实现 FileOutputCommitter,用户也可以根据自己的需求编写 OutputCommitter 实现, 并通过参数 {mapred.output.committer.class} 指定。OutputCommitter 接口定义以及 FileOutputCommitter 对应的实现如表所示。

表-- OutputCommitter 接口定义以及 FileOutputCommitter 对应的实现

方法 何时被调用 FileOutputCommitter 实现
setupJob 作业初始化 创建临时目录 ${mapred.out.dir} /_temporary
commitJob 作业成功运行完成 删除临时目录,并在${mapred.out.dir} 目录下创建空文件_SUCCESS
abortJob 作业运行失败  删除临时目录
setupTask 任务初始化 不进行任何操作。原本是需要在临时目录下创建 side-effect file
的,但它是用时创建的(create on demand)
needsTaskCommit  判断是否需要提交结果 只要存在side-effect file,就返回 true
commitTask 任务成功运行完成 提交结果, 即将 side-effect file 移动到 ${mapred.out.dir} 目录下
abortTask 任务运行失败 删除任务的 side-effect file注意默认情况下,当作业成功运行完成后,会在最终结果目录 ${mapred.out.dir} 下生成

注意:默认情况下,当作业成功运行完成后,会在最终结果目录 ${mapred.out.dir} 下生成空文件 _SUCCESS。该文件主要为高层应用提供作业运行完成的标识,比如,Oozie 需要通过检测结果目 录下是否存在该文件判 断作业是否运行完成。

2. 新版 API 的 OutputFormat 解析

如图所示, 除了 接口 变为抽象类外, 新 API 中 的 OutputFormat 增加了一个新的方法:getOutputCommitter,以允许用户自 己定制合适的 OutputCommitter 实现。

参考资料

《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》

时间: 2024-08-09 16:40:40

hadoop中OutputFormat 接口的设计与实现的相关文章

Hadoop中的排序的设计

排序经常会用,但是怎么在大数据中,以Map,Reduce这种形式来进行实现了? 首先你要明确目标对象,对谁进行排序,如果是自定义的对象,需要实现其CompareTo方法,因为这个是对象之间比较大小的方法. 另外你需要设计排序策略,特殊情况,正常情况,编程的时候,应该先写特殊情况,最后写正常情况,这样的话,逻辑更加清晰. 如何设计呢? 首先按照正常逻辑写好Map和Reduce程序,然后输出,最后得到的是一些文本, 在写一个Map,Reduce程序在写,进行排序.思想也是一样的,偏移量为key,(刚

Web项目架构之接口骨架设计

今天开始更新上学期项目的总结,再不更新马上都忘了,同时也和CSDNer分享一下,和大家交流关于系统设计的方法,欢迎大家拍砖.扔砖.泼凉水... 1.Spring+Hibernate+Struts2架构图 1.架构分析图 直接上图吧,有图有真相很容易分析 上面的架构中采用了MVC三层架构的方式,其中M:Model模型层 V:View视图层 C:Control控制层,其中模型层有各种JavaBean来担当,View视图层是有JSP(Struts2标签)充当,控制层有Struts2的Action来充当

Hadoop中常用的InputFormat、OutputFormat(转)

Hadoop中的Map Reduce框架依赖InputFormat提供数据,依赖OutputFormat输出数据,每一个Map Reduce程序都离不开它们.Hadoop提供了一系列InputFormat和OutputFormat方便开发,本文介绍几种常用的: TextInputFormat 作为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型. KeyValue

hadoop中Configuration类剖析

Configuration是hadoop中五大组件的公用类,所以放在了core下,org.apache.hadoop.conf.Configruration.这个类是作业的配置信息类,任何作用的配置信息必须通过Configuration传递,因为通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息. 类图 说明:Configuration实现了Iterable和Writable两个接口,其中实现Iterable是为了迭代,迭代出Configuration对

结合手机上网流量业务来说明Hadoop中的自定义数据类型(序列化、反序列化机制)

大家都知道,Hadoop中为Key的数据类型必须实现WritableComparable接口,而Value的数据类型只需要实现Writable接口即可:能做Key的一定可以做Value,能做Value的未必能做Key.但是具体应该怎么应用呢?--本篇文章将结合手机上网流量业务进行分析. 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 接下来贴出详细代码,代码中含有详细注释,从代码中可以看出,

Hadoop中两表JOIN的处理方法(转)

1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的.而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧. 本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主

Hadoop介绍-4.Hadoop中NameNode、DataNode、Secondary、NameNode、JobTracker TaskTracker

Hadoop是一个能够对大量数据进行分布式处理的软体框架,实现了Google的MapReduce编程模型和框架,能够把应用程式分割成许多的 小的工作单元,并把这些单元放到任何集群节点上执行.在MapReduce中,一个准备提交执行的应用程式称为「作业(job)」,而从一个作业划分出 得.运行于各个计算节点的工作单元称为「任务(task)」.此外,Hadoop提供的分布式文件系统(HDFS)主要负责各个节点的数据存储,并实现了 高吞吐率的数据读写. 在分布式存储和分布式计算方面,Hadoop都是用

谈谈-Android中的接口回调技术

Android中的接口回调技术有很多应用的场景,最常见的:Activity(人机交互的端口)的UI界面中定义了Button,点击该Button时,执行某个逻辑. 下面参见上述执行的模型,讲述James对Android接口回调技术的理解(结合前人的知识和自己的实践). 使用一个比喻很形象地说明:客户端有个疑问打电话请教服务端,但服务端无法现场给出解答,相互之间约定:服务端一旦有答案,使用电话的方式反馈给客户端. 以上有三个主体:客户端.服务端和接口(方式). 接口回调的原理框图说明: Demo界面

漫谈Java程序设计中的接口应用

Java语言提供了一种接口(interface)机制.这种接口机制使Java的面向对象编程变得更加灵活.我们可以用接口来定义一个类的表现形式,但接口不能包含任何实现.在<Thinking in Java>一书中,作者对接口有这样的描述:“接口(interface)比抽象(abstract)的概念更进了一步.你可以把一个接口看成是一个纯的抽象类.”我认为作者对接口的这一解释再准确不过了. 理解并用好接口机制将帮助我们更好的掌握Java这种面向对象的编程语言.下面我们来讨论一下接口的使用规则以及相