MapReduce对输入多文件的处理2自定义FileInputFormat类

多种自定义文件格式的文件输入处理

MultipleInputs可以让MR支持多种输入格式

比如我们有两种文件格式,那么我们就要有两套Record Class,RecordReader和InputFormat

InputFormat(extends FileInputFormat)--->RecordReader(extends RecordReader)--->RecordClass(implements Writable)

MultipleInpts需要不同的InputFormat,一种InputFormat使用一种RecordReader来读取文件并返回一种Record格式的值

这就是这三个典型的关系,也是map过程中涉及的三个步骤的工具和产物

数据准备

a文件

1t80

2t90

3t100

4t50

5t73

b文件

1tlilit3

2txiaomingt3

3tfeifeit3

4tzhangsant3

5tlisit3

t表示分隔符

设计思路

将t前面的Text表示给map将要输入的key

t后面的作为给map要输入的value

要求自定义实现InputFormat,输出key,value格式数据。以产生Map的输入的数据(key,value)

!!!三个文件步骤!!!

InputFormat(extends FileInputFormat)--->RecordReader(extends RecordReader)--->RecordClass(implements Writable)

本例是对两个文件操作

1.两个RecordClass类(实现Writable接口)

package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * 对map输入的value的预处理
 * 对原始数据的预加工
 */
/*
 * 第一张表数据
 */
public class FirstClass implements Writable {
	private String value;

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}

	public FirstClass() {
		super();
		// TODO Auto-generated constructor stub
	}

	public FirstClass(String value) {
		super();
		this.value = value;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.value);

	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.value = in.readUTF();
	}

	@Override
	public String toString() {
		return "FirstClasst" + value;
	}
}
package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
 * 对map输入的value的预处理
 * 对原始数据的预加工
 */
/*
 * 第二张表数据
 */
public class SecondClass implements Writable {
	private String username;
	private int classNo;

	public SecondClass() {
		super();
	}

	public SecondClass(String username, int classNo) {
		super();
		this.username = username;
		this.classNo = classNo;
	}

	public String getUsername() {
		return username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public int getClassNo() {
		return classNo;
	}

	public void setClassNo(int classNo) {
		this.classNo = classNo;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(username);
		out.writeInt(classNo);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.username = in.readUTF();
		this.classNo = in.readInt();
	}

	@Override
	public String toString() {
		return "SecondClasst" + username + "t" + classNo;
	}

}

2.两个自定义RecordReader类(继承RecordReader类)

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FirstRecordReader extends RecordReader<Text, FirstClass> {

	// 定义一个真正读取split中文件的读取器
	private LineRecordReader lineRecordReader = null;
	private Text key = null;
	private FirstClass value = null;

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		close();
		lineRecordReader = new LineRecordReader();
		lineRecordReader.initialize(split, context);
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		// 没有读取到东西
		if (!lineRecordReader.nextKeyValue()) {
			key = null;
			value = null;
			return false;
		}
		Text val = lineRecordReader.getCurrentValue();
		String line = val.toString();
		String[] str = line.split("t");
		key = new Text(str[0]);
		value = new FirstClass(str[1].trim()); // 实现对原始数据的预分割
		return true;
	}

	// 读取key的当前值
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	// 读取value的当前值
	@Override
	public FirstClass getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return lineRecordReader.getProgress();
	}

	@Override
	public void close() throws IOException {
		if (null != lineRecordReader) {
			lineRecordReader.close();
			lineRecordReader = null;
		}
		key = null;
		value = null;
	}

}
package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class SecondRecordReader extends RecordReader<Text, SecondClass> {
	// 定义一个真正读取split中文件的读取器
	private LineRecordReader lineRecordReader = null;
	private Text key = null;
	private SecondClass value = null;

	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		close();
		lineRecordReader = new LineRecordReader();
		lineRecordReader.initialize(split, context);
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!lineRecordReader.nextKeyValue()) {
			key = null;
			value = null;
			return false;
		}
		Text val = lineRecordReader.getCurrentValue();
		String line = val.toString();
		String str[] = line.split("t");
		key = new Text(str[0]);
		value = new SecondClass(str[1], Integer.parseInt(str[2]));
		return true;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public SecondClass getCurrentValue() throws IOException,
			InterruptedException {
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return lineRecordReader.getProgress();
	}

	@Override
	public void close() throws IOException {
		if (null != lineRecordReader) {
			lineRecordReader.close();
			lineRecordReader = null;
		}
		key = null;
		value = null;
	}

}

