Hadoop FileInputFormat实现原理及源码分析

FileInputFormat(org.apache.hadoop.mapreduce.lib.input.FileInputFormat)是专门针对文件类型的数据源而设计的,也是一个抽象类,它提供两方面的作用:

(1)定义Job输入文件的静态方法;

(2)为输入文件形成切片的通用实现;

至于如何将切片中的数据转换为一条条的“记录”则根据文件类型的不同交由具体的子类负责实现。

FileInputFormat input paths

FileInputFormat提供了四个静态方法用于定义Job的输入文件路径:

public static void addInputPath(Job job, Path path)

public static void addInputPaths(Job job, String commaSeparatedPaths)

public static void setInputPaths(Job job, Path... inputPaths)

public static void setInputPaths(Job job, String commaSeparatedPaths)

addInputPath()、addInputPaths()用于“添加”一个(批)输入路径,可以重复被调用,源码如下:

addInputPath的每一次调用都会将输入路径(Path会被转换为字符串形式)与原有值以“,”分隔进行拼接(并不会覆盖原有值),并保存至Job Configuration的属性INPUT_DIR(mapreduce.input.fileinputformat.inputdir)中。

而addInputPaths实际是对addInputPath的循环调用。

setInputPaths()实际是两个重载方法,用于“设置”一个(批)输入路径,该方法用于一次性调用,每一次调用都会覆盖之前的结果,源码如下:

该方法的最后会替换Job Configuration属性INPUT_DIR(mapreduce.input.fileinputformat.inputdir)的原有值。

这里所说的输入路径可以代表一个文件,也可以代表一个目录(该目录下的所有文件将全部作为输入数据),而且可以在输入路径中使用通配符或者使用“,”进行多个输入路径的拼接。

注意:目录中的内容(子目录)不会被递归处理。实际上目录中应仅包含文件,如果目录中包含子目录,这些子目录会被当作文件处理,从而引发异常。如果我们不需要递归目录,我们可以通过File Pattern或者Filter(见后)告知FileInputFormat仅仅选取指定目录中的文件;如果我们确实需要递归处理目录,则可以通过设置mapreduce.input.fileinputformat.input.dir.recursive为true实现。

有些时候我们还需要“过滤”输入路径中的一些文件,这可以通过方法setInputPathFilter()为FileInputFormat设置相应的过滤器实现,源码如下:

实际就是指定一个PathFilter(PathFilter的相关内容不再讨论范围)的具体实现类名称,保存于Job Configuration属性PATHFILTER_CLASS(mapreduce.input.pathFilter.class)中。

如果我们没有显示设置PathFilter,FileInputFormat会有一个默认的过滤器,用于过滤目录中的隐藏文件;如果我们显示设置PathFilter,则FileInputFormat的过滤器实则是一个过滤器链,而默认的过滤器会居于过滤器链的首部,优先被执行。

综上所述,FileInputFormat的输入路径和过滤器实际可以直接通过相应的属性值进行设置,如下图所求:

FileInputFormat input splits

FileInputFormat生成切片的过程是由getSplits()方法实现的,核心逻辑及相关源码如下:

1. 确定切片大小的最小值与最大值;

最小值:getFormatMinSplitSize()与getMinSplitSize()两者之间的较大值。getFormatMinSplitSize()是FileInputFormat中的一个实例方法,默认返回值为1,即1字节;getMinSplitSize()返回值由属性mapreduce.input.fileinputformat.split.minsize决定,默认值为1,即1字节。如果没有特殊需要,最小值即为1字节。有些数据格式的文件对切片的最小大小是有要求的,如SequenceFile(具体可参考SequenceFile相关文档),这时就需要在FileInputFormat子类中重写getFormatMinSplitSize()方法来满足特定需求。

最大值:getMaxSplitSize()返回值由属性mapreduce.input.fileinputformat.split.maxsize决定,默认值为Long.MAX_VALUE。

2. 获取输入路径中的所有文件信息;

3. 迭代处理输入路径中的每一个文件,为每一个文件生成切片;

对于每一个文件而言,生成切片的过程大致可以概括为以下5个关键步骤:

获取文件的路径及长度(1);

