MapReduce输入输出类型、格式及实例

输入格式

1、输入分片与记录

2、文件输入

3、文本输入

4、二进制输入

5、多文件输入

6、数据库格式输入

1、输入分片与记录

1、JobClient通过指定的输入文件的格式来生成数据分片InputSplit。

2、一个分片不是数据本身,而是可分片数据的引用

3、InputFormat接口负责生成分片。

InputFormat 负责处理MR的输入部分,有三个作用:

验证作业的输入是否规范。

把输入文件切分成InputSplit。

提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理。

2、文件输入

抽象类:FilelnputFormat

1、FilelnputFormat是所有使用文件作为数据源的InputFormat实现的基类。

2、FilelnputFormat输入数据格式的分片大小由数据块大小决定

FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。

package org.apache.hadoop.mapreduce.lib.input;
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  /*Generate the list of files and make them into FileSplits.*/
  public List<InputSplit> getSplits(JobContext job) throws IOException {
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
     ......
     long blockSize = file.getBlockSize();
     long splitSize = computeSplitSize(blockSize, minSize, maxSize);
     ......
  }
  /*Get the minimum split size*/
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
  }

  /*Get the maximum split size.*/
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong(SPLIT_MAXSIZE,Long.MAX_VALUE);
  }

    //是否分片
    /*
    Is the given filename splitable? Usually, true, but if the file is stream compressed, it will not be.
    <code>FileInputFormat</code> implementations can override this and return <code>false</code> to ensure that individual input files are never split-up so that {@link Mapper}s process entire files.
    */
    protected boolean isSplitable(JobContext context, Path filename) {
    return true;//默认需要分片
  }

}
自定义输入格式

如果我们不需要分片,那我们就需要对isSplitable方法进行重写

1、继承FileInputFormat基类。

2、重写里面的getSplits(JobContext context)方法。

3、重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。

InputSplit

在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对), map会依次处理每一个记录。

FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分

的结果是这个文件或者是这个文件中的一部分。

如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件

的效率要比处理很多小文件的效率高的原因。

当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于

FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。

例如:一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处

理,而10000个100kb的文件会被10000个map任务处理。

Map任务的数量?

一个InputSplit对应一个Map task。

InputSplit的大小是由Math.max(minSize,Math.min(maxSize, blockSize))决定。

单节点一般10-100个map task。map task执行时长不建议低于1 分钟,否

则效率低。

抽象类:CombineFilelnputFormat

1、可以使用CombineFilelnputFormat来合并小文件。

2、因为CombineFilelnputFormat是一个抽象类,使用的时候需要创建一个

CombineFilelnputFormat的实体类,并且实现getRecordReader()的方法。

3、避免文件分法的方法:

A、数据块大小尽可能大,这样使文件的大小小于数据块的大小,就不用进行分片。(这种方式不太友好)

B、继承FilelnputFormat,并且重写isSplitable()方法。

job.setInputFormatClass(CombineTextInputFormat.class);

Hadoop2.6.0 CombineTextInputFormat源码:

package org.apache.hadoop.mapreduce.lib.input;
/* Input format that is a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
public class CombineTextInputFormat
  extends CombineFileInputFormat<LongWritable,Text> {

  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader<LongWritable,Text>(
      (CombineFileSplit)split, context, TextRecordReaderWrapper.class);
  }

  /*A record reader that may be passed to <code>CombineFileRecordReader</code> so that it can be used in a <code>CombineFileInputFormat</code>-equivalent for <code>TextInputFormat</code>.*/
  private static class TextRecordReaderWrapper
    extends CombineFileRecordReaderWrapper<LongWritable,Text> {
    // this constructor signature is required by CombineFileRecordReader
    public TextRecordReaderWrapper(CombineFileSplit split,
      TaskAttemptContext context, Integer idx)
      throws IOException, InterruptedException {
      super(new TextInputFormat(), split, context, idx);
    }
  }
}

3、文本输入

类名:TextlnputFormat

1、TextlnputFormat是默认的lnputFormat,每一行数据就是一条记录

2、TextlnputFormat的key是LongWritable类型的,存储该行在整个文件的偏移量,value是每行的数据内容,Text类型。

3、输入分片与HDFS数据块关系:TextlnputFormat每一条记录就是一行,很有可能某一行跨数据块存放。默认以\n或回车键作为一行记录。

4、TextInputFormat继承了FileInputFormat。

类名:KeyValueTextInputFormat

可以通过设置key为行号的方式来知道记录的行号,并且可以通过key.value.separator.in.input设置key与value的分割符。

当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。

