hadoop InputFormat详解

1. 概述

我们在设置MapReduce输入格式的时候,会调用这样一条语句:

job.setInputFormatClass(KeyValueTextInputFormat.class);

这条语句保证了输入文件会按照我们预设的格式被读取。KeyValueTextInputFormat即为我们设定的数据读取格式。

所有的输入格式类都继承自InputFormat,这是一个抽象类。其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等。相关类图简单画出如下:

2. InputFormat

从InputFormat类图看,InputFormat抽象类仅有两个抽象方法:

  • List<InputSplit> getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
  • RecordReader<K,V> createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

在后面说到InputSplits的时候,会介绍在getSplits()时需要验证输入文件是否可分割、文件存储时分块的大小和文件大小等因素,所以总体来说,通过InputFormat,Mapreduce框架可以做到:

  • 验证作业输入的正确性
  • 将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask
  • 提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用

InputFormat抽象类源码也很简单,如下供参考(文章格式考虑,删除了部分注释,添加了部分中文注释):

public abstract class InputFormat<K, V> {

	/**
	 * 仅仅是逻辑分片,并没有物理分片,所以每一个分片类似于这样一个元组 <input-file-path, start, offset>
	 */
	public abstract List<InputSplit> getSplits(JobContext context)
			throws IOException, InterruptedException;

	/**
	 * Create a record reader for a given split.
	 */
	public abstract RecordReader<K, V> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException;

}

不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片会被单独的map task作为数据源。下面详细介绍输入分片(inputSplit)是什么。

3.InputSplit

Mappers的输入是一个一个的输入分片,称InputSplit。看源码可知,InputSplit也是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

public abstract class InputSplit {
	  /**
	   * 获取Split的大小,支持根据size对InputSplit排序.
	   */
	  public abstract long getLength() throws IOException, InterruptedException;

	  /**
	   * 获取存储该分片的数据所在的节点位置.
	   */
	  public abstract
	    String[] getLocations() throws IOException, InterruptedException;
}

下面深入看一个InputSplit的子类:FileSplit类

public class FileSplit extends InputSplit implements Writable {
	private Path file;
	private long start;
	private long length;
	private String[] hosts;

	/**
	 * Constructs a split with host information
	 *
	 * @param file
	 *            the file name
	 * @param start
	 *            the position of the first byte in the file to process
	 * @param length
	 *            the number of bytes in the file to process
	 * @param hosts
	 *            the list of hosts containing the block, possibly null
	 */
	public FileSplit(Path file, long start, long length, String[] hosts) {
		this.file = file;
		this.start = start;
		this.length = length;
		this.hosts = hosts;
	}

	/** The number of bytes in the file to process. */
	@Override
	public long getLength() {
		return length;
	}

	@Override
	public String[] getLocations() throws IOException {
		if (this.hosts == null) {
			return new String[] {};
		} else {
			return this.hosts;
		}
	}

	// 略掉部分方法
}

从源码中可以看出,FileSplit有四个属性:文件路径,分片起始位置,分片长度和存储分片的hosts。用这四项数据,就可以计算出提供给每个Mapper的分片数据。在InputFormat的getSplit()方法中构造分片,分片的四个属性会通过调用FileSplit的Constructor设置。

再看一个InputSplit的子类:CombineFileSplit。源码如下:

public class CombineFileSplit extends InputSplit implements Writable {

	private Path[] paths;
	private long[] startoffset;
	private long[] lengths;
	private String[] locations;
	private long totLength;

	public CombineFileSplit(Path[] files, long[] start, long[] lengths,
			String[] locations) {
		initSplit(files, start, lengths, locations);
	}

	private void initSplit(Path[] files, long[] start, long[] lengths,
			String[] locations) {
		this.startoffset = start;
		this.lengths = lengths;
		this.paths = files;
		this.totLength = 0;
		this.locations = locations;
		for (long length : lengths) {
			totLength += length;
		}
	}

	public long getLength() {
		return totLength;
	}

