Hadoop TextInputFormat源码分析

InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能:

(1).数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split。

(2).为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,共Mapper使用。

InputFormat抽象类中只有两个方法,分别对应上面两个功能,源码如下:

[java] view plain copy

  1. public abstract class InputFormat<K, V> {
  2. public abstract
  3. List<InputSplit> getSplits(JobContext context
  4. ) throws IOException, InterruptedException;
  5. public abstract
  6. RecordReader<K,V> createRecordReader(InputSplit split,
  7. TaskAttemptContext context
  8. ) throws IOException,
  9. InterruptedException;
  10. }

getSplits()方法返回的是InputSplit类型的集合,InputSplit分片由两个特点:

(1):逻辑分片即只是在逻辑上对数据进行分片,并不进行物理切分,这点和block是不同的,它只记录一些元信息,比如起始位置、长度以及所在的节点列表等;

(2):必须可序列化,分片信息要上传到HDFS文件系统,还会被JobTracker读取,序列化可以方便进程通信以及永久存储。

createRecordReader()方法返回的是RecordReader对象,该对象可以将输入数据,即InputSplit对应的数据解析成众多的key/value对,会作为MapTask的map方法的输入。

我们本节就一最常用的TextInputFormat为例讲解分片和读取分片数据。

我们先来看看,TextInputFormat、FileInputFormat和InputFormat三者之间的关系,如下:

[java] view plain copy

  1. public class TextInputFormat extends FileInputFormat;
  2. public abstract class FileInputFormat<K, V> extends InputFormat;
  3. public abstract class InputFormat。

最顶的父类InputFormat只有两个未实现的抽象方法getSplits()和createRecordReader();而FileInputFormat包含的方法比较多,如下图:

我们在自己的MapReduce程序中设置输入目录就是调用这里面的方法。

TextInputFormat这个类只有两个方法,源码如下:

[java] view plain copy

  1. /** An {@link InputFormat} for plain text files.  Files are broken into lines.
  2. * Either linefeed or carriage-return are used to signal end of line.  Keys are
  3. * the position in the file, and values are the line of text.. */
  4. public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
  5. @Override
  6. public RecordReader<LongWritable, Text>
  7. createRecordReader(InputSplit split,
  8. TaskAttemptContext context) {
  9. return new LineRecordReader();
  10. }
  11. @Override
  12. protected boolean isSplitable(JobContext context, Path file) {
  13. CompressionCodec codec =
  14. new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  15. if (null == codec) {
  16. return true;
  17. }
  18. return codec instanceof SplittableCompressionCodec;
  19. }
  20. }

isSplitable方法表示是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。

接下来,我们只关注那两个主要方法,首先来看:

一:getSplits()方法,这个方法在FileInputFormat类中,它的子类一般只需要实现TextInputFormat中的两个方法而已,getSplits()方法代码如下:

[java] view plain copy

  1. /**
  2. * Generate the list of files and make them into FileSplits.
  3. */
  4. public List<InputSplit> getSplits(JobContext job
  5. ) throws IOException {
  6. long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  7. long maxSize = getMaxSplitSize(job);    //Long.MAX_VALUE
  8. // generate splits
  9. List<InputSplit> splits = new ArrayList<InputSplit>();
  10. List<FileStatus>files = listStatus(job);
  11. for (FileStatus file: files) {
  12. Path path = file.getPath();
  13. FileSystem fs = path.getFileSystem(job.getConfiguration());
  14. long length = file.getLen();    //整个文件的长度
  15. BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  16. if ((length != 0) && isSplitable(job, path)) {    //默认是true,但是如果是压缩的,则是false
  17. long blockSize = file.getBlockSize();        //64M,67108864B
  18. long splitSize = computeSplitSize(blockSize, minSize, maxSize);    //计算split大小  Math.max(minSize, Math.min(maxSize, blockSize))
  19. long bytesRemaining = length;
  20. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  21. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  22. splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
  23. blkLocations[blkIndex].getHosts()));        //hosts是主机名,name是IP
  24. bytesRemaining -= splitSize;        //剩余块的大小
  25. }
  26. if (bytesRemaining != 0) {    //最后一个
  27. splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
  28. blkLocations[blkLocations.length-1].getHosts()));
  29. }
  30. } else if (length != 0) {    //isSplitable(job, path)等于false
  31. splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
  32. } else {
  33. //Create empty hosts array for zero length files
  34. splits.add(new FileSplit(path, 0, length, new String[0]));
  35. }
  36. }
  37. // Save the number of input files in the job-conf
  38. job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  39. LOG.debug("Total # of splits: " + splits.size());
  40. return splits;
  41. }

