MapReduce对输入多文件的处理

MultipleInputs类指定不同的输入文件路径以及输入文化格式

现有两份数据

phone

123,good number

124,common number

125,bad number

user

zhangsan,123

lisi,124

wangwu,125

现在需要把user和phone按照phone number连接起来。得到下面的结果

zhangsan,123,good number

lisi,123,common number

wangwu,125,bad number

分析思路

还是相当于两张表的一对一join操作。join时对value设置个Bean(JavaBean实现writablecomparable接口),key为外键值

join的优化,详见http://blog.csdn.net/u010366796/article/details/44649933,设置KeyBean(外健和标识flag属性),进行排序

本例中将通过value进行排序,即在value的JavaBean中通过实习CompareTo()方法,完成排序,使得phone表位于首位

1.对value实现JavaBean(实现writablecomparable接口)

package test.mr.multiinputs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/*
 * 自定义的JavaBean
 */
public class FlagString implements WritableComparable<FlagString> {
	private String value;
	private int flag; // 标记 0:表示phone表 1:表示user表

	public FlagString() {
		super();
		// TODO Auto-generated constructor stub
	}

	public FlagString(String value, int flag) {
		super();
		this.value = value;
		this.flag = flag;
	}

	public String getValue() {
		return value;
	}

	public void setValue(String value) {
		this.value = value;
	}

	public int getFlag() {
		return flag;
	}

	public void setFlag(int flag) {
		this.flag = flag;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(flag);
		out.writeUTF(value);

	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.flag = in.readInt();
		this.value = in.readUTF();
	}

	@Override
	public int compareTo(FlagString o) {
		if (this.flag >= o.getFlag()) {
			if (this.flag > o.getFlag()) {
				return 1;
			}
		} else {
			return -1;
		}
		return this.value.compareTo(o.getValue());
	}

}

2.多map类,map1(实现对phone表文件操作)

package test.mr.multiinputs;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultiMap1 extends Mapper<LongWritable, Text, Text, FlagString> {
	private String delimiter; // 定义分隔符,由job端设置

	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
	}

	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if (line.length() > 0) {
			String[] str = line.split(delimiter);
			if (str.length == 2) {
				context.write(new Text(str[0].trim()),
						new FlagString(str[1].trim(), 0)); // flag=0,表示phone表
			}
		}
	}
}

2.map2(实现对user表文件操作)

package test.mr.multiinputs;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultiMap2 extends Mapper<LongWritable, Text, Text, FlagString> {
	private String delimiter; // 设置分隔符

	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
	}

	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, FlagString>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if (line.length() > 0) {
			String[] str = line.split(delimiter);
			if (str.length == 2) {
				context.write(new Text(str[1].trim()),
						new FlagString(str[0].trim(), 1)); // flag=1为user表
			}
		}
	}
}

3.reduce类

package test.mr.multiinputs;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MultiRedu extends Reducer<Text, FlagString, NullWritable, Text> {
	private String delimiter; // 设置分隔符

	@Override
	protected void setup(
			Reducer<Text, FlagString, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
	}

	@Override
	protected void reduce(Text key, Iterable<FlagString> values,
			Reducer<Text, FlagString, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		// 最后输出的格式为: uservalue,key,phonevalue
		String phoneValue = "";
		String userValue = "";
		int num = 0;
		for (FlagString value : values) {
			// 第一个即为phone表
			if (num == 0) {
				phoneValue = value.getValue();
				num++;
			} else {
				userValue = value.getValue();
				context.write(NullWritable.get(),
						new Text(userValue + key.toString() + phoneValue));
			}
		}
	}
}

4.job类(关键!!实现多文件的输入格式等)

package test.mr.multiinputs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 * MultipleInputs类指定不同的输入文件路径以及输入文化格式
 现有两份数据
 phone
 123,good number
 124,common number
 123,bad number

 user
 zhangsan,123
 lisi,124
 wangwu,125

 现在需要把user和phone按照phone number连接起来。得到下面的结果
 zhangsan,123,good number
 lisi,123,common number
 wangwu,125,bad number
 */
public class MultiMapMain extends Configuration implements Tool {
	private String input1 = null; // 定义的多个输入文件
	private String input2 = null;
	private String output = null;
	private String delimiter = null;

	@Override
	public void setConf(Configuration conf) {

	}

	@Override
	public Configuration getConf() {
		return new Configuration();
	}

	@Override
	public int run(String[] args) throws Exception {
		setArgs(args);
		checkParam();// 对参数进行检测

		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(MultiMapMain.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlagString.class);

		job.setReducerClass(MultiRedu.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		// MultipleInputs类添加文件路径
		MultipleInputs.addInputPath(job, new Path(input1),
				TextInputFormat.class, MultiMap1.class);
		MultipleInputs.addInputPath(job, new Path(input2),
				TextInputFormat.class, MultiMap2.class);

		FileOutputFormat.setOutputPath(job, new Path(output));
		job.waitForCompletion(true);
		return 0;
	}

	private void checkParam() {
		if (input1 == null || "".equals(input1.trim())) {
			System.out.println("no input phone-data path");
			userMaunel();
			System.exit(-1);
		}
		if (input2 == null || "".equals(input2.trim())) {
			System.out.println("no input user-data path");
			userMaunel();
			System.exit(-1);
		}
		if (output == null || "".equals(output.trim())) {
			System.out.println("no output path");
			userMaunel();
			System.exit(-1);
		}
		if (delimiter == null || "".equals(delimiter.trim())) {
			System.out.println("no delimiter");
			userMaunel();
			System.exit(-1);
		}

	}

	// 用户手册
	private void userMaunel() {
		System.err.println("Usage:");
		System.err.println("-i1 input \t phone data path.");
		System.err.println("-i2 input \t user data path.");
		System.err.println("-o output \t output data path.");
		System.err.println("-delimiter data delimiter \t default comma.");
	}

	// 对属性进行赋值
	// 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) -delimiter x(分隔符)
	private void setArgs(String[] args) {
		for (int i = 0; i < args.length; i++) {
			if ("-i1".equals(args[i])) {
				input1 = args[++i]; // 将input1赋值为第一个文件的输入路径
			} else if ("-i2".equals(args[i])) {
				input2 = args[++i];
			} else if ("-o".equals(args[i])) {
				output = args[++i];
			} else if ("-delimiter".equals(args[i])) {
				delimiter = args[++i];
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		ToolRunner.run(conf, new MultiMapMain(), args); // 调用run方法
	}
}
时间: 2024-07-31 09:06:28

MapReduce对输入多文件的处理的相关文章

MapReduce对输入多文件的处理2自定义FileInputFormat类

多种自定义文件格式的文件输入处理 MultipleInputs可以让MR支持多种输入格式 比如我们有两种文件格式,那么我们就要有两套Record Class,RecordReader和InputFormat InputFormat(extends FileInputFormat)--->RecordReader(extends RecordReader)--->RecordClass(implements Writable) MultipleInpts需要不同的InputFormat,一种In

windows 10 smb,添加网络位置,输入的文件夹似乎无效

在windows 10中遇到一个现象,在"添加一个网络位置"的时候,弹出"输入的文件夹似乎无效.请选择另一个",我在这里是需要连接到Linux上的smb指定目录,经其它机子测试,smb服务器正常,输入的参数也正常.从smb服务器日志中也没有发现异常现象. 经过多次尝试,发现应该是在访问网络位置的时候,使用的用户及密码不对.由于登录了微软账户,默认使用微软用户进行访问的,所以导致无法访问. 解决方法: 1.改为添加"映射网络驱动器": 2.勾选&q

Linux shell脚本 判断用户输入的文件类型

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 编写一个脚本,从键盘输入一个文件,判断它是否存在,如果存在就判断它是什么类型的文件:并用对应的颜色输出 脚本如下: #!/bin/bash #function:test file type

calculate the number of characters-统计文件中的字符数,非空白字符数,字母数,输入到文件和屏幕:

//calculate the number of characters-统计文件中的字符数,非空白字符数,字母数,输入到文件和屏幕: #include<iostream> #include<fstream> #include<cstdlib> #include<cmath> int main() { using namespace std; ifstream fin; ofstream fout; double ch1 = 0,ch2 = 0,letter

目标系统不支持长文件名,请输入该文件的名称

问题现场 服务器:Dell poweredge r210 SATA Controller设置为ATA Mode 硬盘:samsung ssd 850 evo 250G 使用大白菜PE中带的DG分区工具对硬盘进行分区 如果磁盘用来安装系统需要设置为主分区,并激活该分区 分区格式化之后,往硬盘里复制文件就出现此错误:目标系统不支持长文件名,请输入该文件的名称 解决方法 有人说是注册表有问题. [HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\

Hadoop 学习笔记一 ---MapReduce 的输入和输出

Hadoop 中的MapReduce库支持几种不同格式的输入数据.例如,文本模式的输入数据的每一行被视为一个key/value pair,其中key为文件的偏移量,value为那一行的内容.每一种输入类型的实现都必须能够把输入数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理. 一.  输入格式InputFormat 当运行一个M-R 作业的时候,我们需要为作业制定它的输入格式.InputFormat为Hadoop作业的所有输入格式的抽象基类,它描述了作业输入需要满足的规范细节

使用MapReduce实现两个文件的Join操作

数据结构 customer表         1 hanmeimei ShangHai 110 2 leilei BeiJing 112 3 lucy GuangZhou 119 oder表       1 1 50 2 1 200 3 3 15 4 3 350 5 3 58 6 1 42 7 1 352 8 2 1135 9 2 400 10 2 2000 11 2 300 MAPJOIN 场景:我们模拟一个有一份小表一个大表的场景,customer是那份小表,order是那份大表做法:直接将

[Python爬虫] 中文编码问题:raw_input输入、文件读取、变量比较等str、unicode、utf-8转换问题

最近研究搜索引擎.知识图谱和Python爬虫比较多,中文乱码问题再次浮现于眼前.虽然市面上讲述中文编码问题的文章数不胜数,同时以前我也讲述过PHP处理数据库服务器中文乱码问题,但是此处还是准备简单做下笔记.方便以后查阅和大家学习. 中文编码问题的处理核心都是--保证所有的编码方式一致即可,包括编译器.数据库.浏览器编码方式等,而Python通常的处理流程是将unicode作为中间转换码进行过渡.先将待处理字符串用unicode函数以正确的编码转换为Unicode码,在程序中统一用Unicode字

[Python] 中文编码问题:raw_input输入、文件读取、变量比较等str、unicode、utf-8转换问题

最近研究搜索引擎.知识图谱和Python爬虫比较多,中文乱码问题再次浮现于眼前.虽然市面上讲述中文编码问题的文章数不胜数,同时以前我也讲述过PHP处理数据库服务器中文乱码问题,但是此处还是准备简单做下笔记.方便以后查阅和大家学习.        中文编码问题的处理核心都是——保证所有的编码方式一致即可,包括编译器.数据库.浏览器编码方式等,而Python通常的处理流程是将unicode作为中间转换码进行过渡.先将待处理字符串用unicode函数以正确的编码转换为Unicode码,在程序中统一用U