如果文件长度为0,则生成一个“空”的切片(5);如果文件长度不为0,则获取文件的数据块信息(2);

如果文件格式不可切片,则将整个文件生成一个切片(4);如果文件格式可切片,则为该文件生成切片(3)。

其中文件格式是否支持切片,由FileInputFormat isSplitable()方法决定,默认返回值为true,即默认可切片。可以根据实际应用场景的不同,在FileInputFormat的子类中重写该方法,使返回值为false,达到禁止切片的功效,这样每一个Map Task会处理一个文件的全部数据。

在详细介绍第3步之前,需要先引入一个新的类FileSplit,它表示一个文件切片,包含的变量如下:

file:切片所引用的文件路径(名称);

start:切片在文件中的起始偏移量;

length:切片大小;

hosts:由切片在文件中的起始偏移量、切片大小、文件数据块信息可以计算出切片所引用的数据块有哪些(切片大小可能大于HDFS的数据块大小),hosts中保存着这些数据块中的第一个数据块的副本位置(主机名),默认为3个主机名,MapReduce根据此值完成Map Task的调度;

hostInfos:相对于hosts中保存着的主机名,还保存着副本是否位于主机内存的信息。

下面介绍一个(可切片)文件切片的形成过程,大体也可以分为5个步骤:

step1

首先获取文件的数据块大小blockSize(这里也可以看出不同的文件,数据块大小也是可以不同的);

然后根据数据块大小(blockSize)、切片最小值(minSize)、切片最大值(maxSize)计算文件对应的切片大小(splitSize),计算公式如下:

splitSize = max(minimumSize, min(maximumSize, blockSize))

step2

判断文件的剩余大小(未切片的大小)是否满足继续进行切片的条件:((double) bytesRemaining)/splitSize > SPLIT_SLOP为true,其中bytesRemaining初始值为文件长度length,SPLIT_SLOP值为1.1,且不可修改,即文件剩余大小需为切片大小(splitSize)的1.1倍才会继续切片。

step3

获取切片对应的数据块。一个切片根据切片大小的不同,可能会包含若干个数据块,这里将第一个数据块的副本位置作为切片的存储位置。

切片在文件中的起始偏移量的计算公式:

offset = (n - 1)* splitSize,n表示第几个切片

对于给定的切片的offset,getBlockIndex实际就是计算文件的哪个数据块的起止范围恰好包含offset,返回这个数据块在数据块列表(blkLocations)的下标,计算流程如下:

step4

根据下标对应的数据块信息构建一个FileSplit。根据FileSplit的信息,可以看出FileSplit并不实现保存数据,仅仅是通过文件名称、起始偏移量、大小关联数据,并将对应数据块的副本位置作为切片的存储位置进行Map Task的调度。

循环执行step2、step3、step4直到文件剩余大小无法满足切片条件。

step5

将文件的剩余部分构建一个FileSplit。

时间: 2024-10-08 07:25:33

Hadoop FileInputFormat实现原理及源码分析的相关文章

【Spring】Spring&WEB整合原理及源码分析

表现层和业务层整合: 1. Jsp/Servlet整合Spring: 2. Spring MVC整合SPring: 3. Struts2整合Spring: 本文主要介绍Jsp/Servlet整合Spring原理及源码分析. 一.整合过程 Spring&WEB整合,主要介绍的是Jsp/Servlet容器和Spring整合的过程,当然,这个过程是Spring MVC或Strugs2整合Spring的基础. Spring和Jsp/Servlet整合操作很简单,使用也很简单,按部就班花不到2分钟就搞定了

ConcurrentHashMap实现原理及源码分析

ConcurrentHashMap实现原理 ConcurrentHashMap源码分析 总结 ConcurrentHashMap是Java并发包中提供的一个线程安全且高效的HashMap实现(若对HashMap的实现原理还不甚了解,可参考我的另一篇文章HashMap实现原理及源码分析),ConcurrentHashMap在并发编程的场景中使用频率非常之高,本文就来分析下ConcurrentHashMap的实现原理,并对其实现原理进行分析(JDK1.7). ConcurrentHashMap实现原

