hadoop编程小技巧(5)---自定义输入文件格式类InputFormat

Hadoop代码测试环境:Hadoop2.4

应用:在对数据需要进行一定条件的过滤和简单处理的时候可以使用自定义输入文件格式类。

Hadoop内置的输入文件格式类有:

1)FileInputFormat<K,V>这个是基本的父类,我们自定义就直接使用它作为父类;

2)TextInputFormat<LongWritable,Text>这个是默认的数据格式类,我们一般编程,如果没有特别指定的话,一般都使用的是这个;key代表当前行数据距离文件开始的距离,value代码当前行字符串;

3)SequenceFileInputFormat<K,V>这个是序列文件输入格式,使用序列文件可以提高效率,但是不利于查看结果,建议在过程中使用序列文件,最后展示可以使用可视化输出;

4)KeyValueTextInputFormat<Text,Text>这个是读取以Tab(也即是\t)分隔的数据,每行数据如果以\t分隔,那么使用这个读入,就可以自动把\t前面的当做key,后面的当做value;

5)CombineFileInputFormat<K,V>合并大量小数据是使用;

6)MultipleInputs,多种输入,可以为每个输入指定逻辑处理的Mapper;

原理:

InputFormat接口有两个重要的函数:

1)getInputSplits,用于确定输入分片,当我们继承FileInputFormat时,就可以忽略此函数,而使用FileInputFormat的此函数即可;

2)createRecordReader ,针对数据如何读取的类,定义输入文件格式,其实也就是定义此类;

在每个map函数中,最开始调用的都是nextKeyValue()函数,这个函数就是在RecordReader中定义的(我们自定义RecordReader就是使用不同的实现而已),所以这里会调用我们指定的RecordReader中的nextKeyValue函数。这个函数就会处理或者说是初始化key和value,然后返回true,告知已经处理好了。接着就会调用getCurrentKey 和getCurrentValue获取当前的key和value值。最后,返回map,继续执行map逻辑。

自定义输入文件格式类:

package fz.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * 自定义输入文件读取类
 *
 * @author fansy
 *
 */
public class CustomInputFormat extends FileInputFormat<Text, Text> {

	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		return new CustomReader();
	}
}

这里看到如果继承了FileInputFormat 后,就不需要关心getInputSplits了,而只需要定义RecordReader即可。

自定义RecordReader

package fz.inputformat;

//import java.io.BufferedReader;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public  class CustomReader extends RecordReader<Text ,Text>{
//	private BufferedReader in;
	private LineReader lr ;
	private Text key = new Text();
	private Text value = new Text();
	private long start ;
	private long end;
	private long currentPos;
	private Text line = new Text();
	@Override
	public void initialize(InputSplit inputSplit, TaskAttemptContext cxt)
			throws IOException, InterruptedException {
		FileSplit split =(FileSplit) inputSplit;
		Configuration conf = cxt.getConfiguration();
		Path path = split.getPath();
		FileSystem fs = path.getFileSystem(conf);
		FSDataInputStream is = fs.open(path);
		lr = new LineReader(is,conf);

		// 处理起始点和终止点
		start =split.getStart();
		end = start + split.getLength();
		is.seek(start);
		if(start!=0){
			start += lr.readLine(new Text(),0,
					(int)Math.min(Integer.MAX_VALUE, end-start));
		}
		currentPos = start;
	}

	// 针对每行数据进行处理
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if(currentPos > end){
			return false;
		}
		currentPos += lr.readLine(line);
		if(line.getLength()==0){
			return false;
		}
		if(line.toString().startsWith("ignore")){
			currentPos += lr.readLine(line);
		}

		String [] words = line.toString().split(",");
		// 异常处理
		if(words.length<2){
			System.err.println("line:"+line.toString()+".");
			return false;
		}
		key.set(words[0]);
		value.set(words[1]);
		return true;

	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (currentPos - start) / (float) (end - start));
        }
	}

	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		lr.close();
	}

}

这里主要是两个函数,initial和nextKeyValue。

initial主要用于初始化,包括打开和读取文件,定义读取的进度等;

nextKeyValue则是针对每行数据(由于这里使用的是LineReader,所以每次读取的是一行,这里定义不同的读取方式,可以读取不同的内容),产生对应的key和value对,如果没有报错,则返回true。这里可以看到设置了一条规则,如果输入数据是以ignore开始的话就忽略,同时每行只取得逗号前后的数据分别作为key和value。

实战:

输入数据:

ignore,2
a,3
ignore,4
c,1
c,2,3,2
4,3,2
ignore,34,2

定义主类,主类的Mapper是默认的Mapper,没有reducer。

