MapReduce Programs: Combiner Aggregation

[toc]



Preface

In the earlier wordcount program, the data reaching the shuffle stage looks like <hello, [1, 1, 1]> (you can see this in the program output), and that is exactly what is handed to reduce. This raises a problem: network transfer. Values like [1, 1, 1] can be pre-aggregated locally on the map side first, which is effectively a local reduce. Looking at the code, it really is a reduce operation; the only difference is that it runs during the shuffle, before the data leaves the map task. In Hadoop this local aggregation step is the combiner.
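
To make the idea concrete before the full listing: a combiner is registered with a single extra call in the job driver (a minimal sketch; WordCountCombiner is the class defined in the full program below):

// A combiner is simply a Reducer subclass that Hadoop may run on the map side,
// merging <hello, [1, 1, 1]> into <hello, 3> before the data is shuffled over
// the network to the reduce tasks.
job.setCombinerClass(WordCountCombiner.class);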

Program Code

The code is as follows:

package com.uplooking.bigdata.mr.wc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * MR application
 *      Counts the number of occurrences of each word in the HDFS directory hdfs://ns1/hello
 *      and stores the result in the HDFS directory hdfs://ns1/output/mr/wc/
 *
 *   Map<k1, v1, k2, v2>
 *   Step 1: determine the map type parameters
 *      k1, v1 are the input of the map function
 *      k2, v2 are the output of the map function
 *      For a plain text file, k1 is the byte offset at which each line starts ----> Long (LongWritable)
 *      For a plain text file, v1 is the line of text at offset k1 ----> String (Text)
 *      k2, v2
 *          k2 is a word obtained by splitting the line ----> String (Text)
 *          v2 is the count for that word ----> int (IntWritable)
 *   Step 2: write a class that extends Mapper
 *      and override its map function
 *   Reduce<k2, v2s, k3, v3>
 *    Step 1: determine the reduce type parameters
 *      k2, v2s are the input of the reduce function
 *      k3, v3 are the output of the reduce function
 *      k2  ----> Text
 *      v2s ----> Iterable<IntWritable>
 *
 *      k3 is the word after aggregation ----> Text
 *      v3 is the aggregated count of that word ----> IntWritable
 *   Step 2: write a class that extends Reducer
 *      and override its reduce function
 *
 *
 *  Step 3: with map and reduce written, wire them together in a driver program and run it
 *
 *
 *  How to run the MR job:
 *  yarn/hadoop jar <path-to-jar> <fully-qualified-class-name> <args>
 */
public class WordCountMRJob3 {
    public static void main(String[] args) throws Exception {

        if(args == null || args.length < 2) {
            System.err.print("Invalid arguments: expected <inputPath> <outputPath>");
            System.exit(-1);
        }

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = new Configuration();
//        String jobName = "WordCountMRJob";
        String jobName = WordCountMRJob3.class.getSimpleName();
        Job job = Job.getInstance(conf, jobName);
        // set the jar that the job runs from
        job.setJarByClass(WordCountMRJob3.class);
        // set the input of the whole job
        FileInputFormat.setInputPaths(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class); // the class that parses the input file into individual lines
        // set the mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // set the output of the whole job
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        // set the reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // set the combiner (local aggregation)
        job.setCombinerClass(WordCountCombiner.class);

        // number of reduce tasks to run
        job.setNumReduceTasks(1);
        // submit the job and wait for it to finish
        job.waitForCompletion(true);
    }

    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        Logger logger = LoggerFactory.getLogger(WordCountMapper.class);
        /**
         * For a plain text file, this map function is called once for each line of input
         *
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // first convert the line to a Java String
            String line = v1.toString();

            // debug output
            String msg1 = "map input\t" + k1.get() + "," + line;
            System.out.println(msg1);
            logger.debug("-------" + msg1);

            // split the line into words on spaces, yielding a String array
            String[] words = line.split(" ");
            // declare the output variables k2 and v2, of types Text and IntWritable
            Text k2 = null;
            IntWritable v2 = null;
            // count each word and write it to the context
            for (String word : words) {
                k2 = new Text(word);
                v2 = new IntWritable(1);
                context.write(k2, v2);

                // debug output
                String msg2 = "map output\t" + k2.toString() + "," + v2.get();
                System.out.println(msg2);
                logger.debug("-------" + msg2);

            }
        }
    }

    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        Logger logger = LoggerFactory.getLogger(WordCountReducer.class);

        /**
         * This reduce function is called exactly once for each distinct key
         *
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context) throws IOException, InterruptedException {
            // debug output
            System.out.println("reduce input group k2\t" + k2.toString());

            // variable holding the number of occurrences of the key k2
            int sum = 0;
            // accumulate the occurrences of k2
            for (IntWritable v2 : v2s) {
                // debug output
                System.out.println("reduce input v2 for group k2\t" + v2.get());

                sum += v2.get();
            }
            // build the reduce output k3 and v3, of types Text and IntWritable
            Text k3 = k2;
            IntWritable v3 = new IntWritable(sum);
            // write the reduce result to the context
            context.write(k3, v3);

            // debug output
            System.out.println("reduce output\t" + k3.toString() + "," + v3.get());

        }
    }

    static class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

        Logger logger = LoggerFactory.getLogger(WordCountCombiner.class);

        @Override
        protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context) throws IOException, InterruptedException {
            // debug output
            System.out.println("combiner input group k2\t" + k2.toString());

            // variable holding the number of occurrences of the key k2
            int sum = 0;
            // accumulate the occurrences of k2
            for (IntWritable v2 : v2s) {
                // debug output
                System.out.println("combiner input v2 for group k2\t" + v2.get());

                sum += v2.get();
            }
            // build the combiner output k3 and v3, of types Text and IntWritable
            Text k3 = k2;
            IntWritable v3 = new IntWritable(sum);
            // write the combiner result to the context
            context.write(k3, v3);

            // debug output
            System.out.println("combiner output\t" + k3.toString() + "," + v3.get());

        }
    }

}
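
Note that WordCountCombiner.reduce() is identical to WordCountReducer.reduce(). Because summing counts is commutative and associative, the reducer class itself could have been registered as the combiner and the separate class omitted. A sketch of the equivalent wiring (keep in mind that Hadoop may run the combiner zero, one, or several times per map task, so the combine logic must never change the final result):

// Equivalent for this particular job: reuse the reducer as the combiner.
job.setCombinerClass(WordCountReducer.class);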

Test

Run the program above (here it is executed as a local job from the IDE, as the LocalJobRunner entries show); the output is as follows. Pay attention to the Combine input records and Combine output records counters near the end: the combiner turns the 6 map output records into 4 records before they are shuffled to reduce.

/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=52382:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/tools.jar:/Users/yeyonghao/IdeaProjects/bigdata-study-20171211/hadoop-study/target/classes:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-common/2.6.4/hadoop-common-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-annotations/2.6.4/hadoop-annotations-2.6.4.jar:/Users/yeyonghao/maven/repository/com/google/guava/guava/11.0.2/guava-11.0.2.jar:/Users/yeyonghao/maven/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/Users/yeyonghao/maven/repository/org/apache/commons/commons-math3/3.1.1/commons-math3-3.1.1.jar:/Users/yeyonghao/maven/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/Users/yeyonghao/maven/repository/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar:/Users/yeyonghao/maven/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/Users/yeyonghao/maven/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/Users/yeyonghao/maven/repository/commons-net/commons-ne
t/3.1/commons-net-3.1.jar:/Users/yeyonghao/maven/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar:/Users/yeyonghao/maven/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/yeyonghao/maven/repository/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar:/Users/yeyonghao/maven/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/yeyonghao/maven/repository/com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar:/Users/yeyonghao/maven/repository/com/sun/jersey/jersey-json/1.9/jersey-json-1.9.jar:/Users/yeyonghao/maven/repository/com/sun/xml/bind/jaxb-impl/2.2.3-1/jaxb-impl-2.2.3-1.jar:/Users/yeyonghao/maven/repository/com/sun/jersey/jersey-server/1.9/jersey-server-1.9.jar:/Users/yeyonghao/maven/repository/asm/asm/3.1/asm-3.1.jar:/Users/yeyonghao/maven/repository/tomcat/jasper-compiler/5.5.23/jasper-compiler-5.5.23.jar:/Users/yeyonghao/maven/repository/tomcat/jasper-runtime/5.5.23/jasper-runtime-5.5.23.jar:/Users/yeyonghao/maven/repository/javax/servlet/jsp/jsp-api/2.1/jsp-api-2.1.jar:/Users/yeyonghao/maven/repository/commons-el/commons-el/1.0/commons-el-1.0.jar:/Users/yeyonghao/maven/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar:/Users/yeyonghao/maven/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/yeyonghao/maven/repository/net/java/dev/jets3t/jets3t/0.9.0/jets3t-0.9.0.jar:/Users/yeyonghao/maven/repository/org/apache/httpcomponents/httpclient/4.1.2/httpclient-4.1.2.jar:/Users/yeyonghao/maven/repository/org/apache/httpcomponents/httpcore/4.1.2/httpcore-4.1.2.jar:/Users/yeyonghao/maven/repository/com/jamesmurty/utils/java-xmlbuilder/0.4/java-xmlbuilder-0.4.jar:/Users/yeyonghao/maven/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/Users/yeyonghao/maven/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar:/Users/yeyonghao/maven/repository/commons-digester/commons-digester/1.8/commons-digester-1.8.jar:/Users/yeyonghao/maven/repository/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar:/Users/yeyonghao/maven/repository/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar:/Users/yeyonghao/maven/repository/org/slf4j/slf4j-api/1.7.5/slf4j-api-1.7.5.jar:/Users/yeyonghao/maven/repository/org/slf4j/slf4j-log4j12/1.7.5/slf4j-log4j12-1.7.5.jar:/Users/yeyonghao/maven/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar:/Users/yeyonghao/maven/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar:/Users/yeyonghao/maven/repository/org/apache/avro/avro/1.7.4/avro-1.7.4.jar:/Users/yeyonghao/maven/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/Users/yeyonghao/maven/repository/org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/Users/yeyonghao/maven/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/Users/yeyonghao/maven/repository/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-auth/2.6.4/hadoop-auth-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/directory/server/apacheds-kerberos-codec/2.0.0-M15/apacheds-kerberos-codec-2.0.0-M15.jar:/Users/yeyonghao/maven/repository/org/apache/directory/server/apacheds-i18n/2.0.0-M15/apacheds-i18n-2.0.0-M15.jar:/Users/yeyonghao/maven/repository/org/apache/directory/api/api-asn1-api/1.0.0-M20/api-asn1-api-1.0.0-M20.jar:/Users/yeyonghao/maven/repository/org/apache/directory/api/api-util/1.0.0-M20/ap
i-util-1.0.0-M20.jar:/Users/yeyonghao/maven/repository/org/apache/curator/curator-framework/2.6.0/curator-framework-2.6.0.jar:/Users/yeyonghao/maven/repository/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar:/Users/yeyonghao/maven/repository/org/apache/curator/curator-client/2.6.0/curator-client-2.6.0.jar:/Users/yeyonghao/maven/repository/org/apache/curator/curator-recipes/2.6.0/curator-recipes-2.6.0.jar:/Users/yeyonghao/maven/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/Users/yeyonghao/maven/repository/org/htrace/htrace-core/3.0.4/htrace-core-3.0.4.jar:/Users/yeyonghao/maven/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/Users/yeyonghao/maven/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar:/Users/yeyonghao/maven/repository/org/tukaani/xz/1.0/xz-1.0.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-client/2.6.4/hadoop-client-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-mapreduce-client-app/2.6.4/hadoop-mapreduce-client-app-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-api/2.6.4/hadoop-yarn-api-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-mapreduce-client-jobclient/2.6.4/hadoop-mapreduce-client-jobclient-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-hdfs/2.6.4/hadoop-hdfs-2.6.4.jar:/Users/yeyonghao/maven/repository/commons-daemon/commons-daemon/1.0.13/commons-daemon-1.0.13.jar:/Users/yeyonghao/maven/repository/io/netty/netty/3.6.2.Final/netty-3.6.2.Final.jar:/Users/yeyonghao/maven/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar:/Users/yeyonghao/maven/repository/xml-apis/xml-apis/1.3.04/xml-apis-1.3.04.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-common/2.6.4/hadoop-yarn-common-2.6.4.jar:/Users/yeyonghao/maven/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/Users/yeyonghao/maven/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/Users/yeyonghao/maven/repository/javax/activation/activation/1.1/activation-1.1.jar:/Users/yeyonghao/maven/repository/com/sun/jersey/jersey-client/1.9/jersey-client-1.9.jar:/Users/yeyonghao/maven/repository/org/codehaus/jackson/jackson-jaxrs/1.9.13/jackson-jaxrs-1.9.13.jar:/Users/yeyonghao/maven/repository/org/codehaus/jackson/jackson-xc/1.9.13/jackson-xc-1.9.13.jar:/Users/yeyonghao/maven/repository/com/google/inject/extensions/guice-servlet/3.0/guice-servlet-3.0.jar:/Users/yeyonghao/maven/repository/com/google/inject/guice/3.0/guice-3.0.jar:/Users/yeyonghao/maven/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/yeyonghao/maven/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/Users/yeyonghao/maven/repository/com/sun/jersey/contribs/jersey-guice/1.9/jersey-guice-1.9.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-client/2.6.4/hadoop-yarn-client-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-server-common/2.6.4/hadoop-yarn-server-common-2.6.4.jar:/Users/yeyonghao/maven/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-server-resourcemanager/2.6.4/hadoop-yarn-server-resourcemanager-2.6.4.jar:/Users/yeyonghao/maven/repository/org/codehaus/jettison/jettison/1.1/jettison-1.1.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-server-web-proxy/2.6.4/hadoop-yarn-server-web-proxy-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-server-nodeman
ager/2.6.4/hadoop-yarn-server-nodemanager-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-yarn-server-applicationhistoryservice/2.6.4/hadoop-yarn-server-applicationhistoryservice-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-mapreduce-client-core/2.6.4/hadoop-mapreduce-client-core-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/2.6.4/hadoop-mapreduce-client-shuffle-2.6.4.jar:/Users/yeyonghao/maven/repository/org/apache/hadoop/hadoop-mapreduce-client-common/2.6.4/hadoop-mapreduce-client-common-2.6.4.jar com.uplooking.bigdata.mr.wc2.WordCountMRJob3 /Users/yeyonghao/data/input/hello /Users/yeyonghao/data/output/mr/wc-5
objc[5387]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/bin/java (0x1045fc4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x1055f24e0). One of the two will be used. Which one is undefined.
2018-03-05 23:13:37,782 [main] [org.apache.hadoop.util.NativeCodeLoader] [WARN] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-03-05 23:13:38,159 [main] [org.apache.hadoop.conf.Configuration.deprecation] [INFO] - session.id is deprecated. Instead, use dfs.metrics.session-id
2018-03-05 23:13:38,160 [main] [org.apache.hadoop.metrics.jvm.JvmMetrics] [INFO] - Initializing JVM Metrics with processName=JobTracker, sessionId=
2018-03-05 23:13:38,494 [main] [org.apache.hadoop.mapreduce.JobResourceUploader] [WARN] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2018-03-05 23:13:38,505 [main] [org.apache.hadoop.mapreduce.JobResourceUploader] [WARN] - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2018-03-05 23:13:38,515 [main] [org.apache.hadoop.mapreduce.lib.input.FileInputFormat] [INFO] - Total input paths to process : 1
2018-03-05 23:13:38,558 [main] [org.apache.hadoop.mapreduce.JobSubmitter] [INFO] - number of splits:1
2018-03-05 23:13:38,675 [main] [org.apache.hadoop.mapreduce.JobSubmitter] [INFO] - Submitting tokens for job: job_local1995273034_0001
2018-03-05 23:13:38,798 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - The url to track the job: http://localhost:8080/
2018-03-05 23:13:38,798 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Running job: job_local1995273034_0001
2018-03-05 23:13:38,799 [Thread-11] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - OutputCommitter set in config null
2018-03-05 23:13:38,805 [Thread-11] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2018-03-05 23:13:38,838 [Thread-11] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - Waiting for map tasks
2018-03-05 23:13:38,839 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - Starting task: attempt_local1995273034_0001_m_000000_0
2018-03-05 23:13:38,872 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] [INFO] - ProcfsBasedProcessTree currently is supported only on Linux.
2018-03-05 23:13:38,872 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.Task] [INFO] -  Using ResourceCalculatorProcessTree : null
2018-03-05 23:13:38,874 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - Processing split: file:/Users/yeyonghao/data/input/hello:0+28
2018-03-05 23:13:38,948 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - (EQUATOR) 0 kvi 26214396(104857584)
2018-03-05 23:13:38,948 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - mapreduce.task.io.sort.mb: 100
2018-03-05 23:13:38,949 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - soft limit at 83886080
2018-03-05 23:13:38,949 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - bufstart = 0; bufvoid = 104857600
2018-03-05 23:13:38,949 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - kvstart = 26214396; length = 6553600
2018-03-05 23:13:38,955 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
map input   0,hello you
map output   hello,1
map output   you,1
map input   10,hello me
map output   hello,1
map output   me,1
map input   19,hello he
map output   hello,1
map output   he,1
2018-03-05 23:13:38,975 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] -
2018-03-05 23:13:38,976 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - Starting flush of map output
2018-03-05 23:13:38,976 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - Spilling map output
2018-03-05 23:13:38,976 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - bufstart = 0; bufend = 52; bufvoid = 104857600
2018-03-05 23:13:38,976 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - kvstart = 26214396(104857584); kvend = 26214376(104857504); length = 21/6553600
combiner input group k2  he
combiner input v2 for group k2 1
combiner output  he,1
combiner input group k2  hello
combiner input v2 for group k2 1
combiner input v2 for group k2 1
combiner input v2 for group k2 1
combiner output  hello,3
combiner input group k2  me
combiner input v2 for group k2 1
combiner output  me,1
combiner input group k2  you
combiner input v2 for group k2 1
combiner output  you,1
2018-03-05 23:13:38,991 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.MapTask] [INFO] - Finished spill 0
2018-03-05 23:13:38,995 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.Task] [INFO] - Task:attempt_local1995273034_0001_m_000000_0 is done. And is in the process of committing
2018-03-05 23:13:39,008 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - map
2018-03-05 23:13:39,009 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.Task] [INFO] - Task 'attempt_local1995273034_0001_m_000000_0' done.
2018-03-05 23:13:39,009 [LocalJobRunner Map Task Executor #0] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - Finishing task: attempt_local1995273034_0001_m_000000_0
2018-03-05 23:13:39,009 [Thread-11] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - map task executor complete.
2018-03-05 23:13:39,012 [Thread-11] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - Waiting for reduce tasks
2018-03-05 23:13:39,012 [pool-3-thread-1] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - Starting task: attempt_local1995273034_0001_r_000000_0
2018-03-05 23:13:39,017 [pool-3-thread-1] [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] [INFO] - ProcfsBasedProcessTree currently is supported only on Linux.
2018-03-05 23:13:39,018 [pool-3-thread-1] [org.apache.hadoop.mapred.Task] [INFO] -  Using ResourceCalculatorProcessTree : null
2018-03-05 23:13:39,021 [pool-3-thread-1] [org.apache.hadoop.mapred.ReduceTask] [INFO] - Using ShuffleConsumerPlugin: [email protected]
2018-03-05 23:13:39,030 [pool-3-thread-1] [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] [INFO] - MergerManager: memoryLimit=1336252800, maxSingleShuffleLimit=334063200, mergeThreshold=881926912, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2018-03-05 23:13:39,033 [EventFetcher for fetching Map Completion Events] [org.apache.hadoop.mapreduce.task.reduce.EventFetcher] [INFO] - attempt_local1995273034_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2018-03-05 23:13:39,063 [localfetcher#1] [org.apache.hadoop.mapreduce.task.reduce.LocalFetcher] [INFO] - localfetcher#1 about to shuffle output of map attempt_local1995273034_0001_m_000000_0 decomp: 42 len: 46 to MEMORY
2018-03-05 23:13:39,083 [localfetcher#1] [org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput] [INFO] - Read 42 bytes from map-output for attempt_local1995273034_0001_m_000000_0
2018-03-05 23:13:39,085 [localfetcher#1] [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] [INFO] - closeInMemoryFile -> map-output of size: 42, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->42
2018-03-05 23:13:39,086 [EventFetcher for fetching Map Completion Events] [org.apache.hadoop.mapreduce.task.reduce.EventFetcher] [INFO] - EventFetcher is interrupted.. Returning
2018-03-05 23:13:39,087 [pool-3-thread-1] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - 1 / 1 copied.
2018-03-05 23:13:39,087 [pool-3-thread-1] [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] [INFO] - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2018-03-05 23:13:39,096 [pool-3-thread-1] [org.apache.hadoop.mapred.Merger] [INFO] - Merging 1 sorted segments
2018-03-05 23:13:39,096 [pool-3-thread-1] [org.apache.hadoop.mapred.Merger] [INFO] - Down to the last merge-pass, with 1 segments left of total size: 37 bytes
2018-03-05 23:13:39,097 [pool-3-thread-1] [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] [INFO] - Merged 1 segments, 42 bytes to disk to satisfy reduce memory limit
2018-03-05 23:13:39,098 [pool-3-thread-1] [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] [INFO] - Merging 1 files, 46 bytes from disk
2018-03-05 23:13:39,098 [pool-3-thread-1] [org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl] [INFO] - Merging 0 segments, 0 bytes from memory into reduce
2018-03-05 23:13:39,098 [pool-3-thread-1] [org.apache.hadoop.mapred.Merger] [INFO] - Merging 1 sorted segments
2018-03-05 23:13:39,099 [pool-3-thread-1] [org.apache.hadoop.mapred.Merger] [INFO] - Down to the last merge-pass, with 1 segments left of total size: 37 bytes
2018-03-05 23:13:39,099 [pool-3-thread-1] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - 1 / 1 copied.
2018-03-05 23:13:39,109 [pool-3-thread-1] [org.apache.hadoop.conf.Configuration.deprecation] [INFO] - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce input group k2    he
reduce input v2 for group k2   1
reduce output    he,1
reduce input group k2    hello
reduce input v2 for group k2   3
reduce output    hello,3
reduce input group k2    me
reduce input v2 for group k2   1
reduce output    me,1
reduce input group k2    you
reduce input v2 for group k2   1
reduce output    you,1
2018-03-05 23:13:39,113 [pool-3-thread-1] [org.apache.hadoop.mapred.Task] [INFO] - Task:attempt_local1995273034_0001_r_000000_0 is done. And is in the process of committing
2018-03-05 23:13:39,114 [pool-3-thread-1] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - 1 / 1 copied.
2018-03-05 23:13:39,114 [pool-3-thread-1] [org.apache.hadoop.mapred.Task] [INFO] - Task attempt_local1995273034_0001_r_000000_0 is allowed to commit now
2018-03-05 23:13:39,115 [pool-3-thread-1] [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] [INFO] - Saved output of task 'attempt_local1995273034_0001_r_000000_0' to file:/Users/yeyonghao/data/output/mr/wc-5/_temporary/0/task_local1995273034_0001_r_000000
2018-03-05 23:13:39,116 [pool-3-thread-1] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - reduce > reduce
2018-03-05 23:13:39,116 [pool-3-thread-1] [org.apache.hadoop.mapred.Task] [INFO] - Task 'attempt_local1995273034_0001_r_000000_0' done.
2018-03-05 23:13:39,116 [pool-3-thread-1] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - Finishing task: attempt_local1995273034_0001_r_000000_0
2018-03-05 23:13:39,116 [Thread-11] [org.apache.hadoop.mapred.LocalJobRunner] [INFO] - reduce task executor complete.
2018-03-05 23:13:39,806 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Job job_local1995273034_0001 running in uber mode : false
2018-03-05 23:13:39,810 [main] [org.apache.hadoop.mapreduce.Job] [INFO] -  map 100% reduce 100%
2018-03-05 23:13:39,811 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Job job_local1995273034_0001 completed successfully
2018-03-05 23:13:39,821 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Counters: 30
    File System Counters
        FILE: Number of bytes read=494
        FILE: Number of bytes written=521348
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=3
        Map output records=6
        Map output bytes=52
        Map output materialized bytes=46
        Input split bytes=103
        Combine input records=6
        Combine output records=4
        Reduce input groups=4
        Reduce shuffle bytes=46
        Reduce input records=4
        Reduce output records=4
        Spilled Records=8
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        Total committed heap usage (bytes)=468713472
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=28
    File Output Format Counters
        Bytes Written=36

Process finished with exit code 0
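
If you prefer to verify the combiner's effect in code rather than by reading the console, the framework counters can be queried once the job has finished. A sketch (TaskCounter and the two counter names below are the standard Hadoop MapReduce framework counters, not something defined in this program):

// At the end of the driver, replacing the plain waitForCompletion(true) call:
boolean ok = job.waitForCompletion(true);
if (ok) {
    org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
    long combineIn  = counters.findCounter(org.apache.hadoop.mapreduce.TaskCounter.COMBINE_INPUT_RECORDS).getValue();
    long combineOut = counters.findCounter(org.apache.hadoop.mapreduce.TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
    // In the run above: 6 records went into the combiner and 4 came out,
    // so only 4 records were shuffled to the single reducer.
    System.out.println("combine in=" + combineIn + ", combine out=" + combineOut);
}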

Original post: http://blog.51cto.com/xpleaf/2083249