OpenCV学习笔记(27)KAZE 算法原理与源码分析(一)非线性扩散滤波

http://blog.csdn.net/chenyusiyuan/article/details/8710462 OpenCV学习笔记(27)KAZE 算法原理与源码分析(一)非线性扩散滤波 2013-03-23 17:44 16963人阅读 评论(28) 收藏 举报 分类: 机器视觉(34) 版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] KAZE系列笔记: OpenCV学习笔记(27)KAZE 算法原理与源码分析(一)非线性扩散滤波 OpenCV学习笔记(28)KA

caffe中HingeLossLayer层原理以及源码分析

输入: bottom[0]: NxKx1x1维,N为样本个数,K为类别数.是预测值. bottom[1]: Nx1x1x1维, N为样本个数,类别为K时,每个元素的取值范围为[0,1,2,-,K-1].是groundTruth. 输出: top[0]: 1x1x1x1维, 求得是hingeLoss. 关于HingeLoss: p: 范数,默认是L1范数,可以在配置中设置为L1或者L2范数. :指示函数,如果第n个样本的真实label为k,则为,否则为-1. tnk: bottom[0]中第n个样

【Spring】Spring&WEB整合原理及源码分析(二)

一.整合过程 Spring&WEB整合,主要介绍的是Jsp/Servlet容器和Spring整合的过程,当然,这个过程是Spring MVC或Strugs2整合Spring的基础. Spring和Jsp/Servlet整合操作很简单,使用也很简单,按部就班花不到2分钟就搞定了,本节只讲操作不讲原理,更多细节.原理及源码分析后续过程陆续涉及. 1. 导入必须的jar包,本例spring-web-x.x.x.RELEASE.jar: 2. 配置web.xml,本例示例如下: <?xml vers

深度理解Android InstantRun原理以及源码分析

深度理解Android InstantRun原理以及源码分析 @Author 莫川 Instant Run官方介绍 简单介绍一下Instant Run,它是Android Studio2.0以后新增的一个运行机制,能够显著减少你第二次及以后的构建和部署时间.简单通俗的解释就是,当你在Android Studio中改了你的代码,Instant Run可以很快的让你看到你修改的效果.而在没有Instant Run之前,你的一个小小的修改,都肯能需要几十秒甚至更长的等待才能看到修改后的效果. 传统的代

【OpenCV】SIFT原理与源码分析:关键点描述

<SIFT原理与源码分析>系列文章索引:http://www.cnblogs.com/tianyalu/p/5467813.html 由前一篇<方向赋值>,为找到的关键点即SIFT特征点赋了值,包含位置.尺度和方向的信息.接下来的步骤是关键点描述,即用用一组向量将这个关键点描述出来,这个描述子不但包括关键点,也包括关键点周围对其有贡献的像素点.用来作为目标匹配的依据(所以描述子应该有较高的独特性,以保证匹配率),也可使关键点具有更多的不变特性,如光照变化.3D视点变化等. SIFT

【OpenCV】SIFT原理与源码分析:方向赋值

<SIFT原理与源码分析>系列文章索引:http://www.cnblogs.com/tianyalu/p/5467813.html 由前一篇<关键点搜索与定位>,我们已经找到了关键点.为了实现图像旋转不变性,需要根据检测到的关键点局部图像结构为特征点方向赋值.也就是在findScaleSpaceExtrema()函数里看到的alcOrientationHist()语句: // 计算梯度直方图 float omax = calcOrientationHist(gauss_pyr[o

【OpenCV】SIFT原理与源码分析:关键点搜索与定位

<SIFT原理与源码分析>系列文章索引:http://www.cnblogs.com/tianyalu/p/5467813.html 由前一步<DoG尺度空间构造>,我们得到了DoG高斯差分金字塔: 如上图的金字塔,高斯尺度空间金字塔中每组有五层不同尺度图像,相邻两层相减得到四层DoG结果.关键点搜索就在这四层DoG图像上寻找局部极值点. DoG局部极值点 寻找DoG极值点时,每一个像素点和它所有的相邻点比较,当其大于(或小于)它的图像域和尺度域的所有相邻点时,即为极值点.如下图所