	/** Returns all the Paths where this input-split resides */
	public String[] getLocations() throws IOException {
		return locations;
	}

	//省略了部分构造函数和方法,深入学习请阅读源文件
}

为什么介绍该类呢,因为接下来要学习《Hadoop学习(五) – 小文件处理》,深入理解该类,将有助于该节学习。

上面我们介绍的FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!

CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。

需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。

现在,我们已深入的了解了InputSplit的概念,看了其源码,知道了其属性。我们知道数据分片是在InputFormat中实现的,接下来,我们就深入InputFormat的一个子类,FileInputFormat看看分片是如何进行的。

4. FileInputFormat

FileInputFormat中,分片方法代码及详细注释如下,就不再详细解释该方法:

public List<InputSplit> getSplits(JobContext job) throws IOException {
	// 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小。
	// 由源码可知,这两个值可以通过mapred.min.split.size和mapred.max.split.size来设置
	long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
	long maxSize = getMaxSplitSize(job);

	// splits链表用来存储计算得到的输入分片结果
	List<InputSplit> splits = new ArrayList<InputSplit>();
	// files链表存储由listStatus()获取的输入文件列表,listStatus比较特殊,我们在下面详细研究
	List<FileStatus> files = listStatus(job);
	for (FileStatus file : files) {
		Path path = file.getPath();
		FileSystem fs = path.getFileSystem(job.getConfiguration());
		long length = file.getLen();
		// 获取该文件所有的block信息列表[hostname, offset, length]
		BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
				length);
		// 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割
		// 是否分割可以自行重写FileInputFormat的isSplitable来控制
		if ((length != 0) && isSplitable(job, path)) {
			long blockSize = file.getBlockSize();
			// 计算分片大小
			// 即 Math.max(minSize, Math.min(maxSize, blockSize));
			// 也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize
			long splitSize = computeSplitSize(blockSize, minSize, maxSize);

			long bytesRemaining = length;
			// 循环分片。
			// 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片
			while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
				int blkIndex = getBlockIndex(blkLocations, length
						- bytesRemaining);
				splits.add(new FileSplit(path, length - bytesRemaining,
						splitSize, blkLocations[blkIndex].getHosts()));
				bytesRemaining -= splitSize;
			}
			// 处理余下的数据
			if (bytesRemaining != 0) {
				splits.add(new FileSplit(path, length - bytesRemaining,
						bytesRemaining,
						blkLocations[blkLocations.length - 1].getHosts()));
			}
		} else if (length != 0) {
			// 不可split,整块返回
			splits.add(new FileSplit(path, 0, length, blkLocations[0]
					.getHosts()));
		} else {
			// 对于长度为0的文件,创建空Hosts列表,返回
			splits.add(new FileSplit(path, 0, length, new String[0]));
		}
	}

	// 设置输入文件数量
	job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
	return splits;
}

在getSplits()方法中,我们提到了一个方法,listStatus(),我们先来看一下这个方法:

protected List<FileStatus> listStatus(JobContext job) throws IOException {

	// 省略部分代码...

	List<PathFilter> filters = new ArrayList<PathFilter>();
	filters.add(hiddenFileFilter);
	PathFilter jobFilter = getInputPathFilter(job);
	if (jobFilter != null) {
		filters.add(jobFilter);
	}
	// 创建了一个MultiPathFilter,其内部包含了两个PathFilter
	// 一个为过滤隐藏文件的Filter,一个为用户自定义Filter(如果制定了)
	PathFilter inputFilter = new MultiPathFilter(filters);

	for (int i = 0; i < dirs.length; ++i) {
		Path p = dirs[i];
		FileSystem fs = p.getFileSystem(job.getConfiguration());
		FileStatus[] matches = fs.globStatus(p, inputFilter);
		if (matches == null) {
			errors.add(new IOException("Input path does not exist: " + p));
		} else if (matches.length == 0) {
			errors.add(new IOException("Input Pattern " + p
					+ " matches 0 files"));
		} else {
			for (FileStatus globStat : matches) {
				if (globStat.isDir()) {
					for (FileStatus stat : fs.listStatus(
							globStat.getPath(), inputFilter)) {
						result.add(stat);
					}
				} else {
					result.add(globStat);
				}
			}
		}
	}

	// 省略部分代码
}
NLineInputFormat是一个很有意思的FileInputFormat的子类,有时间可以了解一下。

