在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数

最近开始使用MapReduce,发现网上大部分例子都是对文本数据进行处理的,也就是说在读取输入数据时直接使用默认的TextInputFormat进行处理即可。对于文本数据处理,这个类还是能满足一部分应用场景。但是如果要处理以二进制形式结构化记录存储的文件时,这些类就不再适合了。

本文以一个简单的应用场景为例:对按照二进制格式存储的整数做频数统计。当然,也可以在此基础上实现排序之类的其他应用。实现该应用的主要难点就是如何处理输入数据。参考《权威指南·第三版》得知需要继承FileInputFormat这个类,并实现以下三个方法:

class MyInputFormat extends FileInputFormat<Type1, Type2> {
	/*
	 * 查询判断当前文件是否可以分块?"true"为可以分块,"false"表示不进行分块
	 */
	protected boolean isSplitable(Configuration conf, Path path) {

	}

	/*
	 * MapReduce的客户端调用此方法得到所有的分块,然后将分块发送给MapReduce服务端。
	 * 注意,分块中不包含实际的信息,而只是对实际信息的分块信息。具体的说,每个分块中
	 * 包含当前分块对应的文件路径,当前分块在该文件中起始位置,当前分块的长度以及对应的
	 * 实际数据所在的机器列表。在实现这个函数时,将这些信息填上即可。
	 * */
	public List<InputSplit> getSplits(Configuration conf) throws IOException {
	}

	/*
	 * 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的参数有两个:一个分块(split)和作业的配置信息(context).
	 * 在Mapper的run函数中可以看到MapReduce框架执行Map的逻辑:
	 * public void run(Context context) throws IOException, InterruptedException {
	 * 		setup(context);
	 * 		调用RecordReader方法的nextKeyValue,生成新的键值对。如果当前分块(Split)中已经处理完毕了,则nextKeyValue会返回false.退出run函数
	 *		while (context.nextKeyValue()) {
	 *			map(context.getCurrentKey(), context.getCurrentValue(), context);
	 *		}
	 *		cleanup(context);
	 * }
	 **/
	public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
	}
}

在RecordReader函数中实现以下几个接口:

public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
	/*关闭文件流
	 * */
	public void close() {}

	/*
	 * 获取处理进度
	 **/
	public float getProgress() {}

	/*
	 * 获取当前的Key
	 * */
	public LongWritable getCurrentKey() throws IOException,
	InterruptedException {}

	/* 获取当前的Value
	 * */
	public IntWritable getCurrentValue() throws IOException,InterruptedException {}

	/*
	 * 进行初始化工作,打开文件流,根据分块信息设置起始位置和长度等等
	 * */
	public void initialize(InputSplit inputSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {}

	/*生成下一个键值对
	 **/
	public boolean nextKeyValue() throws IOException, InterruptedException {
	}
}

以下为是三个文件的代码,首先是BinInputFormat.java的代码:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.examples.BinRecordReader;

class BinInputFormat extends FileInputFormat<LongWritable, IntWritable> {

	private static final double SPLIT_SLOP=1.1;

	/*
	 * 查询判断当前文件是否可以分块?"true"为可以分块,"false"表示不进行分块
	 */
	protected boolean isSplitable(Configuration conf, Path path) {
		return true;
	}

	/*
	 * MapReduce的客户端调用此方法得到所有的分块,然后将分块发送给MapReduce服务端。
	 * 注意,分块中不包含实际的信息,而只是对实际信息的分块信息。具体的说,每个分块中
	 * 包含当前分块对应的文件路径,当前分块在该文件中起始位置,当前分块的长度以及对应的
	 * 实际数据所在的机器列表。在实现这个函数时,将这些信息填上即可。
	 * */
	public List<InputSplit> getSplits(Configuration conf) throws IOException {
		List<InputSplit> splits = new ArrayList<InputSplit>();
		long minSplitSize = conf.getLong("mapred.min.split.size",1);
		long maxSplitSize = conf.getLong("mapred.max.split.size", 1);
		long blockSize = conf.getLong("dfs.block.size",1);
		long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
		FileSystem fs = FileSystem.get(conf);
		String path = conf.get(INPUT_DIR);
		FileStatus[] files = fs.listStatus(new Path(path));

		for (int fileIndex = 0; fileIndex < files.length; fileIndex++) {
			FileStatus file = files[fileIndex];
			System.out.println("input file: " + file.getPath().toString());
			long length = file.getLen();
			FileSystem fsin = file.getPath().getFileSystem(conf);
		    BlockLocation[] blkLocations = fsin.getFileBlockLocations(file, 0, length);
		    if ((length != 0) && isSplitable(conf, file.getPath())) {
		        long bytesRemaining = length;
		        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
		          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
		          splits.add(new FileSplit(file.getPath(), length-bytesRemaining, splitSize,
		                                   blkLocations[blkIndex].getHosts()));
		          bytesRemaining -= splitSize;
		        }

		        if (bytesRemaining != 0) {
		          splits.add(new FileSplit(file.getPath(), length-bytesRemaining, bytesRemaining,
		                     blkLocations[blkLocations.length-1].getHosts()));
		        }
		      } else if (length != 0) {
		        splits.add(new FileSplit(file.getPath(), 0, length, blkLocations[0].getHosts()));
		      } else {
		        //Create empty hosts array for zero length files
		        splits.add(new FileSplit(file.getPath(), 0, length, new String[0]));
		      }
		}
		return splits;
	}

	/*
	 * 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的参数有两个:一个分块(split)和作业的配置信息(context).
	 * 在Mapper的run函数中可以看到MapReduce框架执行Map的逻辑:
	 * public void run(Context context) throws IOException, InterruptedException {
	 * 		setup(context);
	 * 		调用RecordReader方法的nextKeyValue,生成新的键值对。如果当前分块(Split)中已经处理完毕了,则nextKeyValue会返回false.退出run函数
	 *		while (context.nextKeyValue()) {
	 *			map(context.getCurrentKey(), context.getCurrentValue(), context);
	 *		}
	 *		cleanup(context);
	 * }
	 **/
	public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		BinRecordReader reader = new BinRecordReader();
		reader.initialize(split,context);
		return reader;
	}
}

