Hadoop MapReduce中如何处理跨行Block和UnputSplit

Hadoop的初学者经常会疑惑这样两个问题:1.Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?2.在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Mapper会不会得出不正确的结果?

对于上面的两个问题,首先要明确两个概念:Block和InputSplit

1. block是hdfs存储文件的单位(默认是64M);
      2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此,以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

[java] view plaincopy

  1. public List<InputSplit> getSplits(JobContext job) throws IOException {
  2. long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  3. long maxSize = getMaxSplitSize(job);
  4. // generate splits
  5. List<InputSplit> splits = new ArrayList<InputSplit>();
  6. List<FileStatus> files = listStatus(job);
  7. for (FileStatus file: files) {
  8. Path path = file.getPath();
  9. long length = file.getLen();
  10. if (length != 0) {
  11. FileSystem fs = path.getFileSystem(job.getConfiguration());
  12. BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  13. if (isSplitable(job, path)) {
  14. long blockSize = file.getBlockSize();
  15. long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  16. long bytesRemaining = length;
  17. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  18. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  19. splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  20. blkLocations[blkIndex].getHosts()));
  21. bytesRemaining -= splitSize;
  22. }
  23. if (bytesRemaining != 0) {
  24. splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  25. blkLocations[blkLocations.length-1].getHosts()));
  26. }
  27. } else { // not splitable
  28. splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
  29. }
  30. } else {
  31. //Create empty hosts array for zero length files
  32. splits.add(makeSplit(path, 0, length, new String[0]));
  33. }
  34. }
  35. // Save the number of input files for metrics/loadgen
  36. job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  37. LOG.debug("Total # of splits: " + splits.size());
  38. return splits;
  39. }

从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize

对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);blockSize,minSize,maxSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,其可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成,在Hadoop里,记录行形式的文本,通常采用默认的TextInputFormat,TextInputFormat关联的是LineRecordReader,下面我们来看看LineRecordReader的的nextKeyValue方法里读取文件的代码:

[java] view plaincopy

  1. while (getFilePosition() <= end) {
  2. newSize = in.readLine(value, maxLineLength,
  3. Math.max(maxBytesToConsume(pos), maxLineLength));
  4. if (newSize == 0) {
  5. break;
  6. }

其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的:

[java] view plaincopy

  1. public int readLine(Text str, int maxLineLength,
  2. int maxBytesToConsume) throws IOException {
  3. if (this.recordDelimiterBytes != null) {
  4. return readCustomLine(str, maxLineLength, maxBytesToConsume);
  5. } else {
  6. return readDefaultLine(str, maxLineLength, maxBytesToConsume);
  7. }
  8. }
  9. /**
  10. * Read a line terminated by one of CR, LF, or CRLF.
  11. */
  12. private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
  13. throws IOException {
  14. str.clear();
  15. int txtLength = 0; //tracks str.getLength(), as an optimization
  16. int newlineLength = 0; //length of terminating newline
  17. boolean prevCharCR = false; //true of prev char was CR
  18. long bytesConsumed = 0;
  19. do {
  20. int startPosn = bufferPosn; //starting from where we left off the last time
  21. if (bufferPosn >= bufferLength) {
  22. startPosn = bufferPosn = 0;
  23. if (prevCharCR)
  24. ++bytesConsumed; //account for CR from previous read
  25. bufferLength = in.read(buffer);
  26. if (bufferLength <= 0)
  27. break; // EOF
  28. }
  29. for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
  30. if (buffer[bufferPosn] == LF) {
  31. newlineLength = (prevCharCR) ? 2 : 1;
  32. ++bufferPosn; // at next invocation proceed from following byte
  33. break;
  34. }
  35. if (prevCharCR) { //CR + notLF, we are at notLF
  36. newlineLength = 1;
  37. break;
  38. }
  39. prevCharCR = (buffer[bufferPosn] == CR);
  40. }
  41. int readLength = bufferPosn - startPosn;
  42. if (prevCharCR && newlineLength == 0)
  43. --readLength; //CR at the end of the buffer
  44. bytesConsumed += readLength;
  45. int appendLength = readLength - newlineLength;
  46. if (appendLength > maxLineLength - txtLength) {
  47. appendLength = maxLineLength - txtLength;
  48. }
  49. if (appendLength > 0) {
  50. str.append(buffer, startPosn, appendLength);
  51. txtLength += appendLength;
  52. }
  53. } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);   <span style="color: #ff0000;">//①</span>
  54. if (bytesConsumed > (long)Integer.MAX_VALUE)
  55. throw new IOException("Too many bytes before newline: " + bytesConsumed);
  56. return (int)bytesConsumed;
  57. }