如果行中有分隔符,那么分隔符前面的作为key,后面的作为value;如果行中没有分隔符,那么整行作为key,value为空。

job.setInputFormatClass(KeyValueTextInputFormat.class);
//默认分隔符就是制表符
//conf.setStrings(KeyValueLineRecordReader.KEY_VALUE
_SEPERATOR, "\t")

类名:NLineInputFormat

可以设置每个mapper处理的行数,可以通过mapred.line.input.format.lienspermap属性设置。

NLineInputformat可以控制在每个split中数据的行数

//设置具体输入处理类
job.setInputFormatClass(NLineInputFormat.class);
//设置每个split的行数
NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[2]));

4、二进制输入

输入类:

SequenceFileInputFormat 将key和value以sequencefile格式输入。
SequenceFileAsTextInputFormat
SequenceFileAsBinaryInputFormat 将key和value以原始二进制的格式输入。

由于SequenceFile能够支持Splittable,所以能够作为mapreduce输入文件的格式,能够很方便的得到己经含有<key,value>的分片。

SequenceFile处理、压缩处理。

5、多文件输入

类名:MultipleInputs

1、MultipleInputs能够提供多个输入数据类型。

2、通过addInputPath()方法来设置多路径。

6、数据库格式输入

类名:DBInputFormat

1、DBInputFormat是一个使用JDBC方式连接数据库,并且从关系型数据库中读取数据的一种输入格式。

2、有多个map会去连接数据库,有可能造成数据库崩溃,因此,避免过多的数据库连接。

3、HBase中的TablelnputFormat可以让MapReduce程序访问HBase表里的数据。

实例单输入路径

[root@master liguodong]# hdfs dfs -cat /input.txt
hello you
hello everybody
hello hadoop
[root@master liguodong]# hdfs dfs -text /tmp.seq
15/06/10 21:17:11 INFO bzip2.Bzip2Factory: Successfully loaded & initialized native-bzip2 library system-native
15/06/10 21:17:11 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
100     apache software
99      chinese good
98      james NBA
97      index pass
96      apache software
95      chinese good
94      james NBA
93      index pass
......
package mrinputformat;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestInputFormat {

    public static class TokenizerMapper
       extends Mapper<IntWritable, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);//1
        private Text word = new Text();

        public void map(IntWritable key, Text value, Context context
                        ) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                //k v
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        //1、配置
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count"); 

        //2、打包运行必须执行的方法
        job.setJarByClass(TestInputFormat.class);

        //3、输入路径
        //hdfs://master:8020/tmp.seq
        //hdfs://master:8020/output
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //默认是TextInputFormat
        job.setInputFormatClass(SequenceFileInputFormat.class);

        //4、Map
        job.setMapperClass(TokenizerMapper.class);

        //5、Combiner
        job.setCombinerClass(IntSumReducer.class);

        //6、Reducer
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);

        //7、 输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 

        //8、提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

运行结果:

多输入路径方式

package mrinputformat;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestInputFormat {

    //采用TextInputFormat
    public static class Mapper1
       extends Mapper<LongWritable, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);//1
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context
                        ) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                //k v
                context.write(word, one);
            }
        }
    }

    //SequenceFileInputFormat
    public static class Mapper2
       extends Mapper<IntWritable, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);//1
        private Text word = new Text();

        public void map(IntWritable key, Text value, Context context
                        ) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                //k v
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                       Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        //1、配置
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count"); 

        //2、打包运行必须执行的方法
        job.setJarByClass(TestInputFormat.class);

        //3、输入路径
        //hdfs://master:8020/tmp.seq
        //hdfs://master:8020/output
        //单个输入路径
        //FileInputFormat.addInputPath(job, new Path(args[0]));
        //默认是TextInputFormat
        //job.setInputFormatClass(SequenceFileInputFormat.class);
        //4、Map
        //job.setMapperClass(TokenizerMapper.class);

        //多个输入路径
        Path path1 = new Path("hdfs://master:8020/input.txt");
        Path path2 = new Path("hdfs://master:8020/tmp.seq");
        MultipleInputs.addInputPath(job, path1, TextInputFormat.class,Mapper1.class);
        MultipleInputs.addInputPath(job, path2, SequenceFileInputFormat.class,Mapper2.class);

        //5、Combiner
        job.setCombinerClass(IntSumReducer.class);

        //6、Reducer
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //7、 输出路径
        FileOutputFormat.setOutputPath(job, new Path("hdfs://master:8020/output"));

        //8、提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

运行结果:

输出格式

文本输出

TextOutputFormat