以下为BinRecordReader.java的代码:

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;

/**
 * Return a single record (filename, "") where the filename is taken from
 * the file split.
 */
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
	private FSDataInputStream inputStream = null;
	private long start,end,pos;
	private Configuration conf = null;
	private FileSplit fileSplit = null;
	private LongWritable key = new LongWritable();
	private IntWritable value = new IntWritable();
	private boolean processed = false;
	public BinRecordReader() throws IOException {
	}

	/*关闭文件流
	 * */
	public void close() {
		try {
			if(inputStream != null)
				inputStream.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	/*
	 * 获取处理进度
	 **/
	public float getProgress() {
		return ((processed == true)? 1.0f : 0.0f);
	}

	/*
	 * 获取当前的Key
	 * */
	public LongWritable getCurrentKey() throws IOException,
	InterruptedException {
		// TODO Auto-generated method stub
		return key;
	}

	/* 获取当前的Value
	 * */
	public IntWritable getCurrentValue() throws IOException,InterruptedException {
		// TODO Auto-generated method stub
		return value;
	}

	/*
	 * 进行初始化工作,打开文件流,根据分块信息设置起始位置和长度等等
	 * */
	public void initialize(InputSplit inputSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		fileSplit = (FileSplit)inputSplit;
		conf = context.getConfiguration();

		this.start = fileSplit.getStart();
		this.end = this.start + fileSplit.getLength();

		try{
			Path path = fileSplit.getPath();
			FileSystem fs = path.getFileSystem(conf);
			this.inputStream = fs.open(path);
			inputStream.seek(this.start);
			this.pos = this.start;
		}	catch(IOException e)	{
			e.printStackTrace();
		}
	}

	/*生成下一个键值对
	 **/
	public boolean nextKeyValue() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		if(this.pos < this.end) {
			key.set(this.pos);
			value.set(Integer.reverseBytes(inputStream.readInt()));
			this.pos = inputStream.getPos();
			return true;
		} else {
			processed = true;
			return false;
		}
	}
}

以下是主文件BinCount.java的代码

package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.examples.BinInputFormat;

public class IntCount {
	public static class TokenizerMapper extends Mapper<LongWritable, IntWritable, Text, IntWritable>{

		private final static IntWritable one = new IntWritable(1);
		private Text intNum = new Text();                             

		public void map(LongWritable key, IntWritable value, Context context
				) throws IOException, InterruptedException {
			intNum.set(Integer.toString(value.get()));
			context.write(intNum, one);
		}
	}

