MapReduce 编程 系列八 根据输入路径产生输出路径和清除HDFS目录

有了前面的MultipleOutputs的使用经验,就可以将HDFS输入目录的路径解析出来,组成输出路径,这在业务上是十分常用的。这样其实是没有多文件名输出,仅仅是调用了MultipleOutputs的addNamedOutput方法一次,设置文件名为result.

同时为了保证计算的可重入性,每次都需要将已经存在的输出目录删除。

先看pom.xml, 现在参数只有一个输入目录了,输出目录会在该路径后面自动加上/output.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.freebird</groupId>
  <artifactId>mr1_example3</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>mr1_example3</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.3.2</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>hadoop</executable>
          <arguments>
            <argument>jar</argument>
            <argument>target/mr1_example3-1.0-SNAPSHOT.jar</argument>
            <argument>org.freebird.LogJob</argument>
            <argument>/user/chenshu/share/logs</argument>
          </arguments>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

LogJob.java做了修改,主要用Path, FileSystem和Configuration三个类配合,删除HDFS已经存在的目录。

并且只设置了一个NamedOutput,名为result.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.freebird.reducer.LogReducer;
import org.freebird.mapper.LogMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;

public class LogJob {                                                                                                                                                                                                                                                             

    public static void main(String[] args) throws Exception {
        String inputPath = args[0];
        if (inputPath.endsWith("/")) {
            inputPath = inputPath.substring(0, inputPath.length() -1);
        }
        System.out.println("args[0] indicates input folder path, the last / will be removed if it exists:" + inputPath);
        String outputPath = inputPath + "/output";
        System.out.println("output folder path is:" + outputPath);                                                                                                                                                                                                                

        Configuration conf = new Configuration();
        Job job = new Job(conf, "sum_did_from_log_file");
        job.setJarByClass(LogJob.class);                                                                                                                                                                                                                                          

        job.setMapperClass(org.freebird.mapper.LogMapper.class);
        job.setReducerClass(org.freebird.reducer.LogReducer.class);                                                                                                                                                                                                               

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);                                                                                                                                                                                                                               

        Path path1 = new Path(inputPath);
        Path path2 = new Path(outputPath);                                                                                                                                                                                                                                        

        recreateFolder(path2, conf);                                                                                                                                                                                                                                              

        MultipleOutputs.addNamedOutput(job, "result", TextOutputFormat.class, Text.class, IntWritable.class);                                                                                                                                                                     

        FileInputFormat.addInputPath(job, path1);
        FileOutputFormat.setOutputPath(job, path2);                                                                                                                                                                                                                               

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }                                                                                                                                                                                                                                                                             

    private static void recreateFolder(Path path, Configuration conf) throws IOException {
        FileSystem fs = path.getFileSystem(conf);
        if (fs.exists(path)) {
            fs.delete(path);
        }
    }
}

Reduce代码也需要修改:

package org.freebird.reducer;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class LogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private MultipleOutputs outputs;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::setup method");
        outputs = new MultipleOutputs(context);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer:::cleanup method");
        outputs.close();
    }

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
        System.out.println("enter LogReducer::reduce method");
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        System.out.println("key: " + key.toString() + " sum: " + sum);
        outputs.write("result", key, sum);
    }
}

代码比之前的例子简单很多,仅仅是往一个named output "result" 写出结果。

时间: 2024-12-22 00:05:58

MapReduce 编程 系列八 根据输入路径产生输出路径和清除HDFS目录的相关文章

MapReduce 编程 系列八 Reducer数目

本篇介绍如何控制reduce的数目.前面观察结果文件,都会发现一般是以part-r-00000 形式出现多个文件,其实这个reducer的数目有关系,reducer数目多,结果文件数目就多. 在初始化job的时候,是可以设置reducer的数目的.example4在example的基础上做了改动.修改了pom.xml,使得结束一个参数作为reducer的数目.修改了LogJob.java的代码,作为设置reducer数目. xsi:schemaLocation="http://maven.apa

深入探索并发编程系列(八)-Acquire与Release语义

一般来说,在无锁(lock-free)注1编程中,线程有两种方法来操作共享内存:线程间相互竞争一种资源或者相互合作传递消息.Acquire与Release语义对后者来说很关键:保证在线程间可靠地相互传递消息.实际上,我大胆地猜测,不正确的或者缺乏Acquire与Release语义是导致无锁编程产生错误的最常见 原因. 在这篇文章中,我会去探讨许多在C++中获得Acquire与Release 语义的方法.还会简单介绍一下C++11原子库标准.所以,你事先不必具备这方面的知识.简明起见,这里的讨论仅

MapReduce 编程 系列五 MapReduce 主要过程梳理

前面4篇文章介绍了如何编写一个简单的日志提取程序,读取HDFS share/logs目录下的所有csv日志文件,然后提取数据后,最终输出到share/output目录下. 本篇停留一下,梳理一下主要过程,然后提出新的改进目标. 首先声明一下,所有的代码都是maven工程的,没有使用任何IDE.  这是我一贯的编程风格,用Emacs + JDEE开发.需要使用IDE的只需要学习如何在IDE中使用maven即可. 可比较的序列化 第一个是序列化,这是各种编程技术中常用的.MapReduce的特别之处

MapReduce 编程 系列四 MapReduce例子程序运行

MapReduce程序编译是可以在普通的Java环境下进行,现在来到真实的环境上运行. 首先,将日志文件放到HDFS目录下 $ hdfs dfs -put *.csv /user/chenshu/share/logs/ 14/09/27 17:03:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where app

MapReduce 编程 系列六 MultipleOutputs使用

在前面的例子中,输出文件名是默认的: _logs part-r-00001 part-r-00003 part-r-00005 part-r-00007 part-r-00009 part-r-00011 part-r-00013 _SUCCESS part-r-00000 part-r-00002 part-r-00004 part-r-00006 part-r-00008 part-r-00010 part-r-00012 part-r-00014 part-r-0000N 还有一个_SUC

MapReduce编程系列 — 3:数据去重

1.项目名称: 2.程序代码: package com.dedup; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce

MapReduce编程系列 — 4:排序

1.项目名称: 2.程序代码: package com.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce

MapReduce编程系列 — 1:计算单词

1.代码: package com.mrdemo; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import o

MapReduce编程系列 — 2:计算平均分

1.项目名称: 2.程序代码: package com.averagescorecount; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWrit