Hadoop MapReduce输入输出类型

一、输入格式

  1、输入分片split

      一个分片对应一个map任务;

      一个分片包含一个表(整个文件)上的若干行,而一条记录(单行)对应一行;

      分片包含一个以字节为单位的长度 和 一组存储位置,分片不包含实际的数据;

      map处理时会用分片的大小来排序,优先处理最大的分片;

  hadoop中Java定义的分片为InputSplit抽象类:主要两个方法,涉及分片长度,分片起始位置

    public abstract class InputSplit{
         public abstract long getLength() throws IOException, InterruptedException;
         public abstract String[] getLocations() throws IOException, InterruptedException;
    }

  InputSplit不需要手动去处理它,它是由InputFormat生成;InputFormat负责产生输入分片并将它们分割成记录:

    public abstract class InputFormat<K, V> {
        public abstract List<InputSplit> getSplits( JobContext context) throws IOException, InterruptedException;
        public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
    }   

  InputFormat抽象类定义的两个方法:getSplits() 和 createRecordReader()

  运行作业的客户端会调用getSplits()来计算分片,然后将它们发送到jobtracker,jobtracker会使用其存储位置来调度map任务从而在tasktracker上来处理这个分片数据。在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader()方法来获得这个分片的RecordReader。RecordReader就是一个集合迭代器,map任务用一个RecordReader来生成记录的键/值对,然后再传递给map函数。

  2、FileInputFormat类

    FileInputFormat类是所有指定数据源实现类的基类,它本身主要有两个功能:a. 指定输入文件位置;b. 输入文件生成分片的实现代码段,具体实现由子类完成;

    继承图:

        

    设置输入文件位置:

      FileInputFormat.addInputPath(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));

      或 FileInputFormat.setInputPaths(job, new Path("hdfs://fileClusters:9000/wordcount.txt"));      

      可添加文件过滤器, FileInputFormat 中静态方法:

        public static void setInputPathFilter(Job job, Class<? extends PathFilter> filter)

        文件添加时,默认就会有一个过滤器,过滤掉"." 和 "_"开头的文件,会过滤掉隐藏文件;自定义的过滤器也是在默认过滤的基础上过滤;

    切分的分片大小:

        一个split的大小计算:max( minimumSize, min( maximumSize, blockSize ));

        minimumSize默认为1,maximumSize默认为Long.MAX_VALUE;

        所以通常 blockSize 在 minimumSize和maximumSize之间,所以一般分片大小就是块大小。

    设置不切分文件:

        两种方法:

          a. 设置minimumSize的大小为Long.MAX_VALUE;

          b. 在实现FileInputFormat的子类时,重写isSplitable()方法返回为false;

     

    在mapper中获取文件分片信息:

        在mapper中可以获取当前处理的分片的信息,可通过context.getInputSplit()方法来获取一个split;当输入的格式源于FileInputFormat时,该方法返回的InputSplit可以被强制转换化一个FileSplit(继承自InputSplit),可调用如下信息:

           a. getPath()  Path/String  文件的路径

           b. getStart() long

             c. getLength() long

    

     自定义一个输入格式,把整个文件作为一条记录: 

// Example 7-2. An InputFormat for reading a whole file as a record
class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}

//主要是实现RecordReader类
class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // do nothing }
    }
}

       整个文件作为一条记录的应用,把多个小文件合并为一个大文件:

public class SmallFilesToSequenceFileConverter extends Configured implements Tool {
    static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
        private Text filenameKey;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            filenameKey = new Text(path.toString());
        }

        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(filenameKey, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(SequenceFileMapper.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
        System.exit(exitCode);
    }
}

     文本输入:

        a. TextInputFormat  行首偏移量:行内容

        b. KeyValueTextInputFormat  以tab划分一行的key value

        c. NLineInputFormat  让每个map收到定义的相同行数,每个分片只包含N行

     二进制输入:

        Hadoop的MapReduce不只是可以处理文本信息,还可以处理二进制格式,通过会用以下几个类:

          SequenceFileInputFormat,处理SequenceFile 和 MapFile的文件类型;

          SequenceFileAsTextInputFormat 是 SequenceFileInputFormat的扩展,它将SequenceFile的键值转换为Text对象,这个转化是通过键和值上调用toString()方法实现。

          SequenceFileAsBinaryInputFormat 也是SequenceFileInputFormat的扩展,它将SequenceFile的键值作为二进制对象。它们被封装为BytesWritable对象,因而可以任意解释这些字节数组。

      多输入MultipleInputs:

        它可为每条输入路径指定InputForamt 和 Mapper:       

MutipleInputs.addInputPath(job , ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class);
MutipleInputs.addInputPath(job ,metofficeInputPath, TextInputFormat.class, MetofficeMaxTemperatureMapper.class);

//MutipleInputs还有一个重载,当只用一个Mapper时
public static void addInputPath(Job job, Path path, class<? extends InputFormat> inputFormatClass);  

        它取代了FileInputFormat.addInputPath() 和 job.setMapperClass()的调用。

二、输出格式