代码分析:

(1):minSize=Math.max(getFormatMinSplitSize(),getMinSplitSize(job)):

getFormatMinSplitSize()返回了一个固定值即1;getMinSplitSize(job)是获取在mapred-site.xml文件中"mapred.min.split.size"属性配置的值,默认是0。

(2):maxSize=getMaxSplitSize(job):

getMaxSplitSize(job)是获取"mapred.max.split.size"属性配置的值,默认是Long.MAX_VALUE,即Long类型的最大值(注:这个属性在mapred-site.xml文件中并没有,所以不推荐配置)。

(3):接下来是遍历输入目录下所有文件的FileStatus信息列表。

(4):然后对每一个文件获取它的目录、文件的长度、文件对应的所有块信息(可能有多个块,每个块对应3个副本);

(5):然后如果文件长度不为0且支持分割(isSplitable方法等于true):获取block大小,默认是64MB,通过方法computeSplitSize(blockSize,minSize,maxSize)计算分片的大小,这个方法的源码返回Max.max(minSize,Math.min(maxSize,blockSize))。

(6):将bytesRemaiining(剩余分片字节数)设置为整个文件的长度。如果bytesRemaining超过分片大小splitSize一定量才会将文件分成多个InputSplit即当bytesRemaing/splitSize>SPLIT_SLOP(SPLIT_SLOP是固定值为1.1)时进入while循环执行getBlockIndex(blkLocations, length-bytesRemaining)获取block的索引,第二个参数就是这个block在整个文件中的偏移量,在循环中会从0越来越大,该方法代码如下:

[java] view plain copy

  1. protected int getBlockIndex(BlockLocation[] blkLocations,
  2. long offset) {
  3. for (int i = 0 ; i < blkLocations.length; i++) {
  4. // is the offset inside this block?
  5. if ((blkLocations[i].getOffset() <= offset) &&
  6. (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
  7. return i;
  8. }
  9. }
  10. BlockLocation last = blkLocations[blkLocations.length -1];
  11. long fileLength = last.getOffset() + last.getLength() -1;
  12. throw new IllegalArgumentException("Offset " + offset +
  13. " is outside of file (0.." +
  14. fileLength + ")");
  15. }

这个方法中的if语句的条件会限制获取到这个偏移量对应的block的索引。

(7)将这个索引对应的block信息的主机节点以及文件的路径名、开始的偏移量、分片大小splitSize封装到一个InputSplit中加入List<InputSplit> splits中。

(8)bytesRemaining -= splitSize是修改剩余字节大小。

(9)判断循环是否继续,如果满足条件则继续执行循环条件,否则跳出循环。

(10)跳出循环之后如果剩余bytesRemaining还不为0,表示还有未分配的数据,将剩余的数据及最后一个block加入到splits集合中。

(11)自此,我们已经走完了getSplits()方法中的第一个if条件,下面说第二个if条件,当不允许分割即isSplitable==false,则将第一个block、文件目录、开始位置0,长度为整个文件的长度封装到一个InputSplit中,加入到splits中。

(12)执行else条件即当文件的长度==0时,则会splits.add(new FileSplit(path,0,length,new String[0]))没有block,并且初始和长度都为0。

(13)将输入目录下文件的个数赋值给"mapreduce.input.num.files",方便以后的校对。

(14) 返回分片信息splits。

以上就是getSplits获取分片的过程,当使用基于FileInputFormat继承InputFormat时,为了提高MapTask的数据本地性,应尽量使InputSplit大小与block大小相同,因为当一个分片包含多个block的时候,总会从其他节点读取数据,也就是做不到所有的计算都是本地化,为了发挥计算本地化性能,应该尽量使InputSplit大小和块大小相当。

特殊问题:就是如果分片大小超过block大小,但是InputSplit中封装了单个block的所在主机信息啊,这样能读取多个block数据吗?这个问题留到最后讲解。

二:createRecordReader()方法,该方法返回一个RecordReader对象,实现了类似的迭代功能,将某个InputSplit解析成一个个key/value对。RecordReader应该注意两点:

(1):定位记录边界:为了能识别一条完整的记录,应该添加一些同步标示,TextInputFormat的标示是换行符;SequenceFileInputFormat的标示是每隔若干条记录会添加固定长度的同步字符串。为了解决InputSplit中第一条或者最后一条可能跨InputSplit的情况,RecordReader规定每个InputSplit的第一条不完整记录划给前一个InputSplit。

(2):解析key/value:将每个记录分解成key和value两个部分,TextInputFormat每一行的内容是value,该行在整个文件中的偏移量为key;SequenceFileInputFormat的记录共有四个字段组成:前两个字段分别是整个记录的长度和key的长度,均为4字节,后两个字段分别是key和value的内容。

TextInputFormat使用的RecordReader是org.apache.hadoop.mapreduce.lib.input.LineRecordReader。这个类中的方法:首先是initialize()方法,该方法主要是获取分片信息的初始化位置和结束位置,以及输入流(若没有则是压缩流);mapper的key/value是通过LineRecordReader.nextKeyValue()方法将key和value读取到key和value中的,在这个方法中key被设置为在文件中的偏移量,value通过LineReader.readLine(value,maxLineLength,Math.max((int)Math.min(Integer.Max_VALUE,end-pos),maxLineLength))这个方法读取一行数据放入value中,方法代码如下:

[java] view plain copy

  1. /**
  2. * Read one line from the InputStream into the given Text.  A line
  3. * can be terminated by one of the following: ‘\n‘ (LF) , ‘\r‘ (CR),
  4. * or ‘\r\n‘ (CR+LF).  EOF also terminates an otherwise unterminated
  5. * line.
  6. *
  7. * @param str the object to store the given line (without newline)
  8. * @param maxLineLength the maximum number of bytes to store into str;
  9. *  the rest of the line is silently discarded.
  10. * @param maxBytesToConsume the maximum number of bytes to consume
  11. *  in this call.  This is only a hint, because if the line cross
  12. *  this threshold, we allow it to happen.  It can overshoot
  13. *  potentially by as much as one buffer length.
  14. *
  15. * @return the number of bytes read including the (longest) newline
  16. * found.
  17. *
  18. * @throws IOException if the underlying stream throws
  19. */
  20. public int readLine(Text str, int maxLineLength,
  21. int maxBytesToConsume) throws IOException {
  22. /* We‘re reading data from in, but the head of the stream may be
  23. * already buffered in buffer, so we have several cases:
  24. * 1. No newline characters are in the buffer, so we need to copy
  25. *    everything and read another buffer from the stream.
  26. * 2. An unambiguously terminated line is in buffer, so we just
  27. *    copy to str.
  28. * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
  29. *    in CR.  In this case we copy everything up to CR to str, but
  30. *    we also need to see what follows CR: if it‘s LF, then we
  31. *    need consume LF as well, so next call to readLine will read
  32. *    from after that.
  33. * We use a flag prevCharCR to signal if previous character was CR
  34. * and, if it happens to be at the end of the buffer, delay
  35. * consuming it until we have a chance to look at the char that
  36. * follows.
  37. */
  38. str.clear();
  39. int txtLength = 0; //tracks str.getLength(), as an optimization
  40. int newlineLength = 0; //length of terminating newline
  41. boolean prevCharCR = false; //true of prev char was CR
  42. long bytesConsumed = 0;
  43. do {
  44. int startPosn = bufferPosn; //starting from where we left off the last time
  45. if (bufferPosn >= bufferLength) {
  46. startPosn = bufferPosn = 0;
  47. if (prevCharCR)
  48. ++bytesConsumed; //account for CR from previous read
  49. bufferLength = in.read(buffer);    //从输入流中读取一定数量的字节,并将其存储在缓冲区数组 b 中。以整数形式返回实际读取的字节数。
  50. if (bufferLength <= 0)        //结束了,没数据了
  51. break; // EOF
  52. }
  53. //‘\n‘,ASCII码:10,意义:换行NL;;;;‘\r‘ ,ASCII码:13,意义: 回车CR
  54. for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
  55. if (buffer[bufferPosn] == LF) {            //如果是换行字符\n
  56. newlineLength = (prevCharCR) ? 2 : 1;
  57. ++bufferPosn; // at next invocation proceed from following byte,越过换行字符
  58. break;
  59. }
  60. if (prevCharCR) { //CR + notLF, we are at notLF,如果是回车字符\r
  61. newlineLength = 1;
  62. break;
  63. }
  64. prevCharCR = (buffer[bufferPosn] == CR);
  65. }
  66. int readLength = bufferPosn - startPosn;
  67. if (prevCharCR && newlineLength == 0)    //表示还没遇到换行,有回车字符,且缓存最后一个是\r
  68. --readLength; //CR at the end of the buffer
  69. bytesConsumed += readLength;
  70. int appendLength = readLength - newlineLength;    //newlineLength换行符个数
  71. if (appendLength > maxLineLength - txtLength) {
  72. appendLength = maxLineLength - txtLength;
  73. }
  74. if (appendLength > 0) {
  75. str.append(buffer, startPosn, appendLength);    //将数据加入str
  76. txtLength += appendLength;
  77. }
  78. } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);//循环条件没有换行并且没超过上限
  79. if (bytesConsumed > (long)Integer.MAX_VALUE)
  80. throw new IOException("Too many bytes before newline: " + bytesConsumed);
  81. return (int)bytesConsumed;
  82. }