5. PathFilter

PathFilter文件筛选器接口,使用它我们可以控制哪些文件要作为输入,哪些不作为输入。PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false。可以通过设置mapred.input.pathFilter.class来设置用户自定义的PathFilter。

public interface PathFilter {
  /**
   * Tests whether or not the specified abstract pathname should be
   * included in a pathname list.
   *
   * @param  path  The abstract pathname to be tested
   * @return  <code>true</code> if and only if <code>pathname</code>
   *          should be included
   */
  boolean accept(Path path);
}

FileInputFormat类有hiddenFileFilter属性:

private static final PathFilter hiddenFileFilter = new PathFilter() {
	public boolean accept(Path p) {
		String name = p.getName();
		return !name.startsWith("_") && !name.startsWith(".");
	}
};

hiddenFileFilter过滤掉隐藏文件。

FileInputFormat类还有一个内部类:

private static class MultiPathFilter implements PathFilter {
	private List<PathFilter> filters;

	public MultiPathFilter(List<PathFilter> filters) {
		this.filters = filters;
	}

	public boolean accept(Path path) {
		for (PathFilter filter : filters) {
			if (!filter.accept(path)) {
				return false;
			}
		}
		return true;
	}
}

MultiPathFilter类类似于一个PathFilter代理,其内部有一个PathFilter list属性,只有符合其内部所有filter的路径,才被作为输入。在FileInputFormat类中,它被listStatus()方法调用,而listStatus()又被getSplits()方法调用来获取输入文件,也即实现了在获取输入分片前进行文件过滤。

至此,我们已经利用PathFilter过滤了文件,利用FileInputFormat 的getSplits方法,计算出了Mapreduce的Map的InputSplit。作业的输入分片有了,而这些分片,是怎么被Map读取的呢?

这由InputFormat中的另一个方法createRecordReader()来负责。FileInputFormat没有对于这个方法的实现,而是交给子类自行去实现它。

6. RecordReader

RecordReader将读入到Map的数据拆分成<key, value>对。RecordReader也是一个抽象类,下面我们通过源码看一下,RecordReader主要做哪些工作:

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

	/**
	 * 由一个InputSplit初始化
	 */
	public abstract void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException;

	/**
	 * 顾名思义,读取分片下一个<key, value>对
	 */
	public abstract boolean nextKeyValue() throws IOException,
			InterruptedException;

	/**
	 * Get the current key
	 */
	public abstract KEYIN getCurrentKey() throws IOException,
			InterruptedException;

	/**
	 * Get the current value.
	 */
	public abstract VALUEIN getCurrentValue() throws IOException,
			InterruptedException;

	/**
	 * 跟踪读取分片的进度
	 */
	public abstract float getProgress() throws IOException,
			InterruptedException;

	/**
	 * Close the record reader.
	 */
	public abstract void close() throws IOException;
}

从源码可以看出,一个RecordReader主要来完成这几项功能。接下来,通过一个具体的RecordReader实现类,来详细了解一下各功能的具体操作。

public class LineRecordReader extends RecordReader<LongWritable, Text> {
	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private LineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;

	// initialize函数即对LineRecordReader的一个初始化
	// 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等
	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		// 打开文件,并定位到分片读取的起始位置
		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		if (codec != null) {
			// 文件是压缩文件的话,直接打开文件
			in = new LineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			//
			if (start != 0) {
				skipFirstLine = true;
				--start;
				// 定位到偏移位置,下次读取就会从便宜位置开始
				fileIn.seek(start);
			}
			in = new LineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
			start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);// key即为偏移量
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			// 读取的数据长度为0,则说明已读完
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			// 读取的数据长度小于最大行长度,也说明已读取完毕
			if (newSize < maxLineLength) {
				break;
			}
			// 执行到此处,说明该行数据没读完,继续读入
		}
		if (newSize == 0) {
			key = null;
			value = null;
			return false;
		} else {
			return true;
		}
	}
	// 省略了部分方法
}

