multipleOutputs Hadoop

package org.lukey.hadoop.muloutput;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestMultipleOutput {

    static String baseOutputPath = "/user/hadoop/test_out";

    private static MultipleOutputs<Text, IntWritable> mos;

    // Mapper
    static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text className = new Text();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            FileSplit fileSplit = (FileSplit) context.getInputSplit();

            // 文件名
            String fileName = fileSplit.getPath().getName();

            // 文件夹名
            String dirName = fileSplit.getPath().getParent().getName();

            className.set(dirName + "/" + fileName);

            // Country:ABDBI 1
            mos.write(value, one, className.toString());
            // context.write(className, one);

        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            mos.close();
        }

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

    }

    // Reducer
    static class WordsOfClassCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // result 表示每个文件里面单词个数
        IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                        throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);

            context.write(key, result);
        }

    }

    // Client
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {
            System.out.println("Usage <in> <out>");
            System.exit(-1);
        }

        Job job = new Job(conf, "file count");

        job.setJarByClass(TestMultipleOutput.class);

        job.setMapperClass(WordsOfClassCountMapper.class);
        job.setReducerClass(WordsOfClassCountReducer.class);

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(otherArgs[0]);

        FileStatus[] fileStatus = fileSystem.listStatus(path);

        for (FileStatus fs : fileStatus) {
            if (fs.isDir()) {
                Path p = new Path(fs.getPath().toString());
                FileInputFormat.addInputPath(job, p);
            }else{
                FileInputFormat.addInputPath(job, fs.getPath());
            }
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}
时间: 2024-08-13 11:59:57

multipleOutputs Hadoop的相关文章

Hadoop中的MultipleOutputs实践

本例子采用hadoop1.1.2版本 采用气象数据作为处理数据 1.MultipleOutputs例子,具体解释在代码中有注释 package StationPatitioner; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.

使用hadoop multipleOutputs对输出结果进行不一样的组织

MapReduce job中,可以使用FileInputFormat和FileOutputFormat来对输入路径和输出路径来进行设置.在输出目录中,框架自己会自动对输出文件进行命名和组织,如:part-(m|r)-00000之类.但有时为了后续流程的方便,我们常需要对输出结果进行一定的分类和组织.以前常用的方法是在MR job运行过后,用脚本对目录下的数据进行一次重新组织,变成我们需要的格式.研究了一下MR框架中的MultipleOutputs(是2.0之后的新API,是对老版本中Multip

hadoop MultipleOutputs

MultipleOutputs: write data to multiple files with customized name, can be used for both map and reduce phase. http://www.lichun.cc/blog/2013/11/how-to-use-hadoop-multipleoutputs/ public static class MyMap extends Mapper<LongWritable, Text, Text, Dou

Hadoop MultipleOutputs 结果输出到多个文件夹 出现数据不全,部分文件为空

如题:出现下图中的情况(设置reduceNum=5) 感觉很奇怪,排除了很久,终于发现是一个第二次犯的错误:丢了这句 this.mOutputs.close(); 加上这句,一切恢复正常!

Hadoop之——MapReduce实战(一)

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45956487 MapReduce概述      MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单. 这两个函数的形参是key.value对,表示函数的输入信息. MR执行流程 MapReduce原理 执行

hadoop多文件输出

在旧的API中使用多文件输出,只需要自定义类继承MultipleTextOutputFormat类 重写它下面的generateFileNameForKeyValue 方法即可, 直接上例子. 输入文件 内容: 目的是按照 字母开头的文件输出,并统计单词计数,输出结果为: 代码如下: package defined; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; impor

Hadoop MapReduce输入输出类型

一.输入格式 1.输入分片split 一个分片对应一个map任务: 一个分片包含一个表(整个文件)上的若干行,而一条记录(单行)对应一行: 分片包含一个以字节为单位的长度 和 一组存储位置,分片不包含实际的数据: map处理时会用分片的大小来排序,优先处理最大的分片: hadoop中Java定义的分片为InputSplit抽象类:主要两个方法,涉及分片长度,分片起始位置 public abstract class InputSplit{ public abstract long getLengt

hadoop编程小技巧(7)---自己定义输出文件格式以及输出到不同文件夹

代码測试环境:Hadoop2.4 应用场景:当须要定制输出数据格式时能够採用此技巧,包含定制输出数据的展现形式.输出路径.输出文件名称称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  经常使用的父类. 2)TextOutputFormat<K,V> 默认输出字符串输出格式. 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 能够把输出数据

tf-idf hadoop map reduce

package com.jumei.robot.mapreduce.tfidf; import java.io.IOException; import java.util.Collection; import java.util.Comparator; import java.util.Map.Entry; import java.util.Set; import java.util.StringTokenizer; import java.util.TreeMap; import org.ap