3.自定义两个FileInputFormat类(继承FileInputFormat类)

package test.mr.multiinputs2;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FirstInputFormat extends FileInputFormat<Text, FirstClass> {

	@Override
	public RecordReader<Text, FirstClass> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new FirstRecordReader();
	}
}
package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SecondInputFormat extends FileInputFormat<Text, SecondClass> {

	@Override
	public RecordReader<Text, SecondClass> createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		return new SecondRecordReader();
	}

}

4.两个Map类

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FirstMap extends Mapper<Text, FirstClass, Text, Text> {
	@Override
	protected void map(Text key, FirstClass value,
			Mapper<Text, FirstClass, Text, Text>.Context context)
			throws IOException, InterruptedException {
		context.write(key, new Text(value.toString()));
	}
}
package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondMap extends Mapper<Text, SecondClass, Text, Text> {
	@Override
	protected void map(Text key, SecondClass value,
			Mapper<Text, SecondClass, Text, Text>.Context context)
			throws IOException, InterruptedException {
		context.write(key, new Text(value.toString()));
	}
}

5.reduce类

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MultiInputsRedu extends Reducer<Text, Text, Text, Text> {
	@Override
	protected void reduce(Text key, Iterable<Text> values,
			Reducer<Text, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		for (Text val : values) {
			context.write(key, val);
		}
	}
}

6.Job类

/*
 * 要求自定义实现InputFormat,输出key,value格式数据
 */
public class MultiInputsMain extends Configuration implements Tool {
	private String input1 = null; // 定义的多个输入文件
	private String input2 = null;
	private String output = null;

	@Override
	public void setConf(Configuration conf) {

	}

	@Override
	public Configuration getConf() {
		return new Configuration();
	}