我们分析下readDefaultLine方法,do-while循环体主要是读取文件,然后遍历读取的内容,找到默认的换行符就终止循环。前面说,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。这就体现在上述代码的While循环的终止条件上:

while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

newlineLength==0则以为一次do-while循环中读取的内容中没有遇到换行符,因maxBytesToConsume的默认值为Integer.MAX_VALUE,所以如果读取的内容没有遇到换行符,则会一直读取下去,知道读取的内容超过maxBytesToConsume。这样的出来方式,解决了一行记录跨InputSplit的读取问题,同样也会造成下面两个疑问:

1.既然在LineReader读取方法里面没有对考虑InputSplit的end进行处理,难道读取一个InputSplit的时候,会这样无限的读取下去么?

2.如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如果做到不读取L这条记录在B中的部分呢?

为了解决这两个问题,Hadoop通过下面的代码来做到:LineRecordReader的nextKeyValue方法。

[java] view plaincopy

  1. public boolean nextKeyValue() throws IOException {
  2. if (key == null) {
  3. key = new LongWritable();
  4. }
  5. key.set(pos);
  6. if (value == null) {
  7. value = new Text();
  8. }
  9. int newSize = 0;
  10. // We always read one extra line, which lies outside the upper
  11. // split limit i.e. (end - 1)
  12. while (getFilePosition() <= end) {        <span style="color: #ff0000;"> //②</span>
  13. newSize = in.readLine(value, maxLineLength,
  14. Math.max(maxBytesToConsume(pos), maxLineLength));
  15. if (newSize == 0) {
  16. break;
  17. }
  18. pos += newSize;
  19. inputByteCounter.increment(newSize);
  20. if (newSize < maxLineLength) {
  21. break;
  22. }
  23. // line too long. try again
  24. LOG.info("Skipped line of size " + newSize + " at pos " +
  25. (pos - newSize));
  26. }
  27. if (newSize == 0) {
  28. key = null;
  29. value = null;
  30. return false;
  31. } else {
  32. return true;
  33. }
  34. }

通过代码②处得While条件,就保证了InputSplit读取边界的问题,如果存在跨InputSplit的记录,也只好跨InputSplit读取一次。

再来看LineRecordReader的initialize方法:

[java] view plaincopy

  1. // If this is not the first split, we always throw away first record
  2. // because we always (except the last split) read one extra line in
  3. // next() method.
  4. if (start != 0) {
  5. start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  6. }
  7. this.pos = start;

如果不是第一InputSplit,则在读取的时候,LineRecordReader会自动忽略掉第一个换行符之前的所有内容,这样就不存在重读读取的问题。

时间: 2024-10-10 17:27:38

Hadoop MapReduce中如何处理跨行Block和UnputSplit的相关文章

Hadoop Mapreduce 中的FileInputFormat类的文件切分算法和host选择算法

文件切分算法 文件切分算法主要用于确定InputSplit的个数以及每个InputSplit对应的数据段. FileInputFormat以文件为单位切分成InputSplit.对于每个文件,由以下三个属性值确定其对应的InputSplit的个数. goalSize:根据用户期望的InputSplit数据计算,即totalSize/numSplit.totalSize为文件总大小:numSplit为用户设定的Map Task个数,默认情况下是1. minSize:InputSplit的最小值,由

Hadoop MapReduce中压缩技术的使用

Compression and Input Splits 当我们使用压缩数据作为MapReduce的输入时,需要确认数据的压缩格式是否支持切片? 假设HDFS中有一个未经压缩的大小为1GB的文本文件,如果HDFS Block大小为128MB,那么这个文件会被HDFS存储为8个Block.当MapReduce Job使用这个文件作为输入时将会创建8个切片(默认每一个Block生成一个切片),每一个切片关联的数据都可以被一个Map Task独立地处理. 如果这个文本文件使用Gzip格式压缩,大小仍为

Hadoop Mapreduce 中的Partitioner

Partitioner的作用的对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reduce处理,Partitioner直接影响Reduce阶段的负载均衡. MapReduce提供了两个Partitioner实现:HashPartitioner和TotalOederPartitioner. HashPartitioner是默认实现,实现了一种基于哈希值的分片方法,代码如下: public int getPartition(K2 key, V2 value, int numRed

Hadoop Mapreduce中shuffle 详解

MapReduce 里面的shuffle:描述者数据从map task 输出到reduce task 输入的这段过程 Shuffle 过程: 首先,map 输出的<key,value > 会放在内存中,内存有一定的大小,超过之后,会将内存里的东西溢写(spill) 到磁盘(disk)中 .在从内存溢写到磁盘的过程中,会有两个操作:分区(parttition),排序(sort).map结束之后,磁盘中会有很多文件 . 有很多小文件,需要将文件进行文件的合并,并且排序.map 中的一些map任务可

hadoop mapreduce中对splite的处理

分片: 1. 在job.submit() 提交job之后  会调用 submitter.submitJobInternal(Job.this, cluster); 2. 在submitJobInternal()函数中 会给job创建分片 int maps = writeSplits(job, submitJobDir); 在该函数中会调用writeNewSplits() 3. 在writeNewSplits()方法 中,通过反射获得InputFormat对象,会调用该对象中的getSplits(

下一代Apache Hadoop MapReduce框架的架构

背景 随着集群规模和负载增加,MapReduce JobTracker在内存消耗,线程模型和扩展性/可靠性/性能方面暴露出了缺点,为此需要对它进行大整修. 需求 当我们对Hadoop MapReduce框架进行改进时,需要时刻谨记的一个重要原则是用户的需求.近几年来,从Hadoop用户那里总结出MapReduce框架当前最紧迫的需求有: (1)可靠性(Reliability)– JobTracker不可靠 (2)可用性(Availability)– JobTracker可用性有问题 (3) 扩展

【转】Hadoop在MapReduce中使用压缩详解

原文链接 http://www.cnblogs.com/ggjucheng/archive/2012/04/22/2465580.html#top hadoop对于压缩文件的支持 hadoop对于压缩格式的是透明识别,我们的MapReduce任务的执行是透明的,hadoop能够自动为我们 将压缩的文件解压,而不用我们去关心. 如果我们压缩的文件有相应压缩格式的扩展名(比如lzo,gz,bzip2等),hadoop就会根据扩展名去选择解码器解压. hadoop对每个压缩格式的支持,详细见下表:  

【Hadoop】三句话告诉你 mapreduce 中MAP进程的数量怎么控制?

1.果断先上结论 1.如果想增加map个数,则设置mapred.map.tasks 为一个较大的值. 2.如果想减小map个数,则设置mapred.min.split.size 为一个较大的值. 3.如果输入中有很多小文件,依然想减少map个数,则需要将小文件merger为大文件,然后使用准则2. 2.原理与分析过程 看了很多博客,感觉没有一个说的很清楚,所以我来整理一下. 先看一下这个图 输入分片(Input Split):在进行map计算之前,mapreduce会根据输入文件计算输入分片(i

Hadoop学习笔记—11.MapReduce中的排序和分组

一.写在之前的 1.1 回顾Map阶段四大步凑 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出,在Step1.4也就是第四步中,需要对不同分区中的数据进行排序和分组,默认情况下,是按照key进行排序和分组. 1.2 实验场景数据文件 在一些特定的数据文件中,不一定都是类似于WordCount单次统计这种规范的数据,比如下面这类数据,它虽然只有两列,但是却有一定的实践意义. 3 3 3 2 3 1 2 2 2 1 1 1 (1)如果按照第一列升序排列,当