MapReduce处理输出多文件格式(MultipleOutputs)

MultiPleOutputs原理

MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置。在输出目录中,框架自己会自动对输出文件进行命名和组织,如part-(m|r)-00000之类,但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织。以前常用的方法是在MR
job运行之后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式。

研究了一下MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中MultipleOutputs与MultipleOutputFormat的一个整合)。

在一般情况下,Hadoop每一个Reducer产生一个输出文件,文件以part-r-00000,part-r-00001的方式进行命名,如果需要认为的控制输出文件的命名或者每一个Reducer需要写出多个输出文件时,可以采用MultipleOutputs类来完成,MultipleOutputs采用输出记录的键值对(output Key和output Value)或者任意字符串来生成输出文件的名字,文件一般以name-r-nnnnn的格式进行命名,其中name是程序设计的任意名字;nnnnn表示分区号。

使用方法

1.驱动中不需要额外改变,只需要在MapClass或Reduce类中加入以下代码

private MultipleOutputs<Text,IntWritable> mos;
public void setup(Context context) throws Exception{
  mos=new MultipleOutputs(context);
}
public void cleanup(Context context) throws Exeception{
  mos.close();
}

2.然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key,value);

在MapClass或Reduce中使用,输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0。而且只有part-m-00*会传给reduce。

注意:

multipleOutputs.write(key,value,baseOutputPath)方法的第三个参数表明了该输出所在的目录(相当于用户指定的输出目录)。如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn).

如果包含文件分隔符"/".例如baseOutputPath="029070-99999/1901/part",那么输出文件则为027070-99999/1901/part-r-nnnnn.

3.最后在job的类中

// 设置输出文件类型

  MultipleOutputs.addNamedOutput(job, namedOutput, outputFormatClass, keyClass, valueClass);

案例

原始数据

1512,iphone5s,4inchs,A7,64,M7,lowerpower

1512,iphone5,4inchs,A6,IOS7

1512,iphone4s,3.5inchs,A5

50019780,Ipad,9.7,retina

50019780,yoga,lenovel,18hours

50019780,nexus,7,google

50019780,Ipad mini2,retina,7.9

1101,macbook air,OS X mavericks

1101,macbook pro,OS X lion

1101,thinkpad yoga,lenovel,windows 8

对原始数据根据第一个id列进行分类到不同的文件中

1.Map类

package test.mr.multioutputs;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MultioutMap extends Mapper<LongWritable, Text, Text, Text> {
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, Text>.Context context)
			throws IOException, InterruptedException {
		String line = value.toString().trim();
		if (line.length() > 0) {
			String str[] = line.split(",");
			// 提取分类
			context.write(new Text(str[0]), value);
		}
	}
}

2.Reduce类

package test.mr.multioutputs;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultioutRedu extends Reducer<Text, Text, NullWritable, Text> {
	private MultipleOutputs<NullWritable, Text> mos; // 输出类型和Reduce一致

	@Override
	protected void setup(Reducer<Text, Text, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		mos = new MultipleOutputs<NullWritable, Text>(context);
	}

	@Override
	protected void cleanup(
			Reducer<Text, Text, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		mos.close();
	}

	@Override
	protected void reduce(Text key, Iterable<Text> values,
			Reducer<Text, Text, NullWritable, Text>.Context context)
			throws IOException, InterruptedException {
		/*
		 * mos.write(Key key,Value
		 * value,StringbaseOutputPath)代替context.write(key,value);
		 */
		for (Text value : values) {
			// 指定写出不同文件的数据
			mos.write("KeySplit", NullWritable.get(), value, key.toString()
					+ "/");
			// 指定写出全部数据
			mos.write("AllData", NullWritable.get(), value);
		}

	}
}

3.job类

