MapReduce-MulitipleOutputs实现自定义输出到多个目录

输入源数据样例:

Source1-0001
Source2-0002
Source1-0003
Source2-0004
Source1-0005
Source2-0006
Source3-0007
Source3-0008

描述:

  • Source1开头的数据属于集合A;
  • Source2开头的数据属于集合B;
  • Source3开头的数据即属于集合A,也属于集合B;

输出要求:

  • 完整保留集合A数据(包含Source1、Source3开头数据)
  • 完整保留集合B数据(包含Source2、Source3开头数据)

程序实现:

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.mahout.common.AbstractJob;

import com.yhd.common.util.HadoopUtil;

/**
 * AbstractJob 是mahout的Job模板,可以不使用该模板,
 * 实则的核心部分在于MultipleOutputs部分
 *
 * @author ouyangyewei
 *
 */
public class TestMultipleOutputsJob extends AbstractJob {
    @Override
    public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();

        Map<String, List<String>> parseArgs = parseArguments(args);
        if(parseArgs==null){
            return -1;
        }

        HadoopUtil.delete(getConf(), getOutputPath());

        Configuration conf = new Configuration();
        conf.setInt("mapred.reduce.tasks", 4);
        conf.set("mapred.job.queue.name", "pms");
        conf.set("mapred.child.java.opts", "-Xmx3072m");
        conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05");

        Job job = new Job(new Configuration(conf));
        job.setJobName("TestMultipleOutputsJob");
        job.setJarByClass(TestMultipleOutputsJob.class);
        job.setMapperClass(MultipleMapper.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, this.getInputPath());
        FileOutputFormat.setOutputPath(job, this.getOutputPath());

