MultipleOutputs

package MRNB_V4;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipleOutputs extends Configured implements Tool {

    public static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, NullWritable, Text> {

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(NullWritable.get(), value);
        }

    }

//MultipleTextOutputFormat 继承自MultipleOutputFormat,实现输出文件的分类

    public static class PartitionByCountryMTOF extends
            MultipleTextOutputFormat<NullWritable, Text> { //key is NullWritable, value is Text
        protected String generateFileNameForKeyValue(NullWritable key,
                Text value, String filename) {
            String[] arr = value.toString().split(",",-1);
            String country = arr[4].substring(1,3); //获取country的名称
            return country + "/"+filename;
        }
    }

//此处不使用reducer
    /*public static class Reducer extends MapReduceBase
            implements
            org.apache.hadoop.mapred.Reducer<LongWritable, Text, NullWritable, Text> {

        @Override
        public void reduce(LongWritable key, Iterator<Text> values,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            // TODO Auto-generated method stub

        }

    }
*/
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf,MultipleOutputs.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("MultipleOutputs");
        job.setMapperClass(MapClass.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(PartitionByCountryMTOF.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setNumReduceTasks(0);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception{
        int res = ToolRunner.run(new Configuration(), new MultipleOutputs(), args);
        System.exit(res);
    }

}

  

package MRNB_V4;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {

	public static class MapClass extends
			Mapper<LongWritable, Text, Text, IntWritable> {

		private MultipleOutputs<Text, IntWritable> mos;

		protected void setup(Context context) throws IOException,
				InterruptedException {
			mos = new MultipleOutputs<Text, IntWritable>(context);
		}

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] tokens = line.split("-");

			//mos.write("MOSInt", new Text(tokens[0]),new IntWritable(Integer.parseInt(tokens[1]))); // (第一种)
			//mos.write("MOSText", new Text(tokens[0]), tokens[2]); // 第二种
			mos.write("mlj", new Text(tokens[0]), line, tokens[0] + "/");// 第三种 同时也可写到指定的文件或文件夹中
		}

		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			mos.close();
		}
	}

	public int run(String[] args) throws Exception {

		Configuration conf = getConf();

		Job job = new Job(conf, "word count with MultipleOutputs");

		job.setJarByClass(TestwithMultipleOutputs.class);

		/*Path in = new Path(args[0]);
		Path out = new Path(args[1]);*/
		  final String Input_path="hdfs://mlj:9000/hive";
		  final String Out_path="hdfs://mlj:9000/hive_out";

		FileInputFormat.setInputPaths(job, Input_path);
		FileOutputFormat.setOutputPath(job, new Path(Out_path));

		job.setMapperClass(MapClass.class);
		job.setNumReduceTasks(0);
		MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class,Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "mlj", TextOutputFormat.class,Text.class, Text.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
		return 0;
	}

	public static void main(String[] args) throws Exception {

		int res = ToolRunner.run(new Configuration(),new TestwithMultipleOutputs(), args);
		System.exit(res);
	}
}

  

时间: 2024-10-28 18:54:13

MultipleOutputs的相关文章

Hadoop中的MultipleOutputs实践

本例子采用hadoop1.1.2版本 采用气象数据作为处理数据 1.MultipleOutputs例子,具体解释在代码中有注释 package StationPatitioner; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.

multipleOutputs Hadoop

package org.lukey.hadoop.muloutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io

MapReduce 编程 系列六 MultipleOutputs使用

在前面的例子中,输出文件名是默认的: _logs part-r-00001 part-r-00003 part-r-00005 part-r-00007 part-r-00009 part-r-00011 part-r-00013 _SUCCESS part-r-00000 part-r-00002 part-r-00004 part-r-00006 part-r-00008 part-r-00010 part-r-00012 part-r-00014 part-r-0000N 还有一个_SUC

通过MultipleOutputs写到多个文件

MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串.这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件. 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始).块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突 1.项目需求

(转)MultipleOutputFormat和MultipleOutputs

MultipleOutputFormat和MultipleOutputs http://www.cnblogs.com/liangzh/archive/2012/05/22/2512264.html 一,介绍 1,旧API中有 org.apache.hadoop.mapred.lib.MultipleOutputFormat和org.apache.hadoop.mapred.lib.MultipleOutputs MultipleOutputFormat allowing to write th

使用hadoop multipleOutputs对输出结果进行不一样的组织

MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置.在输出目录中,框架自己会自动对输出文件进行命名和组织,如:part-(m|r)-00000之类.但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织.以前常用的方法是在MR job运行过后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式.研究了一下MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中Multip

hadoop MultipleOutputs

MultipleOutputs: write data to multiple files with customized name, can be used for both map and reduce phase. http://www.lichun.cc/blog/2013/11/how-to-use-hadoop-multipleoutputs/ public static class MyMap extends Mapper<LongWritable, Text, Text, Dou

MapReduce处理输出多文件格式(MultipleOutputs)

MultiPleOutputs原理 MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置.在输出目录中,框架自己会自动对输出文件进行命名和组织,如part-(m|r)-00000之类,但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织.以前常用的方法是在MR job运行之后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式. 研究了一下MR框架中的MultipleOutputs(是2.0之后

在Maprecue中利用MultipleOutputs输出多个文件

用户在使用Mapreduce时默认以part-*命名, MultipleOutputs可以将不同的键值对输出到用户自定义的不同的文件中. 实现过程是在调用output.write(key, new IntWritable(total), key.toString()); 方法时候第三个参数是  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 指定了输出文件的命名前缀,那么我们可以通过对不同的key使用不同的