package test.mr.multioutputs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultioutMain {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(MultioutMain.class);

		job.setMapperClass(MultioutMap.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setReducerClass(MultioutRedu.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		// 设置输出文件类型
		MultipleOutputs.addNamedOutput(job, "KeySplit", TextOutputFormat.class,
				NullWritable.class, Text.class);
		MultipleOutputs.addNamedOutput(job, "AllData", TextOutputFormat.class,
				NullWritable.class, Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.waitForCompletion(true);
	}
}
时间: 2024-08-11 11:34:29

MapReduce处理输出多文件格式(MultipleOutputs)的相关文章

MapReduce排序输出

hadoop的map是具有输出自动排序功能的~继续学习~ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.

MapReduce设置输出文件到多个文件夹下

一:自定义OutputFormat类MapReduce默认的OutPutFormat会将结果输出文件放置到一个我们指定的目录下,但如果想把输出文件根据某个条件,把满足不同条件的内容分别输出到不同的目录下, 就需要自定义实现OutputFormat类,且重写RecordWriter方法.在驱动类中设置job.setOutputFormatClass方法为自定义实现的OutputFormat类 下面案例是一组购物文本数据,将其中的好评和差评分别输出到对应的好评文件夹下.差评文件夹下. 二:自定义实现

9.2.1 hadoop mapreduce任务输出的默认排序

任务的默认排序 MapTask和ReduceTask都会默认对数据按照key进行排序,不管逻辑上是否需要.默认是按照字典顺序排序,且实现该排序的方法是快速排序.但是map和reduce任务只能保证单个任务内部输出有序,不能保证所有输出全局有序. MapTask,当环形缓冲区使用率到达一定阈值后进行一次快速排序,将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序.ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁

MapReduce设置输出分隔符

conf.set("mapred.textoutputformat.ignoreseparator","true"); conf.set("mapred.textoutputformat.separator",","); 默认是tab

MapReduce 编程 系列六 MultipleOutputs使用

在前面的例子中,输出文件名是默认的: _logs part-r-00001 part-r-00003 part-r-00005 part-r-00007 part-r-00009 part-r-00011 part-r-00013 _SUCCESS part-r-00000 part-r-00002 part-r-00004 part-r-00006 part-r-00008 part-r-00010 part-r-00012 part-r-00014 part-r-0000N 还有一个_SUC

如何去掉MapReduce输出的默认分隔符

我们在用MapReduce做数据处理的时候,经常会遇到将只需要输出键或者值的情况,如context.write(new Text(record), new Text("")),这样得到结果每行尾部会自动加上一个制表符.尽管我们的值是空的,但是MapReduce默认输出的是键值对,且键值对之间默认的分隔符为制表符,这样可能对我们数据的后续处理会产生一些干扰,那么如何去掉或是更改这个制表符呢?这里提供三种解决办法: 方法一:将键设置为空值 一般我们用context写入数据的时候,是将要输出

通过MultipleOutputs写到多个文件

MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串.这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件. 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始).块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突 1.项目需求

使用mapreduce计算环比的实例

最近做了一个小的mapreduce程序,主要目的是计算环比值最高的前5名,本来打算使用spark计算,可是本人目前spark还只是简单看了下,因此就先改用mapreduce计算了,今天和大家分享下这个例子,也算是对自己写的程序的总结了. 首先解释下环比,例如我们要算本周的环比,那么计算方式就是本周的数据和上周数字的差值除以上周数值就是环比了,如果是月的环比就是本月和上月数据的差值除以上月数字就是本月环比了.不过本mapreduce实例不会直接算出比值,只是简单求出不同时间段数值的差值,最终环比结

hadoop学习;自己定义Input/OutputFormat;类引用mapreduce.mapper;三种模式

hadoop切割与读取输入文件的方式被定义在InputFormat接口的一个实现中.TextInputFormat是默认的实现,当你想要一次获取一行内容作为输入数据时又没有确定的键.从TextInputFormat返回的键为每行的字节偏移量,但眼下没看到用过 曾经在mapper中曾使用LongWritable(键)和Text(值),在TextInputFormat中,由于键是字节偏移量.能够是LongWritable类型,而当使用KeyValueTextInputFormat时,第一个分隔符前后