package fz.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileInputFormatDriver extends Configured implements Tool{

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		ToolRunner.run(new Configuration(), new FileInputFormatDriver(),args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		if(arg0.length!=2){
			System.err.println("Usage:\nfz.inputformat.FileInputFormatDriver <in> <out>");
			return -1;
		}
		Configuration conf = getConf();

		Path in = new Path(arg0[0]);
		Path out= new Path(arg0[1]);
		out.getFileSystem(conf).delete(out, true);

		Job job = Job.getInstance(conf,"fileintputformat test job");
		job.setJarByClass(getClass());

		job.setInputFormatClass(CustomInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setMapperClass(Mapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
//		job.setOutputKeyClass(LongWritable.class);
//		job.setOutputValueClass(VectorWritable.class);
		job.setNumReduceTasks(0);
//		System.out.println(job.getConfiguration().get("mapreduce.job.reduces"));
//		System.out.println(conf.get("mapreduce.job.reduces"));
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);

		return job.waitForCompletion(true)?0:-1;
	}

}

查看输出:

这里可以看到,ignore的数据已经被忽略掉了,同时每行只输出了逗号前后的数据而已。

同时需要注意到:

这里有一行数据读入的是空字符串,这个暂时还没找到原因。

总结:自定义输入数据格式可以针对不同的数据做些过滤,进行一些简单的逻辑处理,有点类似map的功能,但是如果仅仅是这点功能的话,那完全可以使用map来取代了。其实输入数据格式还有其他的功能,比如合并大量的小数据,以提高效率,这个在下篇再说。

分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990

hadoop编程小技巧(5)---自定义输入文件格式类InputFormat

时间: 2024-10-15 03:37:30

hadoop编程小技巧(5)---自定义输入文件格式类InputFormat的相关文章

hadoop编程小技巧(6)---处理大量小数据文件CombineFileInputFormat应用

代码测试环境:Hadoop2.4 应用场景:当需要处理很多小数据文件的时候,可以应用此技巧来达到高效处理数据的目的. 原理:应用CombineFileInputFormat,可以把多个小数据文件在进行分片的时候合并.由于每个分片会产生一个Mapper,当一个Mapper处理的数据比较小的时候,其效率较低.而一般使用Hadoop处理数据时,即默认方式,会把一个输入数据文件当做一个分片,这样当输入文件较小时就会出现效率低下的情况. 实例: 参考前篇blog:hadoop编程小技巧(5)---自定义输

hadoop编程小技巧(7)---自定义输出文件格式以及输出到不同目录

代码测试环境:Hadoop2.4 应用场景:当需要定制输出数据格式时可以采用此技巧,包括定制输出数据的展现形式,输出路径,输出文件名称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  常用的父类: 2)TextOutputFormat<K,V> 默认输出字符串输出格式: 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 可以把输出数据输送到

hadoop编程小技巧(3)---自定义分区类Partitioner

Hadoop代码测试环境:Hadoop2.4 原理:在Hadoop的MapReduce过程中,Mapper读取处理完成数据后,会把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下: /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numRe

hadoop编程小技巧(4)---全局key排序类TotalOrderPartitioner

Hadoop代码测试版本:Hadoop2.4 原理:在进行MR程序之前对输入数据进行随机提取样本,把样本排序,然后在MR的中间过程Partition的时候使用这个样本排序的值进行分组数据,这样就可以达到全局排序的目的了. 难点:如果使用Hadoop提供的方法来实现全局排序,那么要求Mapper的输入.输出的key不变才可以,因为在源码InputSampler中提供的随机抽取的数据是输入数据最原始的key,如下代码(line:225): for (int i = 0; i < splitsToSa

hadoop编程小技巧(2)---计数器Counter

Hadoop代码测试版本:2.4 应用场景:在Hadoop编程的时候,有时我们在进行我们算法逻辑的时候想附带了解下数据的一些特性,比如全部数据的记录数有多少,map的输出有多少等等信息(这些是在算法运行完毕后,直接有的),就可以使用计数器Counter. 如果是针对很特定的数据的一些统计,比如统计以1开头的所有记录数等等信息,这时就需要自定义Counter.自定义Counter有两种方式,第一种,定义枚举类型,类似: public enum MyCounters{ ALL_RECORDS,ONE

hadoop编程小技巧(9)---二次排序(值排序)

代码测试环境:Hadoop2.4 应用场景:在Reducer端一般是key排序,而没有value排序,如果想对value进行排序,则可以使用此技巧. 应用实例描述: 比如针对下面的数据: a,5 b,7 c,2 c,9 a,3 a,1 b,10 b,3 c,1 如果使用一般的MR的话,其输出可能是这样的: a 1 a 3 a 5 b 3 b 10 b 7 c 1 c 9 c 2 从数据中可以看到其键是排序的,但是其值不是.通过此篇介绍的技巧可以做到下面的输出: a 1 a 3 a 5 b 3 b

hadoop编程小技巧(1)---map端聚合

测试hadoop版本:2.4 Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中. 使用的好处:可以大大减小网络数据的传输量,提高效率: 一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据. 实例: 在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数: package fz.inm

hadoop编程小技巧(8)---Unit Testing (单元测试)

所需环境: Hadoop相关jar包(下载官网发行版即可): 下载junit包(最新为好): 下载mockito包: 下载mrunit包: 下载powermock-mockito包: 相关包截图如下(相关下载参考:http://download.csdn.net/detail/fansy1990/7690977): 应用场景: 在进行Hadoop的一般MR编程时,需要验证我们的业务逻辑,或者说是验证数据流的时候可以使用此环境,这个环境不要求真实的云平台,只是针对算法或者代码逻辑进行验证,方便调试

hadoop编程小技巧(7)---自己定义输出文件格式以及输出到不同文件夹

代码測试环境:Hadoop2.4 应用场景:当须要定制输出数据格式时能够採用此技巧,包含定制输出数据的展现形式.输出路径.输出文件名称称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  经常使用的父类. 2)TextOutputFormat<K,V> 默认输出字符串输出格式. 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 能够把输出数据