这个方法的目的就是读取一行记录写入str中,bytesConsumed记录为读取的字节总数;bufferLength=in.read(buffer)从输入流读取bufferLength长度的字节数据放入buffer中;do-while中开始部分的if语句要保证将bufferLength个字节数据处理完毕之后再从输入流中读取下一行数据;newlineLength表示换行的标记符长度(0、1、2三种值),因为不同的系统换行标记可能不同,有三种:\r(回车符)、\n(换行符)、\r\n(\n:Unix系统换行符末结束符;\r\n:window系统行末结束符;\r:MAC OS系统行末结束符)。

接下来是进入for循环,for循环会挨个检查字符是否是\r\n,如果是回车符\r,还会将prevCharCR设置为true,当前字符如果是换行符\n,prevCharR==true时(表示上一个字符是回车符)则newlineLength=2(这表明当前系统的换行标记是\r\n),prevcharCR==false(表示上一个字符不是回车符)则newlineLength=1(这表明当前系统的换行标记是\n),并退出for循环;如果当前字符不是换行符\n且prevCharCR==true(表明当前系统的换行标记是\r)则newlineLength=1并退出for循环;这样就找到了换行标记,然后计算数据的长度appendLength(不包含换行符),将buffer中指定位置开始长度为appendLength的数据追加到str(这里其实就是value)中;txtLength表示的是str(这里其实是value中值的长度)。

do-while循环的条件是:

(1):没有发现换行标记即newlineLength==0;

(2):读取的字节数量没有超过上限即bytesconsumed <maxBytesToConsume。

当这两个条件要同时满足。这其中有个问题就是当前系统的换行标记是\r\n,但是这两个字符没有同时出现在这次读取的数据之中,\n在下一个批次中,这没有关系,上面的for循环会检查\r出现之后的下一个字符是否是\n再对newliineLength进行设置的。从这个方法可以看出,即使是记录跨split、跨block也不能阻止它完整读取一行数据的决心啊。

我们再回来看看LineRecordReader.nextKeyValue()方法,这个方法的代码如下:

[java] view plain copy

  1. public boolean nextKeyValue() throws IOException {
  2. if (key == null) {
  3. key = new LongWritable();
  4. }
  5. key.set(pos);
  6. if (value == null) {
  7. value = new Text();
  8. }
  9. int newSize = 0;
  10. // We always read one extra line, which lies outside the upper
  11. // split limit i.e. (end - 1)
  12. while (getFilePosition() <= end) {
  13. newSize = in.readLine(value, maxLineLength,
  14. Math.max(maxBytesToConsume(pos), maxLineLength));
  15. if (newSize == 0) {
  16. break;
  17. }
  18. pos += newSize;
  19. if (newSize < maxLineLength) {
  20. break;
  21. }
  22. // line too long. try again
  23. LOG.info("Skipped line of size " + newSize + " at pos " +
  24. (pos - newSize));
  25. }
  26. if (newSize == 0) {
  27. key = null;
  28. value = null;
  29. return false;
  30. } else {
  31. return true;
  32. }
  33. }

这个方法会控制split读取数据的结束位置,上面的readLine()方法只关注输入流不会管split的大小的。需要注意的是其中的while循环,其中的pos和end表示当前在文件中的偏移量和split的结束位置,即使这个split的最后一行跨split也会完整的读取一行。也就保证了一个记录的完整性。mapper获取key/value会通过getCurrentKey()和getCurrentValue()来达到。但是调用这两个方法前得先调用nextKeyValue()方法才能实现key和value的赋值。

