MapReduce 编程 系列六 MultipleOutputs使用

在前面的例子中,输出文件名是默认的:

_logs         part-r-00001  part-r-00003  part-r-00005  part-r-00007  part-r-00009  part-r-00011  part-r-00013  _SUCCESS
part-r-00000  part-r-00002  part-r-00004  part-r-00006  part-r-00008  part-r-00010  part-r-00012  part-r-00014

part-r-0000N

还有一个_SUCCESS文件标志job运行成功。

还有一个目录_logs。

但是实际情况中,我们有时候需要根据情况定制我的输出文件名。

比如我要根据did的值分组,产生不同的输出文件。所有did出现次数在[0, 2)的都输出到a文件中,在[2, 4)的输出大b文件,其他输出到c文件。

这里涉及到的输出类是MultipleOutputs类。下面是介绍如何实现。

首先有一个小优化,为了避免每次执行时输入一长串命令,利用maven exec plugin,参考pom.xml配置如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.freebird</groupId>
  <artifactId>mr1_example2</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>mr1_example2</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>hadoop</executable>
          <arguments>
            <argument>jar</argument>
            <argument>target/mr1_example2-1.0-SNAPSHOT.jar</argument>
            <argument>org.freebird.LogJob</argument>
            <argument>/user/chenshu/share/logs</argument>
            <argument>/user/chenshu/share/output12</argument>
          </arguments>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

这样每次mvn clean package之后,运行mvn exec:exec命令即可。

然后在LogJob.java文件添加几行代码:

package org.freebird;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.freebird.reducer.LogReducer;
import org.freebird.mapper.LogMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogJob {                                                                                                                                                                                                

    public static void main(String[] args) throws Exception {
        System.out.println("args[0]:" + args[0]);
        System.out.println("args[1]:" + args[1]);                                                                                                                                                                    

        Configuration conf = new Configuration();
        Job job = new Job(conf, "sum_did_from_log_file");
        job.setJarByClass(LogJob.class);                                                                                                                                                                             

        job.setMapperClass(org.freebird.mapper.LogMapper.class);
        job.setReducerClass(org.freebird.reducer.LogReducer.class);                                                                                                                                                  

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);                                                                                                                                                                  

        MultipleOutputs.addNamedOutput(job, "a", TextOutputFormat.class, Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "b", TextOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "c", TextOutputFormat.class, Text.class, Text.class);                                                                                                                    

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));                                                                                                                                                      

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

MultipleOutputs.addNamedOutput 函数被调用了三次,设置了文件名为a,b和c,最后两个参数分别是output key和output value类型,应该和job.setOutputKeyClass以及job.setOutputValueClass保持一致。

最后修改reducer类的代码:

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    private MultipleOutputs outputs;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::setup method");
        outputs = new MultipleOutputs(context);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::cleanup method");
        outputs.close();
    }

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer::reduce method");
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        System.out.println("key: " + key.toString() + " sum: " + sum);
        if ((sum < 2) && (sum >= 0)) {
            outputs.write("a", key, sum);
        } else if (sum < 4) {
            outputs.write("b", key, sum);
        } else {
            outputs.write("c", key, sum);
        }
    }
}

根据相同key(did)sum的结果大小,写入到不同的文件中。运行后观察一下结果:

[[email protected] output12]$ ls
a-r-00000  a-r-00004  a-r-00008  a-r-00012  b-r-00001  b-r-00005  b-r-00009  b-r-00013  c-r-00002  c-r-00006  c-r-00010  c-r-00014     part-r-00002  part-r-00006  part-r-00010  part-r-00014
a-r-00001  a-r-00005  a-r-00009  a-r-00013  b-r-00002  b-r-00006  b-r-00010  b-r-00014  c-r-00003  c-r-00007  c-r-00011  _logs         part-r-00003  part-r-00007  part-r-00011  _SUCCESS
a-r-00002  a-r-00006  a-r-00010  a-r-00014  b-r-00003  b-r-00007  b-r-00011  c-r-00000  c-r-00004  c-r-00008  c-r-00012  part-r-00000  part-r-00004  part-r-00008  part-r-00012
a-r-00003  a-r-00007  a-r-00011  b-r-00000  b-r-00004  b-r-00008  b-r-00012  c-r-00001  c-r-00005  c-r-00009  c-r-00013  part-r-00001  part-r-00005  part-r-00009  part-r-00013

