Hadoop源码解析之: TextInputFormat如何处理跨split的行

转自:http://blog.csdn.net/bluishglc/article/details/9380087

我们知道hadoop将数据给到map进行处理前会使用InputFormat对数据进行两方面的预处理:

  • 对输入数据进行切分,生成一组split,一个split会分发给一个mapper进行处理。
  • 针对每个split,再创建一个RecordReader读取Split内的数据,并按照<key,value>的形式组织成一条record传给map函数进行处理。

最常见的FormatInput就是TextInputFormat,在split的读取方面,它是将给到的Split按行读取,以行首字节在文件中的偏移做key,以行数据做value传给map函数处理,这部分的逻辑是由它所创建并使用的RecordReader:LineRecordReader封装和实现的.关于这部分逻辑,在一开始接触hadoop时会有一个常见的疑问:如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如word count就是一个例子).搞清楚这个问题还是需要从源码入手了解TextInputFormat的详细工作方式,这里简单地梳理记录如下(本文参考的是hadoop1.1.2的源码):

1. LineRecordReader会创建一个org.apache.hadoop.util.LineReader实例,并依赖这个LineReader的readLine方法来读取一行记录,具体可参考org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text),Line 176),那么关键的逻辑就在这个readLine方法里了,下面是添加了额外中文注释的该方法源码.这个方法主要的逻辑归纳起来是3点:

  • 总是是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer
  • 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value).如果为遇到"行尾",继续加载新的数据到buffer进行查找.
  • 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止

[java] view plaincopy

  1. /**
  2. * Read one line from the InputStream into the given Text.  A line
  3. * can be terminated by one of the following: ‘\n‘ (LF) , ‘\r‘ (CR),
  4. * or ‘\r\n‘ (CR+LF).  EOF also terminates an otherwise unterminated
  5. * line.
  6. *
  7. * @param str the object to store the given line (without newline)
  8. * @param maxLineLength the maximum number of bytes to store into str;
  9. *  the rest of the line is silently discarded.
  10. * @param maxBytesToConsume the maximum number of bytes to consume
  11. *  in this call.  This is only a hint, because if the line cross
  12. *  this threshold, we allow it to happen.  It can overshoot
  13. *  potentially by as much as one buffer length.
  14. *
  15. * @return the number of bytes read including the (longest) newline
  16. * found.
  17. *
  18. * @throws IOException if the underlying stream throws
  19. */
  20. public int readLine(Text str, int maxLineLength,
  21. int maxBytesToConsume) throws IOException {
  22. /* We‘re reading data from in, but the head of the stream may be
  23. * already buffered in buffer, so we have several cases:
  24. * 1. No newline characters are in the buffer, so we need to copy
  25. *    everything and read another buffer from the stream.
  26. * 2. An unambiguously terminated line is in buffer, so we just
  27. *    copy to str.
  28. * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
  29. *    in CR.  In this case we copy everything up to CR to str, but
  30. *    we also need to see what follows CR: if it‘s LF, then we
  31. *    need consume LF as well, so next call to readLine will read
  32. *    from after that.
  33. * We use a flag prevCharCR to signal if previous character was CR
  34. * and, if it happens to be at the end of the buffer, delay
  35. * consuming it until we have a chance to look at the char that
  36. * follows.
  37. */
  38. str.clear();
  39. int txtLength = 0; //tracks str.getLength(), as an optimization
  40. int newlineLength = 0; //length of terminating newline
  41. boolean prevCharCR = false; //true of prev char was CR
  42. long bytesConsumed = 0;
  43. do {
  44. int startPosn = bufferPosn; //starting from where we left off the last time
  45. //如果buffer中的数据读完了,先加载一批数据到buffer里
  46. if (bufferPosn >= bufferLength) {
  47. startPosn = bufferPosn = 0;
  48. if (prevCharCR)
  49. ++bytesConsumed; //account for CR from previous read
  50. bufferLength = in.read(buffer);
  51. if (bufferLength <= 0)
  52. break; // EOF
  53. }
  54. //注意:这里的逻辑有点tricky,由于不同操作系统对“行结束符“的定义不同:
  55. //UNIX: ‘\n‘  (LF)
  56. //Mac:  ‘\r‘  (CR)
  57. //Windows: ‘\r\n‘  (CR)(LF)
  58. //为了准确判断一行的结尾,程序的判定逻辑是:
  59. //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个
  60. //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”
  61. //(即变量:newlineLength,这个变量名起的有点糟糕)应该是2,否则就是UNIX文件,“行结束符的长度”为1。
  62. //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。
  63. //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考。
  64. for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
  65. if (buffer[bufferPosn] == LF) {
  66. newlineLength = (prevCharCR) ? 2 : 1;
  67. ++bufferPosn; // at next invocation proceed from following byte
  68. break;
  69. }
  70. if (prevCharCR) { //CR + notLF, we are at notLF
  71. newlineLength = 1;
  72. break;
  73. }
  74. prevCharCR = (buffer[bufferPosn] == CR);
  75. }
  76. int readLength = bufferPosn - startPosn;
  77. if (prevCharCR && newlineLength == 0)
  78. --readLength; //CR at the end of the buffer
  79. bytesConsumed += readLength;
  80. int appendLength = readLength - newlineLength;
  81. if (appendLength > maxLineLength - txtLength) {
  82. appendLength = maxLineLength - txtLength;
  83. }
  84. if (appendLength > 0) {
  85. str.append(buffer, startPosn, appendLength);
  86. txtLength += appendLength;
  87. }//newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。
  88. //这里有一个非常重要的地方:in的实例创建自构造函数:org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit)
  89. //第86行:FSDataInputStream fileIn = fs.open(split.getPath()); 我们看以看到:
  90. //对于LineRecordReader:当它对取“一行”时,一定是读取到完整的行,不会受filesplit的任何影响,因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。
  91. //所以不会出现“断行”的问题!
  92. } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
  93. if (bytesConsumed > (long)Integer.MAX_VALUE)
  94. throw new IOException("Too many bytes before newline: " + bytesConsumed);
  95. return (int)bytesConsumed;
  96. }