继承图:  

    

  文体输出TextOutputFormat:

  默认的输出是文本输出TextOutputFormat,它把每条记录写为文本行,它调用toString()方法把key value转化为字符串。

  与之对应的输入为KeyValueTextInputFormat;

  二进制输出:与输入对应。

多输出:

  默认一个reducer生成一个输出文件,命名为part-r-00000,part-r-00001等等;

  有时需要对输出的文件名进行控制 或 让每个redeucer输出多个文件,可利用 MultipleOutputFormat 类;

  范例:按气象站来区分气象数据,各个气象站输出到不同的文件中:

    方法一:可利用每个reducer创建一个输出文件的特点,通过设置多个分区,来输出到各个文件,这样做有两点不好:

          a. 分区个数必须预先就知道;可能有空reducer,可能有的获取不到气象站信息导致值丢失;

          b. 每个reducer处理一个气象站,可能需要过多的reducer,也会有严重的数据倾斜问题;

    方法二:使用 MutipleOutputs 类:

public class PartitionByStationUsingMultipleOutputs extends Configured implements Tool {
    static class StationMapper extends Mapper<LongWritable, Text, Text, Text> {
        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            parser.parse(value);
            context.write(new Text(parser.getStationId()), value);
        }
    }

    static class MultipleOutputsReducer extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                multipleOutputs.write(NullWritable.get(), value, key.toString());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (job == null) {
            return -1;
        }
        job.setMapperClass(StationMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setReducerClass(MultipleOutputsReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}

        输出文件结果如下:          

          output/010010-99999-r-00027

          output/010050-99999-r-00013

          output/010100-99999-r-00015

          output/010280-99999-r-00014

  

时间: 2024-08-10 01:48:44

Hadoop MapReduce输入输出类型的相关文章

MapReduce输入输出类型、格式及实例

输入格式 1.输入分片与记录 2.文件输入 3.文本输入 4.二进制输入 5.多文件输入 6.数据库格式输入 1.输入分片与记录 1.JobClient通过指定的输入文件的格式来生成数据分片InputSplit. 2.一个分片不是数据本身,而是可分片数据的引用. 3.InputFormat接口负责生成分片. InputFormat 负责处理MR的输入部分,有三个作用: 验证作业的输入是否规范. 把输入文件切分成InputSplit. 提供RecordReader 的实现类,把InputSplit

mapreduce 输入输出类型

默认的mapper是IdentityMapper,默认的reducer是IdentityReducer,它们将输入的键和值原封不动地写到输出中. 默认的partitioner是HashPartitinoer,它根据每条记录的键进行哈希操作来分区. 输入文件:文件是MapReduce任务的数据的初始存储地.正常情况下,输入文件一般是存在HDFS里.这些文件的格式可以是任意的:我们可以使用基于行的日志文件,也可以使用二进制格式,多行输入记录或其它一些格式.这些文件会很大—数十G或更大. 小文件与Co

Hadoop MapReduce(WordCount) Java编程

编写WordCount程序数据如下: hello beijing hello shanghai hello chongqing hello tianjin hello guangzhou hello shenzhen ... 1.WCMapper: package com.hadoop.testHadoop; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop

Hadoop MapReduce原理及实例

MapReduce是用于数据处理的一种编程模型,简单但足够强大,专门为并行处理大数据而设计. 1. 通俗理解MapReduce MapReduce的处理过程分为两个步骤:map和reduce.每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定.map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总. 例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录,格式如

hadoop MapReduce实例解析

1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapReduce就是"任务的分解与结果的汇总". 在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker:另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracke

Hadoop MapReduce开发最佳实践(上篇)

body{ font-family: "Microsoft YaHei UI","Microsoft YaHei",SimSun,"Segoe UI",Tahoma,Helvetica,Sans-Serif,"Microsoft YaHei", Georgia,Helvetica,Arial,sans-serif,宋体, PMingLiU,serif; font-size: 10.5pt; line-height: 1.5;}

Hadoop 4、Hadoop MapReduce的工作原理

一.MapReduce的概念 MapReduce是hadoop的核心组件之一,hadoop要分布式包括两部分,一是分布式文件系统hdfs,一部是分布式计算框就是mapreduce,两者缺一不可,也就是说,可以通过mapreduce很容易在hadoop平台上进行分布式的计算编程. 1.MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapRed

Hadoop MapReduce工作原理

在学习Hadoop,慢慢的从使用到原理,逐层的深入吧 第一部分:MapReduce工作原理 MapReduce 角色 ?Client :作业提交发起者. ?JobTracker: 初始化作业,分配作业,与TaskTracker通信,协调整个作业. ?TaskTracker:保持JobTracker通信,在分配的数据片段上执行MapReduce任务. 提交作业 ?在作业提交之前,需要对作业进行配置 ?程序代码,主要是自己书写的MapReduce程序. ?输入输出路径 ?其他配置,如输出压缩等. ?

Hadoop MapReduce编程 API入门系列之wordcount版本5(九)

这篇博客,给大家,体会不一样的版本编程. 代码 package zhouls.bigdata.myMapReduce.wordcount1; import java.io.IOException; import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce