继承FileInputFormat类来理解 FileInputFormat类

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat.MultiPathFilter;
import org.apache.hadoop.mapreduce.security.TokenCache;

import com.google.common.base.Charsets;

public class MyFileinput extends FileInputFormat<LongWritable, Text> {

	private static final PathFilter hiddenFileFilter = new PathFilter() {
		public boolean accept(Path p) {
			String name = p.getName();
			return ((!(name.startsWith("_"))) && (!(name.startsWith("."))));
		}
	};

	// 遍历文件列表, 过滤掉_ .开头的文件(可以自定义过滤)
	protected List<FileStatus> listStatus(JobContext job) throws IOException {
		System.out.println("*********************");
		List result = new ArrayList();
		Path[] dirs = getInputPaths(job);
		System.out.println("dirs" + dirs);
		System.out.println("dirs length = " + dirs.length);
		for(Path p: dirs){
			System.out.println("Path loop " + p);
		}

		if (dirs.length == 0) {
			throw new IOException("No input paths specified in job");
		}

		TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
				job.getConfiguration());

		List errors = new ArrayList();

		List filters = new ArrayList();
		filters.add(hiddenFileFilter);
		PathFilter jobFilter = getInputPathFilter(job);
		if (jobFilter != null) {
			filters.add(jobFilter);
		}

		// 过滤函数,可以拓展
		PathFilter inputFilter = new MultiPathFilter(filters);

		for (int i = 0; i < dirs.length; ++i) {
			Path p = dirs[i];
			FileSystem fs = p.getFileSystem(job.getConfiguration());
			FileStatus[] matches = fs.globStatus(p, inputFilter);
			System.out.println("matches=" + matches);
			for(FileStatus match: matches){
				System.out.println("loop matches" + match.getPath());
			}

			if (matches == null)
				errors.add(new IOException("Input path does not exist: " + p));
			else if (matches.length == 0)
				errors.add(new IOException("Input Pattern " + p
						+ " matches 0 files"));
			else {
				for (FileStatus globStat : matches) {
					System.out.println("globStat " + globStat);
					if (globStat.isDirectory())
						for (FileStatus stat : fs.listStatus(
								globStat.getPath(), inputFilter)) {
							result.add(stat);
						}
					else {
						result.add(globStat);
					}
				}
			}
		}

		if (!(errors.isEmpty())) {
			throw new InvalidInputException(errors);
		}
		// LOG.info("Total input paths to process : " + result.size());
		return result;
	}

	// 计算分片大小,返回一个分片列表
	public List<InputSplit> getSplits(JobContext job) throws IOException {
		long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
		long maxSize = getMaxSplitSize(job);

		System.out.print("minSize " + minSize);
		System.out.print("maxSize " + maxSize);

		List splits = new ArrayList();
		// 获取输入目录下的文件列表(过滤文件)
		List<FileStatus> files = listStatus(job);
		for (FileStatus file : files) {
			Path path = file.getPath();
			long length = file.getLen();
			System.out.println("path: " + path+ " file len = " + length);
			if (length != 0L) {
				// 通过路径找到块列表
				FileSystem fs = path.getFileSystem(job.getConfiguration());
				BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
						0L, length);

				if (isSplitable(job, path)) {
					long blockSize = file.getBlockSize();
					System.out.println("blockSize:" + blockSize);
					long splitSize = computeSplitSize(blockSize, minSize,
							maxSize);
					System.out.println("splitSize :" + splitSize);

					long bytesRemaining = length;
					System.out.println("bytesRemaining :" + bytesRemaining);

					System.out.println(bytesRemaining / splitSize);
					// 定义为1.1D, 为避免一个分片过小, 也需要启动一个MAP来运行
					// 最后剩余的文件大小只要不超过分片大小的1.1倍都会放入一个分片
					while (bytesRemaining / splitSize > 1.1D) {
						int blkIndex = getBlockIndex(blkLocations, length
								- bytesRemaining);
						System.out.println("blkIndex :" + blkIndex);

						// 添加到分片分片列表
						splits.add(makeSplit(path, length - bytesRemaining,
								splitSize, blkLocations[blkIndex].getHosts()));

						bytesRemaining -= splitSize;
					}

					// 文件尾
					if (bytesRemaining != 0L) {
						Long remain = length - bytesRemaining;
						System.out.println("文件尾大小" + bytesRemaining);
						int blkIndex = getBlockIndex(blkLocations, length
								- bytesRemaining);
						splits.add(makeSplit(path, length - bytesRemaining,
								bytesRemaining,
								blkLocations[blkIndex].getHosts()));
					}
				} else {
					splits.add(makeSplit(path, 0L, length,
							blkLocations[0].getHosts()));
				}
			} else {
				// 测试文件大小为0, 也会启动一个map
				splits.add(makeSplit(path, 0L, length, new String[0]));
			}
		}

		job.getConfiguration().setLong(
				"mapreduce.input.fileinputformat.numinputfiles", files.size());
		// LOG.debug("Total # of splits: " + splits.size());
		return splits;
	}

	private static class MultiPathFilter implements PathFilter {
		private List<PathFilter> filters;

		public MultiPathFilter(List<PathFilter> filters) {
			this.filters = filters;
		}

		public boolean accept(Path path) {
			for (PathFilter filter : this.filters) {
				if (!(filter.accept(path))) {
					return false;
				}
			}
			return true;
		}
	}

	// 文件内容读取, 默认按行读取
	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		String delimiter = context.getConfiguration().get(
				"textinputformat.record.delimiter");

		System.out.println("delimiter ==" + delimiter);
		// 默认为空
		byte[] recordDelimiterBytes = null;
		if (null != delimiter)
			recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);

		return new LineRecordReader(recordDelimiterBytes);
	}
}