	@Override
	public int run(String[] args) throws Exception {
		setArgs(args);
		checkParam();// 对参数进行检测

		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(MultiInputsMain.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(MultiInputsRedu.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		// MultipleInputs类添加文件路径
		// 添加上自定义的fileInputFormat(分别是FirstInputFormat和SecondInputFormat)格式
		MultipleInputs.addInputPath(job, new Path(input1),
				FirstInputFormat.class, FirstMap.class);
		MultipleInputs.addInputPath(job, new Path(input2),
				SecondInputFormat.class, SecondMap.class);

		FileOutputFormat.setOutputPath(job, new Path(output));
		job.waitForCompletion(true);
		return 0;
	}

	private void checkParam() {
		if (input1 == null || "".equals(input1.trim())) {
			System.out.println("no input phone-data path");
			userMaunel();
			System.exit(-1);
		}
		if (input2 == null || "".equals(input2.trim())) {
			System.out.println("no input user-data path");
			userMaunel();
			System.exit(-1);
		}
		if (output == null || "".equals(output.trim())) {
			System.out.println("no output path");
			userMaunel();
			System.exit(-1);
		}
	}

	// 用户手册
	private void userMaunel() {
		System.err.println("Usage:");
		System.err.println("-i1 input \t phone data path.");
		System.err.println("-i2 input \t user data path.");
		System.err.println("-o output \t output data path.");
	}

	// 对属性进行赋值
	// 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录)
	private void setArgs(String[] args) {
		for (int i = 0; i < args.length; i++) {
			if ("-i1".equals(args[i])) {
				input1 = args[++i]; // 将input1赋值为第一个文件的输入路径
			} else if ("-i2".equals(args[i])) {
				input2 = args[++i];
			} else if ("-o".equals(args[i])) {
				output = args[++i];
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		ToolRunner.run(conf, new MultiInputsMain(), args); // 调用run方法
	}
}
时间: 2024-08-20 10:02:24

MapReduce对输入多文件的处理2自定义FileInputFormat类的相关文章

MapReduce对输入多文件的处理

MultipleInputs类指定不同的输入文件路径以及输入文化格式 现有两份数据 phone 123,good number 124,common number 125,bad number user zhangsan,123 lisi,124 wangwu,125 现在需要把user和phone按照phone number连接起来.得到下面的结果 zhangsan,123,good number lisi,123,common number wangwu,125,bad number 分析思

整个文件作为一条记录处理自定义FileInputFormat类

众所周知,Hadoop对处理单个大文件比处理多个小文件更有效率,另外单个文件也非常占用HDFS的存储空间.所以往往要将其合并起来. 1,getmerge hadoop有一个命令行工具getmerge,用于将一组HDFS上的文件复制到本地计算机以前进行合并 参考:http://hadoop.apache.org/common/docs/r0.19.2/cn/hdfs_shell.html 使用方法:hadoop fs -getmerge <src> <localdst> [addnl

Hadoop 学习笔记一 ---MapReduce 的输入和输出

Hadoop 中的MapReduce库支持几种不同格式的输入数据.例如,文本模式的输入数据的每一行被视为一个key/value pair,其中key为文件的偏移量,value为那一行的内容.每一种输入类型的实现都必须能够把输入数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理. 一.  输入格式InputFormat 当运行一个M-R 作业的时候,我们需要为作业制定它的输入格式.InputFormat为Hadoop作业的所有输入格式的抽象基类,它描述了作业输入需要满足的规范细节

windows 10 smb,添加网络位置,输入的文件夹似乎无效

在windows 10中遇到一个现象,在"添加一个网络位置"的时候,弹出"输入的文件夹似乎无效.请选择另一个",我在这里是需要连接到Linux上的smb指定目录,经其它机子测试,smb服务器正常,输入的参数也正常.从smb服务器日志中也没有发现异常现象. 经过多次尝试,发现应该是在访问网络位置的时候,使用的用户及密码不对.由于登录了微软账户,默认使用微软用户进行访问的,所以导致无法访问. 解决方法: 1.改为添加"映射网络驱动器": 2.勾选&q

Linux shell脚本 判断用户输入的文件类型

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 编写一个脚本,从键盘输入一个文件,判断它是否存在,如果存在就判断它是什么类型的文件:并用对应的颜色输出 脚本如下: #!/bin/bash #function:test file type

calculate the number of characters-统计文件中的字符数,非空白字符数,字母数,输入到文件和屏幕:

//calculate the number of characters-统计文件中的字符数,非空白字符数,字母数,输入到文件和屏幕: #include<iostream> #include<fstream> #include<cstdlib> #include<cmath> int main() { using namespace std; ifstream fin; ofstream fout; double ch1 = 0,ch2 = 0,letter

目标系统不支持长文件名,请输入该文件的名称

问题现场 服务器:Dell poweredge r210 SATA Controller设置为ATA Mode 硬盘:samsung ssd 850 evo 250G 使用大白菜PE中带的DG分区工具对硬盘进行分区 如果磁盘用来安装系统需要设置为主分区,并激活该分区 分区格式化之后,往硬盘里复制文件就出现此错误:目标系统不支持长文件名,请输入该文件的名称 解决方法 有人说是注册表有问题. [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\

ASP.NET Core 1.1 静态文件、路由、自定义中间件、身份验证简介

概述 之前写过一篇关于<ASP.NET Core 1.0 静态文件.路由.自定义中间件.身份验证简介>的文章,主要介绍了ASP.NET Core中StaticFile.Middleware.CustomizeMiddleware和Asp.NetCore Identity.但是由于所有的ASP.NET Core的版本有些老,所以,此次重写一次.使用最新的ASP.NET Core 1.1版本.对于ASP.NET Core 1.1 Preview 1会在以后的文章中介绍 目录 使用静态文件 使用路由

php学习之道:php中soap的使用实例以及生成WSDL文件,提供自动生成WSDL文件的类库——SoapDiscovery.class.php类

1. web service普及: Webservice soap wsdl区别之个人见解 Web Service实现业务诉求:  Web Service是真正"办事"的那个,提供一种办事接口的统称. WSDL提供"能办的事的文档说明":  对要提供的服务的一种描述格式.我想帮你的忙,但是我要告诉你我都能干什么,以及干这些事情需要的参数类型. SOAP提供"请求"的规范:  向服务接口传递请求的格式,包括方法和参数等.你想让人家办事,总得告诉人家