默认的输出格式,key是LongWritable,value是Text类型, key和value中间值用tab隔开的。

二进制输出

SequenceFileOutputFormat

将key和value以sequencefile格式输出。

SequenceFileAsBinaryOutputFormat

将key和value以原始二进制的格式输出。

MapFileOutputFormat

将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。

多文件输出

MultipleOutputFormat
MultipleOutputs

默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出, MultipleOutputFormat和MultipleOutputs可以实现这个功能。

区别:MultipleOutputs可以产生不同类型的输出。

数据库格式输出

DBOutputFormat

时间: 2024-10-20 03:31:46

MapReduce输入输出类型、格式及实例的相关文章

mapreduce 输入输出类型

默认的mapper是IdentityMapper,默认的reducer是IdentityReducer,它们将输入的键和值原封不动地写到输出中. 默认的partitioner是HashPartitinoer,它根据每条记录的键进行哈希操作来分区. 输入文件:文件是MapReduce任务的数据的初始存储地.正常情况下,输入文件一般是存在HDFS里.这些文件的格式可以是任意的:我们可以使用基于行的日志文件,也可以使用二进制格式,多行输入记录或其它一些格式.这些文件会很大—数十G或更大. 小文件与Co

Hadoop MapReduce输入输出类型

一.输入格式 1.输入分片split 一个分片对应一个map任务: 一个分片包含一个表(整个文件)上的若干行,而一条记录(单行)对应一行: 分片包含一个以字节为单位的长度 和 一组存储位置,分片不包含实际的数据: map处理时会用分片的大小来排序,优先处理最大的分片: hadoop中Java定义的分片为InputSplit抽象类:主要两个方法,涉及分片长度,分片起始位置 public abstract class InputSplit{ public abstract long getLengt

jQuery打印json格式对象实例代码

jQuery打印json格式对象实例代码:所谓的json格式对象其实就是对象直接量,很多教程多说这是json对象,其实这是不正确.下面是一段打印json格式对象的实例代码,希望能够对初学者有所帮助.代码如下: <!DOCTYPE html> <html> <head> <meta charset=" utf-8"> <meta name="author" content="http://www.soft

将表单元素转换为json格式对象实例代码

将表单元素转换为json格式对象实例代码:在实际引用中,有可能需要将表单元素转换为json格式对象,也就是对象直接量以便于处理,下面就是一段这样的实例代码.代码实例如下: <script type="text/javascript"> (function($){ $.fn.serializeObject=function(){ var inputs=$(this).find("input,textarea,select"); var o={}; $.ea

字符串转成日期类型(格式 MM/dd/YYYY MM-dd-YYYY YYYY/MM/dd YYYY-MM-dd)

//+---------------------------------------------------  //| 字符串转成日期类型   //| 格式 MM/dd/YYYY MM-dd-YYYY YYYY/MM/dd YYYY-MM-dd  //+---------------------------------------------------  function StringToDate(DateStr)  {         var converted = Date.parse(D

SpringMVC返回Json,自定义Json中Date类型格式

http://www.cnblogs.com/jsczljh/p/3654636.html ———————————————————————————————————————————————————————————— SpringMVC返回Json,自定义Json中Date类型格式 SpringMVC返回Json数据依赖jackson这个开源的第三方类库. 若不加任何说明情况下Date类型将以时间戳的形式转换为Json并返回. jackson提供了一些自定义格式的方法.我们只需继承它的抽象类Json

类型构造器与实例构造器

比较实例构造器和类型构造器的区别,类型构造器必须是static,并且不允许定义访问修饰符,类型构造器只能被执行一次.: class mybaseclass { public mybaseclass() { Console.WriteLine("基类实例构造器"); } static mybaseclass()//类型构造器不允许出现访问修饰符 { Console.WriteLine("基类类型构造器"); } public void test()//这里如果是pri

字符串格式表示实例

# 字符串格式设置实例 # 根据指定的宽度打印格式良好的价格列表 width = int(input('Please enter width:')) price_width = 10 item_width = width - price_width header_fmt = '{{:{}}}{{:{}}}'.format(item_width, price_width) fmt = '{{:{}}}{{:>{}.2f}}'.format(item_width, price_width) prin

MapReduce的类型与格式

输入格式 输入分片与记录 之前讨论过,输入数据的每个分片对应一个map任务来处理 在MapReduce中输入分片被表示为InputSplit类,原型如下: public abstract class InputSplit{ //该分片的长度,用于排序分片,有限处理大分片 public abstract long getLength() throw IOException,InterruptedException; //该分片所在的存储位置(主机),不直接存储数据,而是指向数据的引用 public