mutilple output reduce cannot write

package org.lukey.hadoop.classifyBayes;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class Probability {

    // Client
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        //读取单词总数,设置到congfiguration中
        String totalWordsPath = "/user/hadoop/output/totalwords.txt";
        FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
        FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
        BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
        String strLine = buffer.readLine();
        String[] temp = strLine.split(":");
        if(temp.length == 2){
            //temp[0] = TOTALWORDS
            conf.setInt(temp[0], Integer.parseInt(temp[1]));
        }

        /*
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if (otherArgs.length != 2) {
            System.out.println("Usage <in> <out>");
            System.exit(-1);
        }
*/
        Job job = new Job(conf, "file count");

        job.setJarByClass(Probability.class);

        job.setMapperClass(WordsOfClassCountMapper.class);
        job.setReducerClass(WordsOfClassCountReducer.class);

        String input = "/user/hadoop/mid/wordsFrequence";
        String output = "/user/hadoop/output/probability/";

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

    private static MultipleOutputs<Text, IntWritable> mos;

    // Mapper
    static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private  static IntWritable number = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {

            String[] temp = value.toString().split("\t");
            if(temp.length == 3){
                // 文件夹名类别名
                String dirName = temp[0];
                value.set(temp[1]);
                number.set(Integer.parseInt(temp[2]));
                mos.write(value, number, dirName);

            }

        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            mos.close();
        }

        @Override
        protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            mos = new MultipleOutputs<Text, IntWritable>(context);
        }

    }

    // Reducer
    static class WordsOfClassCountReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        // result 表示每个文件里面单词个数
        DoubleWritable result = new DoubleWritable(3);
        Configuration conf = new Configuration();
        int total = conf.getInt("TOTALWORDS", 1);
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
                        throws IOException, InterruptedException {
            // TODO Auto-generated method stub
//            double sum = 0;
//            for (IntWritable value : values) {
//                sum += value.get();
//            }
//            result.set(sum);

            context.write(key, result);
        }

    }

}
时间: 2024-10-14 22:04:52

mutilple output reduce cannot write的相关文章

Hadoop源代码分析(mapreduce.lib.partition/reduce/output)

Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类. Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已.Mapper最终处理的结果对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同

01 - Execise About Array.prototype.reduce()

Description: Write a generic function chainer Write a generic function chainer that takes a starting value, and an array of functions to execute on it (array of symbols for ruby). The input for each function is the output of the previous function (ex

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

hadoop reduce端联结

此例子摘自hadoop基础教程. 其中sales.txt内容如下 客户编号 客户消费额度 消费时间001 35.99 2012-03-15 002 12.29 2004-07-02 004 13.42 2005-12-20 003 499.99 2010-12-20 001 78.95 2012-04-02 002 21.99 2006-11-30 002 93.45 2008-09-10 001 9.99 2012-05-17 accounts.txt内容如下: 客户编号 姓名 注册时间001

reduce 方法 (Array) (JavaScript)

对数组中的所有元素调用指定的回调函数.该回调函数的返回值为累积结果,并且此返回值在下一次调用该回调函数时作为参数提供. 语法 array1.reduce(callbackfn[, initialValue]) 参数 参数 定义 array1 必需.一个数组对象. callbackfn 必需.一个接受最多四个参数的函数.对于数组中的每个元素,reduce 方法都会调用 callbackfn 函数一次. initialValue 可选.如果指定 initialValue,则它将用作初始值来启动累积.

Reduce侧联接

案例分析前提,了解其原理,以及术语 术语部分: 1.Data Source:基本与关系数据库中的表相似,形式为:(例子中为CSV格式) 2.Tag:由于记录类型(Customers或Orders)与记录本身分离,标记一个Record会确保特殊元数据会一致存在于记录中.在这个目的下,我们将使用每个record自身的Data source名称标记每个record. 3.Group Key:Group Key类似于关系数据库中的链接键(join key),在我们的例子中,group key就是Cust

tf-idf hadoop map reduce

package com.jumei.robot.mapreduce.tfidf; import java.io.IOException; import java.util.Collection; import java.util.Comparator; import java.util.Map.Entry; import java.util.Set; import java.util.StringTokenizer; import java.util.TreeMap; import org.ap

MapReduce剖析笔记之七:Child子进程处理Map和Reduce任务的主要流程

在上一节我们分析了TaskTracker如何对JobTracker分配过来的任务进行初始化,并创建各类JVM启动所需的信息,最终创建JVM的整个过程,本节我们继续来看,JVM启动后,执行的是Child类中的Main方法,这个方法是如何执行的. 1,从命令参数中解析相应参数,获取JVMID.建立RPC连接.启动日志线程等初始化操作: 父进程(即TaskTracker)在启动子进程时,会加入一些参数,如本机的IP.端口.TaskAttemptID等等,通过解析可以得到JVMID. String ho

Hadoop多目录输入,join,进入reduce,数据流分析

前言 在做需求时,经常遇到多个目录,也就是多个维度进行join,这里分析一下,数据是怎么流动的. 1.多目录输入 使用MultipleInputs.addInputPath()  对多目录制定格式和map 2.数据流分析 map按行读入数据,需要对不同的输入目录,打上不同的标记(这个方法又叫reduce端连接),map在输出后会进行partition和sort,按照key进行排序,然后输出到reduce进行处理. 例子 三个输入文件: a.txt: 500 501 b.txt: 500 501