	public static class IntSumReducer
	extends Reducer<Text,IntWritable,Text,IntWritable> {
		private IntWritable result = new IntWritable();              

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context
				) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();                                         

			}
			result.set(sum);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println("testing1");
		Configuration conf = new Configuration();
		String[] newArgs = new String[]{"hdfs://localhost:9000/read","hdfs://localhost:9000/data/wc_output21"};
		String[] otherArgs = new GenericOptionsParser(conf, newArgs).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "IntCount");
		job.setJarByClass(IntCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		//设置自定义的输入类
		job.setInputFormatClass(BinInputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

接着我们用一段C语言生成二进制格式存储的文件,C语言代码如下:

#include<stdio.h>
int main(){
	FILE * fp = fopen("tmpfile","wb");
	int i,j;
	for(i=0;i<10;i++) {
		for(j=0;j<10;j++)
			fwrite(&j,sizeof(int),1,fp);
	}
	fclose(fp);
	return 0;
}

将生成的文件拷贝到/read/下,接着启动IntCount这个MapReduce程序,打开运行结果:

在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数

时间: 2024-11-07 06:39:21

在Hadoop中重写FileInputFormat类以处理二进制格式存储的整数的相关文章

hadoop中典型Writable类详解

本文地址:http://www.cnblogs.com/archimedes/p/hadoop-writable.html,转载请注明源地址. Hadoop将很多Writable类归入org.apache.hadoop.io包中,在这些类中,比较重要的有Java基本类.Text.Writable集合.ObjectWritable等,重点介绍Java基本类和ObjectWritable的实现. 1. Java基本类型的Writable封装 目前Java基本类型对应的Writable封装如下表所示.

Hadoop Mapreduce 中的FileInputFormat类的文件切分算法和host选择算法

文件切分算法 文件切分算法主要用于确定InputSplit的个数以及每个InputSplit对应的数据段. FileInputFormat以文件为单位切分成InputSplit.对于每个文件,由以下三个属性值确定其对应的InputSplit的个数. goalSize:根据用户期望的InputSplit数据计算,即totalSize/numSplit.totalSize为文件总大小:numSplit为用户设定的Map Task个数,默认情况下是1. minSize:InputSplit的最小值,由

hadoop中Text类 与 java中String类的区别

hadoop 中 的Text类与java中的String类感觉上用法是相似的,但两者在编码格式和访问方式上还是有些差别的,要说明这个问题,首先得了解几个概念: 字符集: 是一个系统支持的所有抽象字符的集合.字符是各种文字和符号的总称,包括各国家文字.标点符号.图形符号.数字等.例如 unicode就是一个字符集,它的目标是涵盖世界上所有国家的文字和符号: 字符编码:是一套法则,使用该法则能够对自然语言的字符的一个集合(如字母表或音节表),与其他东西的一个集合(如号码或电脉冲)进行配对.即在符号集

hibernate实体继承问题(派生类重写基类某数学系的get方法)

目标:hibernate的所有实体类的主键均继承一个基类IdEntity,基类如下:  /**  * 统一定义id的entity基类.  * @author MingDao  */ // JPA基类标识 @MappedSuperclass public abstract class IdEntity {  protected Long id;  @Id  @GeneratedValue(strategy = GenerationType.IDENTITY)  public Long getId(

Hadoop中作业(job)、任务(task)和task attempt

hadoop中,MapReduce作业(job)ID的格式为job_201412081211_0002.这表示该作业是第二个作业(作业号从0001开始),作业开始于2014年12月8号12:11. 任务(task)属于作业,通过使用"task"替换作业ID的"job"前缀,然后在后面加上一个后缀表示哪个作业中间的任务.例如:task_201412081211_0002_m_000003,表示该任务属于job_201412081211_0002作业的第三个map任务(

Hadoop中Writable类之四

1.定制Writable类型 Hadoop中有一套Writable实现,例如:IntWritable.Text等,但是,有时候可能并不能满足自己的需求,这个时候,就需要自己定制Writable类型. 定制分以下几步: 需要实现WritableComparable接口,因为Writable常常作为健值对出现,而在MapReduce中,中间有个排序很重要,因此,Hadoop中就让Writable实现了WritableComparable 需要实现WritableComparable的write().

hadoop中Configuration类剖析

Configuration是hadoop中五大组件的公用类,所以放在了core下,org.apache.hadoop.conf.Configruration.这个类是作业的配置信息类,任何作用的配置信息必须通过Configuration传递,因为通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息. 类图 说明:Configuration实现了Iterable和Writable两个接口,其中实现Iterable是为了迭代,迭代出Configuration对

【转载】 C++多继承中重写不同基类中相同原型的虚函数

本篇随笔为转载,原文地址:C++多继承中重写不同基类中相同原型的虚函数. 在C++多继承体系当中,在派生类中可以重写不同基类中的虚函数.下面就是一个例子: class CBaseA { public: virtual void TestA(); }; class CBaseB { public: virtual void TestB(); }; class CDerived : public CBaseA, public CBaseB { public: virtual void TestA()

Hadoop中Writable类之二

1.ASCII.Unicode.UFT-8 在看Text类型的时候,里面出现了上面三种编码,先看看这三种编码: ASCII是基于拉丁字母的一套电脑编码系统.它主要用于显示现代英语和其他西欧语言.它是现今最通用的单字节编码系统,并等同于国际标准ISO/IEC 646.ASCII是7位字符集,是美国标准信息交换代码的缩写,为美国英语通信所设计.它由128个字符组成,包括大小写字母.数字0-9.标点符号.非打印字符(换行副.制表符等4个)以及控制字符(退格.响铃等)组成.从定义,很明显,单字节编码,现