        /** 输出文件格式将为:Source1-m-**** */
        MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class);
        /** 输出文件格式将为:Source2-m-**** */
        MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class);

        boolean suceeded = job.waitForCompletion(true);
        if(!suceeded) {
            return -1;
        }
        return 0;
    }

    /**
     *
     * @author ouyangyewei
     *
     */
    public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> {
        private MultipleOutputs<Text, Text> mos = null;

        @Override
        protected void setup(Context context
                             ) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context);
        }

        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException {
            String line = value.toString();
            String[] tokenizer = line.split("-");

            if (tokenizer[0].equals("Source1")) {
                /** 集合A的数据 */
                mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]);
            } else if (tokenizer[0].equals("Source2")) {
                /** 集合B的数据 */
                mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]);
            }

            /** 集合A交集合B的数据 */
            if (tokenizer[0].equals("Source3")) {
                mos.write("Source1", new Text(tokenizer[0]), tokenizer[1]);
                mos.write("Source2", new Text(tokenizer[0]), tokenizer[1]);
            }
        }

        protected void cleanup(Context context
                               ) throws IOException, InterruptedException {
            mos.close();
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
            "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
        System.setProperty("javax.xml.parsers.SAXParserFactory",
            "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");

        TestMultipleOutputsJob instance = new TestMultipleOutputsJob();
        try {
            instance.run(args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

使用hadoop jar命令调度运行jar包代码:

hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob --input /user/pms/workspace/ouyangyewei/testMultipleOutputs --output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output 

程序运行以后,输出的结果:

[[email protected] /home/pms/workspace/ouyangyewei]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
Found 4 items
-rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
-rw-r--r--   3 pms pms         65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
-rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS
-rw-r--r--   3 pms pms          0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000

[[email protected] /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
Source1	0001
Source1	0003
Source1	0005
Source3	0007
Source3	0008

[[email protected] /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
Source2	0002
Source2	0004
Source2	0006
Source3	0007
Source3	0008
时间: 2024-08-07 00:12:40

MapReduce-MulitipleOutputs实现自定义输出到多个目录的相关文章

hadoop编程小技巧(7)---自定义输出文件格式以及输出到不同目录

代码测试环境:Hadoop2.4 应用场景:当需要定制输出数据格式时可以采用此技巧,包括定制输出数据的展现形式,输出路径,输出文件名称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  常用的父类: 2)TextOutputFormat<K,V> 默认输出字符串输出格式: 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 可以把输出数据输送到

Hadoop 学习笔记一 ---MapReduce 的输入和输出

Hadoop 中的MapReduce库支持几种不同格式的输入数据.例如,文本模式的输入数据的每一行被视为一个key/value pair,其中key为文件的偏移量,value为那一行的内容.每一种输入类型的实现都必须能够把输入数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理. 一.  输入格式InputFormat 当运行一个M-R 作业的时候,我们需要为作业制定它的输入格式.InputFormat为Hadoop作业的所有输入格式的抽象基类,它描述了作业输入需要满足的规范细节

SAP中自定义输出字段的ALV实例

这是运行的结果.对于我们的数据表我们需要字段可以自定义以ALV的格式输出. 特别注意点:wa_alv_field-fieldname = 'EMPID'.这个地方 EMPID一定要大写否则会在运行时报错. *&---------------------------------------------------------------------* *& Report  ZTBALV1 *& *&---------------------------------------

Lrc2srt精灵,增加自定义输出编码

2015.4.8 对中文支持有点问题,修改了一下,支持自定义输出编码! 修改了建议行末偏移,通常100到200最好了,人的反应时间! http://files.cnblogs.com/files/rovedog/Lrc2Srt.2015.4.12.zip

IOS利用宏自定义输出(NSLog)

前言: 1)输出日志是会大量损耗系统性能 2)输出的信息很容易会被截取到,导致信息不安全. 所以我们会在发行版(Release)取消所有的Log.如果一行一行地去注释掉Log,显然不是一个明确的选择. 因此我们可以使用宏去自定义Log输出. 最简单的一个例子: #ifdef DEBUG # define GCLog(fmt, ...) NSLog((fmt), ##__VA_ARGS__); #else # define GCLog(...); #endif 使用方法: GCLog(@"sdf&

织梦图集图片在首页列表页调用并且自定义输出几张

不改动官方核心文件,在自定义方法文件里加入个方法来实现织梦图集图片在首页列表页调用并且自定义输出几张 效果展示 教程实现 打开 include\extend.func.php 在最下面加入这个方法 function Getimgurls($aid,$num=4) { global $dsql; $imgurls = $result = ''; $imgrow = $dsql->GetOne( "Select imgurls From `#@_addonimages` where aid='

常规功能和模块自定义系统(cfcmms)—006Extjs的目录结构和mvvm介绍

常规功能和模块自定义系统(cfcmms)-006Extjs的目录结构和mvvm介绍 在eclipse中打开cfcmms项目,展开后目录结构如下图所示,图中简单的注释了一下各个目录和文件的说明.由于extjs6默认使用的是mvvm架构,如果想要使用extjs的MVC,需要在app下加入controller目录,把所有的控制器都放在此目录之下即可. 打开浏览器,输入网址 http://localhost:1841/,则会显示此项目默认生成的示例.如下图所示. 系统的文件加载过程,在浏览器输入网址后,

自定义的常用文件与目录操作函数库

自定义的常用文件与目录操作函数库,在win和linux平台做了跨平台的处理.(跨平台的处理可以作为参考比较.在win下目录的符号可以是\或者/,但是在linux下只能是/.) 下面给出的是源文件,实现接口函数的代码.每个接口函数都有很详细的功能说明. /* 判断文件或目录是否存在 * 在操作系统中,目录也是一个文件,如果要判断一个目录是否存在则应当使用DirectoryExists, * 要判断一个文件是否存在且是一个归档文件则应当使用IsArchive. * @如果文件或目录存在则返回true

MapReduce的规约--&gt;自定义Combiner

wordCount例子 输入处理文件 hello me hello you 没有加入Combiner之前 设置combiner //加入Combiner //map产生的输出在这个Combiner运行 运行完成交给myreduce job.setCombinerClass(MyReducer.class); Combiner 位于map的reduce中间,会处理下数据 Combiner 位于map段的后面 ================流程==================== 原始 hel