2. 按照readLine的上述行为,在遇到跨split的行时,会到下一个split继续读取数据直至行尾,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:

在LineRecordReader的构造函数org.apache.hadoop.mapred.LineRecordReader.LineRecordReader(Configuration, FileSplit) 108到113行确定start位置时,明确注明::会特别地忽略掉第一行!

[java] view plaincopy

  1. // If this is not the first split, we always throw away first record
  2. // because we always (except the last split) read one extra line in
  3. // next() method.
  4. if (start != 0) {
  5. start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  6. }

相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapred.LineRecordReader.next(LongWritable, Text) 170到173行中,while使用的判定条件是:当前位置小于或等于split的结尾位置,也就说:当当前以处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了!

[java] view plaincopy

  1. // We always read one extra line, which lies outside the upper
  2. // split limit i.e. (end - 1)
  3. while (getFilePosition() <= end) {

小结:

至此,跨split的行读取的逻辑就完备了.如果引申地来看,这是map-reduce前期数据切分的一个普遍性问题,即不管我们用什么方式切分和读取一份大数据中的小部分,包括我们在实现自己的InputFormat时,都会面临在切分处数据时的连续性解析问题. 对此我们应该深刻地认识到:split最直接的现实作用是取出大数据中的一小部分给mapper处理,但这只是一种"逻辑"上的,"宏观"上的切分,在"微观"上,在split的首尾切分处,为了确保数据连续性,跨越split接续并拼接数据也是完全正当和合理的.

时间: 2024-10-24 10:03:53

Hadoop源码解析之: TextInputFormat如何处理跨split的行的相关文章

hadoop源码解析---INodeReference机制

本文主要介绍了hadoop源码中hdfs的INodeReference机制. 在hdfs2.6版本中,引入了许多新的功能,一些原有的源代码设计也有一定的改造.一个重要的更新就是引入了快照功能.但是当HDFS文件或者目录处于某个快照中,并且这个文件或者目录被重命名或者移动到其他路径时,该文件或者目录就会存在多条访问路径.INodeReference就是为了解决这个问题产生的. 问题描述 /a是hdfs中的一个普通目录,s0为/a的一个快照,在/a目录下有一个文件test.根据快照的定义,我们可以通

Hadoop源码解析之 rpc通信 client到server通信

rpc是Hadoop分布式底层通信的基础,无论是client和namenode,namenode和datanode,以及yarn新框架之间的通信模式等等都是采用的rpc方式. 下面我们来概要分析一下Hadoop2的rpc. Hadoop通信模式主要是C/S方式,及客户端和服务端的模式. 客户端采用传统的socket通信方式向服务端发送信息,并等待服务端的返回. 服务端采用reactor的模式(Java nio)的方式来处理客户端的请求并给予响应. 一.客户端到服务端的通信 下面我们先分析客户端到

hadoop源码解析2 - conf包中Configuration.java解析

1 Hadoop Configuration简介    Hadoop没有使用java.util.Properties管理配置文件,也没有使用Apache Jakarta Commons Configuration管理配置文件,而是使用了一套独有的配置文件管理系统,并提供自己的API,即使用org.apache.hadoop.conf.Configuration处理配置信息. org.apache.hadoop.conf目录结构如下: 2 Hadoop配置文件的格式解析    Hadoop配置文件

Hadoop源码解析 1 --- Hadoop工程包架构解析

1 Hadoop中各工程包依赖简述     Google的核心竞争技术是它的计算平台.Google的大牛们用了下面5篇文章,介绍了它们的计算设施.     GoogleCluster: http://research.google.com/archive/googlecluster.html     Chubby:http://labs.google.com/papers/chubby.html     GFS:http://labs.google.com/papers/gfs.html   

spring MVC cors跨域实现源码解析 CorsConfiguration UrlBasedCorsConfigurationSource

spring MVC cors跨域实现源码解析 名词解释:跨域资源共享(Cross-Origin Resource Sharing) 简单说就是只要协议.IP.http方法任意一个不同就是跨域. spring MVC自4.2开始添加了跨域的支持. 跨域具体的定义请移步mozilla查看 使用案例 spring mvc中跨域使用有3种方式: 在web.xml中配置CorsFilter <filter> <filter-name>cors</filter-name> <

socketserver源码解析和协程版socketserver

来,贴上一段代码让你仰慕一下欧socketserver的魅力,看欧怎么完美实现多并发的魅力 client import socket ip_port = ('127.0.0.1',8009) sk = socket.socket() sk.connect(ip_port) sk.settimeout(5) while True: data = sk.recv(1024) print('receive:',data.decode()) inp = input('please input:') sk

【转】Java HashMap 源码解析(好文章)

- .fluid-width-video-wrapper { width: 100%; position: relative; padding: 0; } .fluid-width-video-wrapper iframe, .fluid-width-video-wrapper object, .fluid-width-video-wrapper embed { position: absolute; top: 0; left: 0; width: 100%; height: 100%; } [

Hadoop源码学习笔记(4) ——Socket到RPC调用

Hadoop源码学习笔记(4) ——Socket到RPC调用 Hadoop是一个分布式程序,分布在多台机器上运行,事必会涉及到网络编程.那这里如何让网络编程变得简单.透明的呢? 网络编程中,首先我们要学的就是Socket编程,这是网络编程中最底层的程序接口,分为服务器端和客户端,服务器负责监听某个端口,客户端负责连接服务器上的某个端口,一旦连接通过后,服务器和客户端就可以双向通讯了,我们看下示例代码: ServerSocket server = new ServerSocket(8111); S

spring mvc源码解析

1.从DispatcherServlet开始     与很多使用广泛的MVC框架一样,SpringMVC使用的是FrontController模式,所有的设计都围绕DispatcherServlet 为中心来展开的.见下图,所有请求从DispatcherServlet进入,DispatcherServlet根据配置好的映射策略确定处理的 Controller,Controller处理完成返回ModelAndView,DispatcherServlet根据配置好的视图策略确定处理的 View,由V