hadoop MultipleOutputs

MultipleOutputs:

  write data to multiple files with customized name, can be used for both map and reduce phase.

http://www.lichun.cc/blog/2013/11/how-to-use-hadoop-multipleoutputs/

public static class MyMap extends
            Mapper<LongWritable, Text, Text, DoubleWritable> {
        MultipleOutputs<Text, DoubleWritable> mos;

        public void map(LongWritable inKey, Text inValue, Context context)
                throws IOException, InterruptedException {

            mos.write(map_out_file, NullWritable.get(), new Text(name));

        }

        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mos.close();
        }

    }

example

package a5p2;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ClassAvg2 {
    public static final String map_out_file = "mapOutFileIndividualStudentAverage";
    public static final String reduce_out_file = "reduceOutFileClassAverage";

    public static class AvgMap extends
            Mapper<LongWritable, Text, Text, DoubleWritable> {
        MultipleOutputs<Text, DoubleWritable> mos;

        public void map(LongWritable inKey, Text inValue, Context context)
                throws IOException, InterruptedException {

            String line = inValue.toString();
            StringTokenizer myToken = new StringTokenizer(line);
            String name = myToken.nextToken();
            int cnt = 0;
            double sum = 0;
            double avg;
            while (myToken.hasMoreTokens()) {
                sum += Float.parseFloat(myToken.nextToken());
                cnt++;
            }
            avg = sum / cnt;
            context.write(new Text(name), new DoubleWritable(avg));
            mos.write(map_out_file, NullWritable.get(), new Text(name + " "
                    + avg));

        }

        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mos.close();
        }

    }

    public static class AvgReduce extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        MultipleOutputs<Text, DoubleWritable> mos;

        public void reduce(Text key, Iterable<DoubleWritable> inValues,
                Context context) throws IOException, InterruptedException {

            double classSum = 0;
            int cnt = 0;
            for (DoubleWritable dw : inValues) {
                classSum += dw.get();
                cnt++;
            }
            double classAvg = classSum / cnt;
            mos.write(reduce_out_file, NullWritable.get(), new Text(
                    "Class average: " + classAvg));
            // context.write(new Text("class average"), new DoubleWritable(
            // classAvg));

        }

        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<Text, DoubleWritable>(context);
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            mos.close();
        }

    }

    public static class AvgGroupComparator implements RawComparator<Text> {

        public int compare(Text t1, Text t2) {
            return 0;
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return 0;
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "class avg");
        job.setJarByClass(ClassAvg2.class);

        // mapper
        job.setMapperClass(AvgMap.class);
        job.setGroupingComparatorClass(AvgGroupComparator.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // reducer
        job.setReducerClass(AvgReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // input and output
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        MultipleOutputs.addNamedOutput(job, map_out_file,
                TextOutputFormat.class, NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, reduce_out_file,
                TextOutputFormat.class, NullWritable.class, Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
时间: 2024-10-11 00:42:16

hadoop MultipleOutputs的相关文章

使用hadoop multipleOutputs对输出结果进行不一样的组织

MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置.在输出目录中,框架自己会自动对输出文件进行命名和组织,如:part-(m|r)-00000之类.但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织.以前常用的方法是在MR job运行过后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式.研究了一下MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中Multip

Hadoop MultipleOutputs 结果输出到多个文件夹 出现数据不全,部分文件为空

如题:出现下图中的情况(设置reduceNum=5) 感觉很奇怪,排除了很久,终于发现是一个第二次犯的错误:丢了这句 this.mOutputs.close(); 加上这句,一切恢复正常!

Hadoop中的MultipleOutputs实践

本例子采用hadoop1.1.2版本 采用气象数据作为处理数据 1.MultipleOutputs例子,具体解释在代码中有注释 package StationPatitioner; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.

multipleOutputs Hadoop

package org.lukey.hadoop.muloutput; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io

Hadoop之——MapReduce实战(一)

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45956487 MapReduce概述      MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单. 这两个函数的形参是key.value对,表示函数的输入信息. MR执行流程 MapReduce原理 执行

hadoop多文件输出

在旧的API中使用多文件输出,只需要自定义类继承MultipleTextOutputFormat类 重写它下面的generateFileNameForKeyValue 方法即可, 直接上例子. 输入文件 内容: 目的是按照 字母开头的文件输出,并统计单词计数,输出结果为: 代码如下: package defined; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; impor

Hadoop MapReduce输入输出类型

一.输入格式 1.输入分片split 一个分片对应一个map任务: 一个分片包含一个表(整个文件)上的若干行,而一条记录(单行)对应一行: 分片包含一个以字节为单位的长度 和 一组存储位置,分片不包含实际的数据: map处理时会用分片的大小来排序,优先处理最大的分片: hadoop中Java定义的分片为InputSplit抽象类:主要两个方法,涉及分片长度,分片起始位置 public abstract class InputSplit{ public abstract long getLengt

hadoop编程小技巧(7)---自己定义输出文件格式以及输出到不同文件夹

代码測试环境:Hadoop2.4 应用场景:当须要定制输出数据格式时能够採用此技巧,包含定制输出数据的展现形式.输出路径.输出文件名称称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  经常使用的父类. 2)TextOutputFormat<K,V> 默认输出字符串输出格式. 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 能够把输出数据

tf-idf hadoop map reduce

package com.jumei.robot.mapreduce.tfidf; import java.io.IOException; import java.util.Collection; import java.util.Comparator; import java.util.Map.Entry; import java.util.Set; import java.util.StringTokenizer; import java.util.TreeMap; import org.ap