打开任意的a,b和c开头的文件,查看值果然是如此

5371700bc7b2231db03afeb0        6
5371700cc7b2231db03afec0        7
5371701cc7b2231db03aff8d        6
5371709dc7b2231db03b0136        6
537170a0c7b2231db03b01ac        6
537170a6c7b2231db03b01fc        6
537170a8c7b2231db03b0217        6
537170b3c7b2231db03b0268        6
53719aa9c7b2231db03b0721        6
53719ad0c7b2231db03b0731        4

使用MultipleOutputs根据sum值对设备ID进行分组成功了。

MapReduce仍然会默认生成part....文件,不用理会,都是空文件。

时间: 2024-10-13 19:21:12

MapReduce 编程 系列六 MultipleOutputs使用的相关文章

MapReduce 编程 系列八 根据输入路径产生输出路径和清除HDFS目录

有了前面的MultipleOutputs的使用经验,就可以将HDFS输入目录的路径解析出来,组成输出路径,这在业务上是十分常用的.这样其实是没有多文件名输出,仅仅是调用了MultipleOutputs的addNamedOutput方法一次,设置文件名为result. 同时为了保证计算的可重入性,每次都需要将已经存在的输出目录删除. 先看pom.xml, 现在参数只有一个输入目录了,输出目录会在该路径后面自动加上/output. <project xmlns="http://maven.ap

MapReduce 编程 系列五 MapReduce 主要过程梳理

前面4篇文章介绍了如何编写一个简单的日志提取程序,读取HDFS share/logs目录下的所有csv日志文件,然后提取数据后,最终输出到share/output目录下. 本篇停留一下,梳理一下主要过程,然后提出新的改进目标. 首先声明一下,所有的代码都是maven工程的,没有使用任何IDE.  这是我一贯的编程风格,用Emacs + JDEE开发.需要使用IDE的只需要学习如何在IDE中使用maven即可. 可比较的序列化 第一个是序列化,这是各种编程技术中常用的.MapReduce的特别之处

MapReduce 编程 系列九 Reducer数目

本篇介绍怎样控制reduce的数目.前面观察结果文件,都会发现通常是以part-r-00000 形式出现多个文件,事实上这个reducer的数目有关系.reducer数目多,结果文件数目就多. 在初始化job的时候.是能够设置reducer的数目的.example4在example的基础上做了改动.改动了pom.xml.使得结束一个參数作为reducer的数目.改动了LogJob.java的代码,作为设置reducer数目. xsi:schemaLocation="http://maven.ap

MapReduce 编程 系列八 Reducer数目

本篇介绍如何控制reduce的数目.前面观察结果文件,都会发现一般是以part-r-00000 形式出现多个文件,其实这个reducer的数目有关系,reducer数目多,结果文件数目就多. 在初始化job的时候,是可以设置reducer的数目的.example4在example的基础上做了改动.修改了pom.xml,使得结束一个参数作为reducer的数目.修改了LogJob.java的代码,作为设置reducer数目. xsi:schemaLocation="http://maven.apa

MapReduce 编程 系列十二 用Hadoop Streaming技术集成newLISP脚本

本文环境和之前的Hadoop 1.x不同,是在Hadoop 2.x环境下测试.功能和前面的日志处理程序一样. 第一个newLISP脚本,起到mapper的作用,在stdin中读取文本数据,将did作为key, value为1,然后将结果输出到stdout 第二个newLISP脚本,起到reducer的作用,在stdin中读取<key, values>, key是dic, values是所有的value,简单对value求和后,写到stdout中 最后应该可以在HDFS下看到结果. 用脚本编程的

MapReduce 编程 系列四 MapReduce例子程序运行

MapReduce程序编译是可以在普通的Java环境下进行,现在来到真实的环境上运行. 首先,将日志文件放到HDFS目录下 $ hdfs dfs -put *.csv /user/chenshu/share/logs/ 14/09/27 17:03:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where app

MapReduce编程系列 — 3:数据去重

1.项目名称: 2.程序代码: package com.dedup; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce

MapReduce编程系列 — 4:排序

1.项目名称: 2.程序代码: package com.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce

MapReduce编程系列 — 1:计算单词

1.代码: package com.mrdemo; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import o