到这里我们回头看看上面那个特殊问题,就是split的大小超过block的大小数据读取的问题,我们前面已经讲过split是逻辑分片,不是物理分片,当MapTask的数据本地性发挥作用时,会从本机的block开始读取,超过这个block的部分可能还在本机也可能不再本机,如果是后者的话就要从别的节点拉数据过来,因为实际数据是一个输入流,这个输入流面向整个问加你,不受什么block啊、split的影响,split越大可能需要从别的节点拉的数据也越大,从而效率也会越慢,拉数据的多少是由getSplits()方法中splitSize决定的。

至此,TextInputFormat的分片和数据读取过程就讲完了。这只是一个例子,其他InputFormat可以参考这个。

时间: 2025-01-19 00:51:13

Hadoop TextInputFormat源码分析的相关文章

Hadoop HDFS源码分析 关于数据块的类

Hadoop HDFS源码分析 关于数据块的类 1.BlocksMap 官方代码中的注释为: /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and * the datanodes that store the block. */ BlocksMap数据块映射,管理名字节点上的数据

[Hadoop] - TaskTracker源码分析(状态发送)

TaskTracker节点向JobTracker汇报当前节点的运行时信息时候,是将运行状态信息同心跳报告一起发送给JobTracker的,主要包括TaskTracker的基本信息.节点资源使用信息.各任务状态等.所以信息被序列化为TaskTrackerStatus实例对象.每次发送心跳报告的时候,会重新构造一个Status对象,并重置这些信息,而且需要主要的是每次发送的status对象的大小是不一定的,因为很多信息的发送是有时间间隔的.这些操作主要位于方法transmitHeartBeat的上半

Hadoop InputFormat源码分析

平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class)来保证输入文件按照我们想要的格式被读取.所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等. 不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的MapT

Hadoop HDFS源码分析 读取命名空间镜像和编辑日志数据

读取命名空间镜像和编辑日志数据 1.读取命名空间镜像 类FSImage是 命名空间镜像的java实现,在源码中,英文注释为, /** * FSImage handles checkpointing and logging of the namespace edits. * */ FSImage.loadFSImage(FSNamesystem, StartupOption, MetaRecoveryContext) 读取命名空间镜像. 1 private boolean loadFSImage(

[Hadoop] - TaskTracker源码分析(TaskTracker节点健康状况监控)

在TaskTracker中对象healthStatus保存了当前节点的健康状况,对应的类是org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus.定义如下: static class TaskTrackerHealthStatus implements Writable { private boolean isNodeHealthy; // 节点是否健康 private String healthReport; //

Apache Hadoop hdfs源码分析

FileSystem.get --> 通过反射实例化了一个DistributedFileSystem --> new DFSCilent()把他作为自己的成员变量 在DFSClient构造方法里面,调用了createNamenode,使用了RPC机制,得到了一个NameNode的代理对象,就可以和NameNode进行通信了 FileSystem --> DistributedFileSystem --> DFSClient --> NameNode的代理

Hadoop2源码分析-准备篇

1.概述 我们已经能够搭建一个高可用的Hadoop平台了,也熟悉并掌握了一个项目在Hadoop平台下的开发流程,基于Hadoop的一些套件我们也能够使用,并且能利用这些套件进行一些任务的开发.在Hadoop的应用级别上,我们接着往后面去研究学习,那就是Hadoop的源码了,作为Hadoop开发人员,我们得去学习和研究Hadoop得实现原理,底层框架的设计,编码的实现过程等等,下面就开始我们今天的Hadoop源码分析之旅. 2.准备 在分析源码之前,我们需要准备好分析源码的环境,以及如何去分析(分

Hadoop源码分析之Map输入

对于MapReduce的输入输出Hadoop的官网如下所示 Input and Output types of a MapReduce job: (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 这里将从源码分析 input <k1,v1>->map 的过程, Mapper 基

Hadoop之HDFS原理及文件上传下载源码分析(上)

HDFS原理 首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来. 楼主的环境: 操作系统:Ubuntu 15.10 hadoop版本:2.7.3 HA:否(随便搭了个伪分布式) 文件上传 下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程: 首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯. NameNode与各DataNode使用心跳机制来获取DataNode信息.NameNode收到Client请求后,