数据从InputSplit分片中读出已经解决,但是RecordReader是如何被Mapreduce框架利用的呢?我们先看一下Mapper类

7. Mapper

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

	public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
		public Context(Configuration conf, TaskAttemptID taskid,
				RecordReader<KEYIN, VALUEIN> reader,
				RecordWriter<KEYOUT, VALUEOUT> writer,
				OutputCommitter committer, StatusReporter reporter,
				InputSplit split) throws IOException, InterruptedException {
			super(conf, taskid, reader, writer, committer, reporter, split);
		}
	}

	/**
	 * 预处理,仅在map task启动时运行一次
	 */
	protected void setup(Context context) throws IOException,
			InterruptedException {
	}

	/**
	 * 对于InputSplit中的每一对<key, value>都会运行一次
	 */
	@SuppressWarnings("unchecked")
	protected void map(KEYIN key, VALUEIN value, Context context)
			throws IOException, InterruptedException {
		context.write((KEYOUT) key, (VALUEOUT) value);
	}

	/**
	 * 扫尾工作,比如关闭流等
	 */
	protected void cleanup(Context context) throws IOException,
			InterruptedException {
	}

	/**
	 * map task的驱动器
	 */
	public void run(Context context) throws IOException, InterruptedException {
		setup(context);
		while (context.nextKeyValue()) {
			map(context.getCurrentKey(), context.getCurrentValue(), context);
		}
		cleanup(context);
	}
}

重点看一下Mapper.class中的run()方法,它相当于map task的驱动。

  • run()方法首先调用setup()进行初始操作
  • 然后循环对每个从context.nextKeyValue()获取的“K-V对”调用map()函数进行处理
  • 最后调用cleanup()做最后的处理

事实上,content.nextKeyValue()就是使用了相应的RecordReader来获取“K-V对”。Mapper.class中的Context类,它继承自MapContext类,使用一个RecordReader进行构造。下面我们再看这个MapContext。

public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
		TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
	private RecordReader<KEYIN, VALUEIN> reader;
	private InputSplit split;

	public MapContext(Configuration conf, TaskAttemptID taskid,
			RecordReader<KEYIN, VALUEIN> reader,
			RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer,
			StatusReporter reporter, InputSplit split) {
		super(conf, taskid, writer, committer, reporter);
		this.reader = reader;
		this.split = split;
	}

	/**
	 * Get the input split for this map.
	 */
	public InputSplit getInputSplit() {
		return split;
	}

	@Override
	public KEYIN getCurrentKey() throws IOException, InterruptedException {
		return reader.getCurrentKey();
	}

	@Override
	public VALUEIN getCurrentValue() throws IOException, InterruptedException {
		return reader.getCurrentValue();
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		return reader.nextKeyValue();
	}

}

从MapContent类中的方法可见,content.getCurrentKey(),content.getCurrentValue()以及nextKeyValue(),其实都是对RecordReader方法的封装,即MapContext是直接使用传入的RecordReader来对InputSplit进行“K-V对”读取的。

至此,我们已经清楚的知道Mapreduce的输入文件是如何被过滤、读取、分片、读出“K-V对”,然后交给Mapper类来处理的。

时间: 2024-08-26 09:09:27

hadoop InputFormat详解的相关文章

【转】Hadoop安全模式详解及配置

原文链接 http://www.iteblog.com/archives/977 在<Hadoop 1.x中fsimage和edits合并实现>文章中提到,Hadoop的NameNode在重启的时候,将会进入到安全模式.而在安全模式,HDFS只支持访问元数据的操作才会返回成功,其他的操作诸如创建.删除文件等操作都会导致失败. NameNode在重启的时候,DataNode需要向NameNode发送块的信息,NameNode只有获取到整个文件系统中有99.9%(可以配置的)的块满足最小副本才会自

Hadoop Pipeline详解[摘抄]

最近使用公司内部的一个框架写map  reduce发现没有封装hadoop streaming这些东西,查了下pipeline相关的东西 Hadoop Pipeline详解 20. Aug / hadoop / 1 Comment 一.说明Hadoop 2.x相比较于1.x有了较大的改变,像MapReduce层面架构以及代码基本上是完全重写的,在HDFS层面加入了HA,Federation等特性,代码更加层次化和易读,同时加入的PB初期可能给阅读带来障碍,熟悉之后就没有太大问题了.Pipelin

Hadoop DistributedCache详解

DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用.它具有以下几个特点:缓存的文件是只读的,修改这些文件内容没有意义:用户可以调整文件可见范围(比如只能用户自己使用,所有用户都可以使用等),进而防止重复拷贝现象:按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且只发给任务被调度到的节点.本文将介绍DistributedCache在Hadoop 1.0和2.0中的使用方法及实现原理. Hadoop D

Hadoop WordCount详解(二)

Hadoop集群WordCount详解(二) 源代码程序 WordCount处理过程 具体代码讲解 1.源代码程序 package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.WordCount.Token

深入理解Java中的流---结合Hadoop进行详解

在JavaSe的基础课程当中,可以说流是一个非常重要的概念,并且在Hadoop中得到了广泛的应用,本篇博客将围绕流进行深入的详解. (一)JavaSe中流的相关概念 1.流的定义 ①在Java当中,若一个类专门用于数据传输,则这个类称为流 ②流就是程序和设备之间嫁接以来的一根用于数据传输的管道,这个设备可以是本地硬盘,可以是内存条,也可以是网络所关联的另外一台计算机等等,其中不同管道上有不同的按钮,按下不同的按钮相当于调用不同的方法,这根带按钮的用于数据传输的管道就是流,即流就是一根管道 ③流一

Hadoop实战之二~ hadoop作业调度详解(1)

前言 对Hadoop的最感兴趣的地方,也就在于Hadoop的作业调度了,在正式介绍如何搭建Hadoop之前,深入理解一下Hadoop的作业调度很有必要.我们不一定能用得上Hadoop,但是如果理通顺Hadoop的分布式调度原理,在有需要的时候未必不能自己写一个Mini Hadoop~: ) 开始 本文转载自:http://www.cnblogs.com/shipengzhi/articles/2487429.html Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Goo

hadoop文件系统详解--(1)

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现.Java抽象类 org.apache.hadoop.fs.FileSystem展示了Hadoop的一个文件系统,而且有几个具体实现,如表 3-1所示. 文件系统 URI 方案 Java实 现(全部在 org.apache.hadoop) 描述 Local file fs.LocalFileSystem 针对有客户端校验和 的本地连接磁盘使用 的文件系统.针对没 有校验和的本 地文件系统使用 RawLocalFileSystem.

Hadoop文件系统详解-----(一)

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现.Java抽象类 org.apache.hadoop.fs.FileSystem展示了Hadoop的一个文件系统,而且有几个具体实现,如表 3-1所示. 文件系统 URI  方案 Java实 现(全部在  org.apache.hadoop) 描述 Local file fs.LocalFileSystem 针对有客户端校验和 的本地连接磁盘使用 的文件系统.针对没 有校验和的本 地文件系统使用 RawLocalFileSystem

Hadoop安装详解及安装错误的解决方案

特此声明:本内容所有详细内容一下面提供的附件形式出现,了解详细内容可查看附件 从4月28号搭建hadoop平台,到今天已经足足5天了,不断地改配置,敲命令,可以说真是人生一大快事吗?好了废话不多说,正式进入我的安装过程: 目录: 第一篇:Ubuntu13.1安装 第二篇:Sublime Text2安装 第三篇:hadoop+jdk1.8.0安装+ssh无密码登录 第四篇:出现的问题及解决办法(见另一篇博客) 正文: 第一篇: 我用的是Ubuntu13.1,大家可以到我的云盘下载,下面是链接: h