主要功能是计算分片和按照分片给MAP任务读取内容

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext paramJobContext)
            throws IOException, InterruptedException;

public abstract RecordReader<K, V> createRecordReader(
            InputSplit paramInputSplit,
            TaskAttemptContext paramTaskAttemptContext) throws IOException,
            InterruptedException;
}

从顶层的派生类提供的接口差不多也能看出来。

继承FileInputFormat类来理解 FileInputFormat类

时间: 2024-11-08 06:56:15

继承FileInputFormat类来理解 FileInputFormat类的相关文章

Python学习之旅—面向对象进阶知识:类的命名空间,类的组合与继承

前言 上篇博客笔者带领大家初步梳理了Python面向对象的基础知识,本篇博客将专注于解决三个知识点:类的命名空间,类的组合以及面向对象的三大特性之一继承,一起跟随笔者老看看今天的内容吧. 1.类的命名空间 在上一篇博客中,我们提到过对象可以动态添加属性,一起来回忆下昨天的知识点,看如下的代码: class A: pass a = A() a.name = 'alex' print(a.name) 这里我们手动为a对象添加了一个属性name,然后直接打印可以得到a对象的名称.通过这个例子,我们可以

JavaScript es6 class类的理解。

在本篇文章我将会把我对JavaScript  es6新特性class类的理解.本着互联网的分享精神,我就将我自己的理解分享给大家. 使用es写一个类(构造函数) 在es5中大家一般都这么写一个类(构造函数) 另外需要注意,class类不会被提升. // 创建一个User构造函数 function User(name, age) { this.name = name; this.age = age; } // User构造函数的静态方法. User.getClassName = function

Python-深入理解元类(metaclass)

1.使用 type 动态创建类(type 是一个类, 用来创建类对象的元类, 所以也可以继承) type("Person", (), {"name": "John"}) 2.元类 Python 中类也是对象, 元类就是创建这些类对象的类, 可以理解为 MyClass = MetaClass() MyObject = MyClass() 3.type实际上是一个元类, type就是Python在背后用来创建所有类的元类, 类似 str 是创建字符串

从Qt谈到C++(二):继承时的含参基类与初始化列表

提出疑问 当我们新建一个Qt的图形界面的工程时,我们可以看看它自动生成的框架代码,比如我们的主窗口名称为MainWindow,我们来看看mainwindow.cpp文件: MainWindow::MainWindow(QWidget *parent) : QMainWindow(parent), ui(new Ui::MainWindow) { } 不同于一般的继承,这里的父类的括号里带有参数,我们通常都使用过不带参数,甚至不带括号的基类名称.这里的基类为什么带着参数呢? C++继承与构造函数

深刻理解Python中的元类(metaclass)以及元类实现单例模式

深刻理解Python中的元类(metaclass)以及元类实现单例模式 在看一些框架源代码的过程中碰到很多元类的实例,看起来很吃力很晦涩:在看python cookbook中关于元类创建单例模式的那一节有些疑惑.因此花了几天时间研究下元类这个概念.通过学习元类,我对python的面向对象有了更加深入的了解.这里将一篇写的非常好的文章基本照搬过来吧,这是一篇在Stack overflow上很热的帖子,我看http://blog.jobbole.com/21351/这篇博客对其进行了翻译. 一.理解

第十一周 项目3 - 点类派生直线类】定义点类Point,并以点类为基类,继承关系

项目3 - 点类派生直线类]定义点类Point,并以点类为基类,派生出直线类Line,从基类中继承的点的信息表示直线的中点.请阅读下面的代码,并将缺少的部分写出来. [cpp] view plaincopyprint? #include<iostream> #include<Cmath> using namespace std; class Point //定义坐标点类 { public: Point():x(0),y(0) {}; Point(double x0, double 

[深入理解Android卷一全文-第五章]深入理解常见类

由于<深入理解Android 卷一>和<深入理解Android卷二>不再出版,而知识的传播不应该因为纸质媒介的问题而中断,所以我将在OSC博客中全文转发这两本书的全部内容. 第5章 深入理解常见类 本章主要内容 ·  分析RefBase.sp,wp和LightRefBase类. ·  分析Native的Thread类和常用同步类. ·  分析Java层的Handler.Looper,以及HandlerThread类. 本章涉及的源代码文件名称及位置 下面是我们本章分析的源码文件名和

C#继承机制 访问与隐藏基类成员

(1) 访问基类成员 通过base 关键字访问基类的成员:   调用基类上已被其他方法重写的方法.  指定创建派生类实例时应调用的基类构造函数.  基类访问只能在构造函数.实例方法或实例属性访问器中进行. 从静态方法中使用 base 关键字是错误的. 示例:下面程序中基类 Person 和派生类 Employee 都有一个名为 Getinfo 的方法.通过使用 base 关键字,可以从派生类中调用基类上的 Getinfo 方法. using System ;public class Person

Python——五分钟理解元类(metaclasses)

“元类的魔幻变化比 99% 的用户所担心的更多,当你搞不懂是否真的需要用它的时候,就是不需要.” —Tim Peters 本文源于在 PyCon UK 2008 上的一个快速演讲. 元类被称为 Python 中的“深奥的巫术”.尽管你需要用到它的地方极少(除非你基于zope 编程),可事实上它的基础理论其实令人惊讶地易懂. 一切皆对象 一切皆对象 一切都有类型 “class”和“type”之间本质上并无不同 类也是对象 它们的类型是 type 以前,术语 type 用于内置类型,而术语 clas