大数据-Hadoop生态(20)-MapReduce框架原理-OutputFormat

1.outputFormat接口实现类

2.自定义outputFormat

步骤:

1). 定义一个类继承FileOutputFormat

2). 定义一个类继承RecordWrite,重写write方法

3. 案例

有一个log文件,将包含nty的输出到nty.log文件,其他的输出到other.log

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.nty.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

自定义类继承FileOutputFormat

package com.nty.outputFormat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:28
 */
public class FilterOutputFormat extends FileOutputFormat<LongWritable, Text> {

    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        FilterRecordWrite frw = new FilterRecordWrite();
        frw.init(job);
        return frw;
    }
}

自定义RecordWriter,重写write

package com.nty.outputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:29
 */
public class FilterRecordWrite extends RecordWriter<LongWritable, Text> {

    private FSDataOutputStream nty;

    private FSDataOutputStream other;

    //将job通过参数传递过来
    public void init(TaskAttemptContext job) throws IOException {

        String outDir = job.getConfiguration().get(FileOutputFormat.OUTDIR);

        FileSystem fileSystem = FileSystem.get(job.getConfiguration());

        nty = fileSystem.create(new Path(outDir + "/nty.log"));
        other = fileSystem.create(new Path(outDir + "/other.log"));

    }

    @Override
    public void write(LongWritable key, Text value) throws IOException, InterruptedException {
        String address = value.toString() + "\r\n";

        if(address.contains("nty")) {
            nty.write(address.getBytes());
        } else {
            other.write(address.getBytes());
        }

    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        //关流
        IOUtils.closeStream(nty);
        IOUtils.closeStream(other);
    }
}

Driver类设置

package com.nty.outputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * author nty
 * date time 2018-12-12 19:29
 */
public class FilterDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FilterDriver.class);

        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("d:\\Hadoop_test"));
        FileOutputFormat.setOutputPath(job, new Path("d:\\Hadoop_test_out"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

输出结果

原文地址:https://www.cnblogs.com/duoduotouhenying/p/10110609.html

时间: 2024-10-09 06:06:06

大数据-Hadoop生态(20)-MapReduce框架原理-OutputFormat的相关文章

大数据-Hadoop生态(19)-MapReduce框架原理-Combiner合并

1. Combiner概述 2. 自定义Combiner实现步骤 1). 定义一个Combiner继承Reducer,重写reduce方法 public class WordcountCombiner extends Reducer<Text, IntWritable, Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context)

大数据-Hadoop生态(4)-Hadoop集群搭建

准备工作 开启全部三台虚拟机,确保hadoop100的机器已经配置完成 脚本 操作hadoop100 新建一个xsync的脚本文件,将下面的脚本复制进去 vim xsync #这个脚本使用的是rsync命令而不是scp命令,是同步而非覆盖文件,所以仅仅会同步过去修改的文件.但是rsync并不是一个原生的Linux命令,需要手动安装.如果没有,请自行安装 #!/bin/bash #1 获取输入参数个数,如果没有参数,直接退出 pcount=$# if ((pcount==0)); then ech

大数据-Hadoop生态(12)-Hadoop序列化和源码追踪

1.什么是序列化 2.为什么要序列化 3.为什么不用Java的序列化 4.自定义bean对象实现序列化接口(Writable) 在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口. 具体实现bean对象序列化步骤如下7步: 1) 必须实现Writable接口 2) 反序列话时,需要反射调用无参构造方法,所以必须要有无参构造方法 3) 重写序列化方法write() 4) 重写反序列化方法readFields() 5)

大数据-Hadoop生态(5)-HDFS概述

HDFS产生背景 HDFS优缺点 HDFS组成架构 HDFS文件块大小 原文地址:https://www.cnblogs.com/duoduotouhenying/p/10084446.html

王家林的云计算分布式大数据Hadoop征服之旅:HDFS&amp;MapReduce&amp;HBase&amp;Hive&amp;集群管理

一:课程简介: 作为云计算实现规范和实施标准的Hadoop恰逢其时的应运而生,使用Hadoop用户可以在不了解分布式底层细节的情况下开发出分布式程序,从而可以使用众多廉价的计算设备的集群的威力来高速的运算和存储,而且Hadoop的运算和存储是可靠的.高效,的.可伸缩的,能够使用普通的社区服务器出来PB级别的数据,是分布式大数据处理的存储的理想选择. 本课程会助你深入浅出的掌握Hadoop开发(包括HDFS.MapReduce.HBase.Hive等),并且在此基础上掌握Hadoop集群的配置.维

成都大数据Hadoop与Spark技术培训班

成都大数据Hadoop与Spark技术培训班 中国信息化培训中心特推出了大数据技术架构及应用实战课程培训班,通过专业的大数据Hadoop与Spark技术架构体系与业界真实案例来全面提升大数据工程师.开发设计人员的工作水平,旨在培养专业的大数据Hadoop与Spark技术架构专家,更好地服务于各个行业的大数据项目开发和落地实施. 2015年近期公开课安排:(全国巡回开班) 08月21日——08月23日大连 09月23日——09月25日北京 10月16日——10月18日成都 11月27日——11月2

大数据hadoop领域技术总体介绍(各个组件的作用)

2019/2/16 星期六 大数据领域技术总体介绍(各个组件的作用)1.大数据技术介绍大数据技术生态体系:Hadoop 元老级分布式海量数据存储.处理技术系统,擅长离线数据分析Hbase 基于hadoop 的分布式海量数据库,离线分析和在线业务通吃Hive sql 基于hadoop 的数据仓库工具,使用方便,功能丰富,使用方法类似SQLZookeeper 集群协调服务Sqoop 数据导入导出工具Flume 数据采集框架 //经常会结合kafka+flume数据流 或者用于大量的日志收集到hdfs

王家林的云计算分布式大数据Hadoop企业级开发动手实践

一:课程简介: Hadoop是云计算分布式大数据的事实标准软件框架,Hadoop中的架构实现是整个云计算产业技术的基础,作为与Google三大核心技术DFS.MapReduce.BigTable相对的HDFS.MapReduce.和HBase也是整个Hadoop生态系统的核心的技术,本课程致力于帮您掌握这三大技术的同时掌握云计算的数据仓库挖掘技术Hive,助您在云计算技术时代自由翱翔. 二:课程特色 1,      深入浅出中动手实作: 2,      掌握Hadoop三大核心:HDFS.Map

大数据Hadoop最佳实践(V3)

一:课程简介: Hadoop是当下云计算大数据的王者. Hadoop不仅是一个大数据的计算框架,同时也是大数据的存储平台. 使用Hadoop,用户可以在不了解分布式底层细节的情况下开发出分布式程序,从而可以使用众多廉价的计算设备的集群的威力来高速的运算和存储,而且Hadoop的运算和存储是可靠的.高效的.可伸缩的,能够使用普通的社区服务器出来PB级别的数据,是分布式大数据处理的存储的理想选择 使用Hadoop可以主要完成: 1,构建离线处理平台,完成海量离线数据的存储分析,相对于传统的关系型数据