一、MapReduce简介
1.1 MapReduce概述
MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。MR由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,其执行流程如图1。这两个函数的形参是key、value对,表示函数的输入信息。
图 1
1.2 Map和Reduce编程模型
在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。我们要做的就是覆盖map 函数和reduce 函数。对于Hadoop 的map 函数和reduce 函数,处理的数据是键值对,也就是说map 函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce 函数接收的参数和输出的结果也是键值对。
1.2.1 Mapper类
现在看一下Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型。map 函数定义如下代码1.1。
1 protected void map(KEYIN key, VALUEIN value, 2 Context context) throws IOException, InterruptedException { 3 context.write((KEYOUT) key, (VALUEOUT) value); 4 }
代码 1.1
在上面的代码中,输入参数key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次map 函数。在这里,map 函数没有处理输入的key、value,直接通过context.write(…)方法输出了,输出的key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要我们根据业务逻辑覆盖的。
1.2.2 Reducer类
再看一下Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型,和输出的key、value 类型。看一下reduce 函数定义,如下代码1.2。
1 protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context 2 ) throws IOException, InterruptedException { 3 for(VALUEIN value: values) { 4 context.write((KEYOUT) key, (VALUEOUT) value); 5 } 6 }
代码 1.2
在上面代码中,reduce 函数的形参key、value 的类型是KEYIN、VALUEIN。要注意这里的value 是存在于java.lang.Iterable<VALUEIN>中的,这是一个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的key,通过调用context.write(…)输出了,这里输出的类型是KEYOUT、VALUEOUT。通常我们会根据业务逻辑覆盖reduce 函数的实现。
二、 MapReduce 执行原理
2.1 MapRduce执行流程
MapReduce 运行的时候,会通过Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer 任务会接收Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS 的文件中。整个流程如图3.1
图2.1
2.2 Mapper 任务的执行过程
每个Mapper 任务是一个java 进程,它会读取HDFS 中的文件,解析成很多的键值对,经过我们覆盖的map 方法处理后,转换为很多的键值对再输出。整个Mapper 任务的处理过程又可以分为以下几个阶段,如图3.2
图2.2
在图3.2中,把Mapper 任务的运行过程分为六个阶段。
第一阶段是把输入文件按照一定的标准分片(InputSplit),每个输入片的大小是固定的。默认情况下,输入片(InputSplit)的大小与数据块(Block)的大小是相同的。如果数据块(Block)的大小是默认值64MB,输入文件有两个,一个是32MB,一个是72MB。那么小的文件是一个输入片,大文件会分为两个数据块,那么是两个输入片。一共产生三个输入片。每一个输入片由一个Mapper 进程处理。这里的三个输入片,会有三个Mapper 进程处理。
第二阶段是对输入片中的记录按照一定的规则解析成键值对。有个默认规则是把每一行文本内容解析成键值对。“键”是每一行的起始位置(单位是字节),“值”是本行的文本内容。
第三阶段是调用Mapper 类中的map 方法。第二阶段中解析出来的每一个键值对,调用一次map 方法。如果有1000 个键值对,就会调用1000 次map 方法。每一次调用map 方法会输出零个或者多个键值对。map具体的工作做有我们自己来决定,我们要对map函数进行覆盖,封装我们要进行的操作来实现我们最终的目的。
第四阶段是按照一定的规则对第三阶段的每个Mapper任务输出的键值对进行分区。比较是基于键进行的。比如我们的键表示省份(如北京、上海、山东等),那么就可以按照不同省份进行分区,同一个省份的键值对划分到一个区中。默认是只有一个区。分区的数量就是Reducer 任务运行的数量。默认只有一个Reducer 任务。
第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到本地的linux 文件中。
第六阶段是对数据进行归约处理,也就是reduce 处理。对于键相等的键值对才会调用一次reduce 方法。经过这一阶段,数据量会减少。归约后的数据输出到本地的linxu 文件中。本阶段默认是没有的,需要用户自己增加这一阶段的代码。
2.3 Reducer执行过程
每个Reducer 任务是一个java 进程。Reducer 任务接收Mapper 任务的输出,归约处理后写入到HDFS 中,可以分为如图2.3 所示的几个阶段
图2.3
在图3.2中,把Mapper 任务的运行过程分为四个阶段。
第一阶段是Reducer 任务会主动从Mapper 任务复制其输出的键值对。Mapper 任务可能会有很多,因此Reducer 会复制多个Mapper 的输出。
第二阶段是把复制到Reducer 本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。
第三阶段是对排序后的键值对调用reduce 方法。键相等的键值对调用一次reduce 方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS 文件中。
在整个MapReduce 程序的执行过程中如图2.4,我可以根据上面的讲解来分析下面MapReducer执行过程,从下图可知每个Mapper任务分了两个区,因此会有两个Reducer任务,最终产生两个HDFS副本。
图 2.4
2.4 键值对的编号
在对Mapper 任务、Reducer 任务的分析过程中,会看到很多阶段都出现了键值对,为避免混淆,所以我在这里对键值对进行编号,方便大家理解键值对的变化情况。如图2.5
图 2.5
在图2.5 中,对于Mapper 任务输入的键值对,定义为key1 和value1。在map 方法中处理后,输出的键值对,定义为key2 和value2。reduce 方法接收key2 和value2,处理后,输出key3 和value3。在下文讨论键值对时,可能把key1 和value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和value3 简写为<k3,v3>。
2.5 举例:单词计数
该业务要求统计指定文件中的所有单词的出现次数。下面看一下源文件的内容为:
“hello you”
“hello me”
内容很简单,两行文本,每行的单词中间使用空格区分。
分析思路:最直观的想法是使用数据结构Map。解析文件中出现的每个单词,用单词作为key,出现次数作为value。这个思路没有问题,但是在大数据环境下就不行了。我们需要使用MapReduce 来做。根据Mapper 任务和Reducer任务的运行阶段,我们知道在Mapper任务的第二阶段是把文件的每一行转化成键值对,那么第三阶段的map 方法就能取得每一行文本内容,我们可以在map 方法统计本行文本中单词出现的次数,把每个单词的出现次数作为新的键值对输出。在Reducer 任务的第二阶段会对Mapper 任务输出的键值对按照键进行排序,键相等的键值对会调用一次reduce 方法。在这里,“键”就是单词,“值”就是出现次数。因此可以在reduce 方法中对单词的不同行中的所有出现次数相加,结果就是该单词的总的出现次数。最后把这个结果输出。
程序源码如下代码 2.1。
1 package counter; 2 3 import java.net.URI; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.fs.FileSystem; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Counter; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; 19 20 public class WordCountApp { 21 static final String INPUT_PATH = "hdfs://hadoop:9000/input"; 22 static final String OUT_PATH = "hdfs://hadoop:9000/output"; 23 24 public static void main(String[] args) throws Exception { 25 Configuration conf = new Configuration(); 26 final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 27 final Path outPath = new Path(OUT_PATH); 28 if(fileSystem.exists(outPath)){ 29 fileSystem.delete(outPath, true); 30 } 31 final Job job = new Job(conf , WordCountApp.class.getSimpleName()); 32 FileInputFormat.setInputPaths(job, INPUT_PATH);//1.1指定读取的文件位于哪里 33 34 job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对 35 job.setMapperClass(MyMapper.class);//1.2 指定自定义的map类 36 job.setMapOutputKeyClass(Text.class);//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略 37 job.setMapOutputValueClass(LongWritable.class); 38 39 job.setPartitionerClass(HashPartitioner.class);//1.3 分区 40 job.setNumReduceTasks(1);//有一个reduce任务运行 41 //1.4 TODO 排序、分组 42 //1.5 TODO 规约 43 job.setReducerClass(MyReducer.class);//2.2 指定自定义reduce类 44 job.setOutputKeyClass(Text.class);//指定reduce的输出类型 45 job.setOutputValueClass(LongWritable.class);//2.3 指定写出到哪里 46 FileOutputFormat.setOutputPath(job, outPath);//指定输出文件的格式化类 47 48 job.setOutputFormatClass(TextOutputFormat.class); 49 50 job.waitForCompletion(true);//把job提交给JobTracker运行 51 } 52 53 /** 54 * KEYIN 即k1 表示行的偏移量 55 * VALUEIN 即v1 表示行文本内容 56 * KEYOUT 即k2 表示行中出现的单词 57 * VALUEOUT 即v2 表示行中出现的单词的次数,固定值1 58 */ 59 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 60 protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException { 61 62 final String line = v1.toString(); 63 final String[] splited = line.split(" "); 64 for (String word : splited) { 65 context.write(new Text(word), new LongWritable(1)); 66 } 67 }; 68 } 69 70 /** 71 * KEYIN 即k2 表示行中出现的单词 72 * VALUEIN 即v2 表示行中出现的单词的次数 73 * KEYOUT 即k3 表示文本中出现的不同单词 74 * VALUEOUT 即v3 表示文本中出现的不同单词的总次数 75 * 76 */ 77 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 78 protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException { 79 long times = 0L; 80 for (LongWritable count : v2s) { 81 times += count.get(); 82 } 83 ctx.write(k2, new LongWritable(times)); 84 }; 85 } 86 87 }
代码 2.1
2.5.1 如何覆盖map 方法
map 方法代码如下,代码2.2。
1 static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 2 final Text key2 = new Text();//key2 表示该行中某一单词 3 final IntWritable value2 = new IntWritable(1);//value2 表示单词在该行中的出现次数 4 //key 表示文本行的起始位置 5 //value 表示文本行 6 protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { 7 final String[] splited = value.toString().split(" "); 8 for (String word : splited) { 9 key2.set(word); 10 context.write(key2, value2); 11 } 12 }; 13 }
代码 2.2
上面代码中,注意Mapper 类的泛型不是java 的基本类型,而是Hadoop 的数据类型LongWritable、Text、IntWritable。读者可以简单的等价为java 的类long、String、int。下文会有专门讲解Hadoop 的数据类型。
代码中Mapper 类的泛型依次是<k1,v1,k2,v2>。map 方法的第二个形参是行文本内容,是我们关心的。核心代码是把行文本内容按照空格拆分,把每个单词作为新的键,数值1作为新的值,写入到上下文context 中。在这里,因为输出的是每个单词,所以出现次数是常量1。如果一行文本中包括两个hello,会输出两次<hello,1>。
2.5.2 如何覆盖reduce 方法
Reduce方法代码如下,代码2.3
1 /** 2 * key 表示单词 3 * values 表示map方法输出的1的集合 4 * context 上下文对象 5 */ 6 static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 7 final IntWritable value3 = new IntWritable(0);//value3表示单词出现的总次数 8 protected void reduce(Text key, java.lang.Iterable<IntWritable> values,Context context) throws java.io.IOException ,InterruptedException { 9 int sum = 0; 10 for (IntWritable count : values) { 11 sum += count.get(); 12 } 13 final Text key3 = key; 14 value3.set(sum);//执行到这里,sum表示该单词出现的总次数 15 context.write(key3, value3);//key3表示单词,是最后输出的key,value3表示单词出现的总次数,是最后输出的value 16 }; 17 }
代码 2.3
上面代码中,Reducer 类的四个泛型依次是<k2,v2,k3,v3>,要注意reduce 方法的第二个参数是java.lang.Iterable 类型,迭代的是v2。也就是k2 相同的v2 都可以迭代出来。以上就是我们覆盖的map 方法和reduce 方法。现在要把我们的代码运行起来,需要写驱动代码,如下代码 2.4
1 /** 2 * 驱动代码 3 */ 4 public static void main(String[] args) throws IOException,InterruptedException, ClassNotFoundException { 5 final String INPUT_PATH = "hdfs://hadoop:9000/input";//输入路径 6 final String OUTPUT_PATH = "hdfs://hadoop:9000/output";//输出路径,必须是不存在的 7 final Job job = new Job(new Configuration(),"WordCountApp");//创建一个job对象,封装运行时需要的所有信息 8 job.setJarByClass(WordCountApp.class);//如果需要打成jar运行,需要下面这句 9 FileInputFormat.setInputPaths(job, INPUT_PATH);//告诉job执行作业时输入文件的路径 10 job.setInputFormatClass(TextInputFormat.class);//设置把输入文件处理成键值对的类 11 job.setMapperClass(MyMapper.class);//设置自定义的Mapper类 12 job.setMapOutputKeyClass(Text.class);//设置map方法输出的k2、v2的类型 13 job.setMapOutputValueClass(IntWritable.class); 14 job.setPartitionerClass(HashPartitioner.class);//设置对k2分区的类 15 job.setNumReduceTasks(1);//设置运行的Reducer任务的数量 16 job.setReducerClass(MyReducer.class);//设置自定义的Reducer类 17 job.setOutputKeyClass(Text.class);//设置reduce方法输出的k3、v3的类型 18 job.setOutputValueClass(IntWritable.class); 19 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));//告诉job执行作业时的输出路径 21 job.waitForCompletion(true);//让作业运行,直到运行结束,程序退出 22 }
代码 2.4
在以上代码中,我们创建了一个job 对象,这个对象封装了我们的任务,可以提交到Hadoop 独立运行。最后一句job.waitForCompletion(true),表示把job 对象提交给Hadoop 运行,直到作业运行结束后才可以。
以上代码的运行方式有两种,一种是在宿主机的eclipse 环境中运行,一种是打成jar包在linux 中运行。
第一种运行方式要求宿主机能够访问linux,并且对于输入路径和输出路径中的主机名hadoop , 要在宿主机的hosts 文件中有绑定,我的hosts 文件位于C:\WINDOWS\system32\drivers\etc 文件夹。
第二种运行方式,需要把代码打成jar 包,在linux 下执行命令hadoop jar xxx.jar 运行,运行结束后,文件路径在hdfs://hadoop0:9000/output/part-r-00000。我们看一下输出结果,如图2.6所示。
图2.6
三、Hadoop 的数据类型
3.1. 序列化
序列化是干什么用的?本质上讲,就是数据保存到java 虚拟机之外,然后又被读到java虚拟机内.如果仅仅是保存,不管是否能读进java 虚拟机的话,就不关心序列化问题了。正是
因为需要被读进java 虚拟机,所以必须识别写出、读入的格式、字符顺序等问题。因此序列化也就是比较重视的事情了。拿密码来打比方。序列化就像加密,反序列化就像解密。
Hadoop 作为分布式存储系统必然涉及到序列化问题。
3.2. 基本数据类型
在前面的例子中,我们看到Mapper、Reducer 类中都使用了Hadoop 自己的数据类型LongWritable、IntWritable、Text。这些数据类型都有一个共同的特点,就是实现了
org.apache.hadoop.io.Writable 接口。我们看一下这个接口的源码,如下代码3.1。
1 package org.apache.hadoop.io; 2 import java.io.DataOutput; 3 import java.io.DataInput; 4 import java.io.IOException; 5 6 public interface Writable { 7 void write(DataOutput out) throws IOException; 8 void readFields(DataInput in) throws IOException; 9 }
代码 3.1
从上面的代码中可以看到Writable 接口只有两个方法,一个是writer 方法,一个是readFields 方法。前者是把对象的属性序列化到DataOutput 中去,后者是从DataInput 把数据反序列化到对象的属性中。
java 中的基本类型有char、byte、boolean、short、int、float、double 共7 中基本类型,除了char,都有对应的Writable 类型。对于int 和long 除了IntWritable、LongWritable 外,还有对应的VintWritable、VlongWritable。除此类型之外,还有字符串类型Text、字节数组类型BytesWritable、空类型NullWritable、对象类型Object Writable。以上这些类型构成了mapreduce 运算的基本类型。这些类型都实现了接口WritableComparable,如下代码3.2。
1 package org.apache.hadoop.io; 2 public interface WritableComparable<T> extends Writable, Comparable<T> { 2 }
代码 3.2
从上面代码中可以看到, 这个接口仅仅多了Comparable 接口。实现java.lang.Comparable 接口的目的是为了调用equals 方法进行比较。
我们看一下LongWritable 类的源码,如下代码3.3
1 package org.apache.hadoop.io; 2 3 import java.io.*; 4 5 /** A WritableComparable for longs. */ 6 public class LongWritable implements WritableComparable { 7 private long value; 8 9 public LongWritable() {} 10 11 public LongWritable(long value) { set(value); } 12 13 /** Set the value of this LongWritable. */ 14 public void set(long value) { this.value = value; } 15 16 /** Return the value of this LongWritable. */ 17 public long get() { return value; } 18 19 public void readFields(DataInput in) throws IOException { 20 value = in.readLong(); 21 } 22 23 public void write(DataOutput out) throws IOException { 24 out.writeLong(value); 25 }
代码 3.3
从上面代码中可以看到,该类实现了WritableComparable 接口,内部有个long 类型的属性value,在readFields 方法中从in 中把long 类型的值读出来,赋给value,这是“反序列化”过程;在write 方法中把value 写入到out 中,这是“序列化”过程。
读者可以想一下:自己是否可以封装一个复杂的类型哪?
除了基本类型外,还有集合类型ArrayWritable、TwoDArrayWritable、MapWritable、SortedMapWritable。
3.3 集合数据类型
上传文件时,如果文件的size小于block 的size,那么每个文件就会占用一个block(不是64MB,而是文件实际大小)。如果有非常多的小文件需要上传,那么就需要非常多的block。每一个block 在NameNode 的内存中都会有一个描述信息,这样会占用过多的NameNode 内存。
SequenceFile 可以把大量小文件一起放到一个block 中。在存储相同数量的文件时,可以明显减少block 的数量。
假设有3 个小文件,那么会产生3 个block,那么4 个文件存储后对应的block 如图表3.1:
文件 | file1(大小16M) | file2(大小15M) | file3(大小16M) |
block | block1(大小16M) | block2(大小15M) | block3(大小16M) |
表3.1
如果使用SequenceFile 存储后,只会产生一个block,如表3.2:
文件 | file1(大小16M) | file2(大小15M) | file3(大小16M) |
block | block(大小47M) |
表3.2
可以看出,同样多的小文件,但是占用的block 明显少了。这就是SequenceFile 的作用。另外,SequenceFile 还可以压缩存储的内容,进一步减少文件体积。
3.4 输入文件格式化类InputFormat
类InputFomat 是负责把HDFS 中的文件经过一系列处理变成map 函数的输入部分的。这个类做了三件事情:
第一, 验证输入信息的合法性,包括输入路径是否存在等;
第二,把HDFS 中的文件按照一定规则拆分成InputSplit,每个InputSplit 由一个Mapper执行;
第三,提供RecordReader,把InputSplit 中的每一行解析出来供map 函数处理;
我们看一下这个类的源码,如下代码3.4。
1 public abstract class InputFormat<K, V> { 2 3 public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; 4 5 public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException; 6 7 }
代码 3.4
从图上面代码中可以看到,该类只有两个方法的声明,方法getSplits 的作用是把输入文件划分为很多的输入分片,方法createRecordReader 的作用是输入分片的记录读取器。这些方法的实现都在子类中。
3.4.1. FileInputFormat
InputFormat 有个子类是FileInputFormat,这是在我们的例子中见到的,我们看一下该类对getSplits 方法的实现,如下代码3.5。
1 public List<InputSplit> getSplits(JobContext job 2 ) throws IOException { 3 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); 4 long maxSize = getMaxSplitSize(job); 5 6 // generate splits 7 List<InputSplit> splits = new ArrayList<InputSplit>(); 8 List<FileStatus>files = listStatus(job); 9 for (FileStatus file: files) { 10 Path path = file.getPath(); 11 FileSystem fs = path.getFileSystem(job.getConfiguration()); 12 long length = file.getLen(); 13 BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); 14 if ((length != 0) && isSplitable(job, path)) { 15 long blockSize = file.getBlockSize(); 16 long splitSize = computeSplitSize(blockSize, minSize, maxSize); 17 18 long bytesRemaining = length; 19 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 20 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 21 splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 22 blkLocations[blkIndex].getHosts())); 23 bytesRemaining -= splitSize; 24 } 25 26 if (bytesRemaining != 0) { 27 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 28 blkLocations[blkLocations.length-1].getHosts())); 29 } 30 } else if (length != 0) { 31 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts())); 32 } else { 33 //Create empty hosts array for zero length files 34 splits.add(new FileSplit(path, 0, length, new String[0])); 35 } 36 } 37 38 // Save the number of input files in the job-conf 39 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); 40 41 LOG.debug("Total # of splits: " + splits.size()); 42 return splits; 43 }
代码 3.5
在上面代码中,第3 行计算minSize,是供后面计算使用的,其中getFormatMinSplitSize()方法的值是1,getMinSplitSize(job)方法的值由配置参数mapred.min.split.size 指定,默认值是1,所以minSize 的默认值就是1。第4 行计算maxSize,是供后面计算使用的,值由配置参数mapred.max.split.size 指定,默认值是long 的最大值。第8行files 列表中存放的是输入文件,可能有多个。从第9 行开始,循环处理每一个输入文件。第10 行是获得文件路径,第12 行是获得文件长度,第13行是获得文件块位置。如果文件非空,并且文件允许被分割为输入块,那么就进入第14行的条件判断中。第15 行是读取文件块size,默认是64MB,第260 行是计算输入块size,我们看一下computeSplitSize 方法,如下代码3.6。
1 protected long computeSplitSize(long blockSize, long minSize,long maxSize) { 2 return Math.max(minSize, Math.min(maxSize, blockSize)); 3 }
代码 3.6
从上面代码中可以看出,输入块size 由三个因素决定,分别是minSize、maxSize、blockSize。根据前面的数值,可以得知,输入分片的默认size 是文件块size。
我们回到代码3.5,getSplits 方法的代码中继续分析,在第19至24行的循环体中,是对文件按照输入分片size 进行切分。
总结一下上面的分析,如果输入文件有3 个,那么产生的输入分片的情况如表3.3 所示.
文件大小 | 产生的输入片 | |
输入文件1 | 63MB | 1 个 |
输入文件2 | 64MB | 1 个 |
输入文件3 | 65MB | 2 个 |
注:参数mapred.min.split.size、mapred.max.split.size、dfs.block.size 采用默认值 |
表3.3
注意:每一个输入分片启动一个Mapper 任务。
源码在JobInProcess 中,如下:
1 TaskSplitMetaInfo[] splits = createSplits(jobId); 2 if (numMapTasks != splits.length) { 3 throw new IOException("Number of maps in JobConf doesn‘t match number of " + 4 "recieved splits for job " + jobId + "! " + 5 "numMapTasks=" + numMapTasks + ", #splits=" + splits.length); 6 } 7 numMapTasks = splits.length;
通过以上分析,我们知道很多的输入文件是如何划分成很多的输入分片的。那么每个输入分片中的记录又是如何处理的哪?我们继续分析。
3.4.2. TextInputFormat
该类中有个很重要的方法是实现TextInputFormat 中的createRecordReader,如代码3.7
1 public class TextInputFormat extends FileInputFormat<LongWritable, Text> { 2 3 @Override 4 public RecordReader<LongWritable, Text> 5 createRecordReader(InputSplit split, 6 TaskAttemptContext context) { 7 return new LineRecordReader(); 8 } 9 10 @Override 11 protected boolean isSplitable(JobContext context, Path file) { 12 CompressionCodec codec = 13 new CompressionCodecFactory(context.getConfiguration()).getCodec(file); 14 if (null == codec) { 15 return true; 16 } 17 return codec instanceof SplittableCompressionCodec; 18 } 19 }
代码3.7
在代码3.7 中,该方法直接返回一个实例化的LineRecordReader 类,我们看一下这个类,如代码3.8。
1 public class LineRecordReader extends RecordReader<LongWritable, Text> { 2 private static final Log LOG = LogFactory.getLog(LineRecordReader.class); 3 4 private CompressionCodecFactory compressionCodecs = null; 5 private long start; 6 private long pos; 7 private long end; 8 private LineReader in; 9 private int maxLineLength; 10 private LongWritable key = null; 11 private Text value = null; 12 private Seekable filePosition; 13 private CompressionCodec codec; 14 private Decompressor decompressor; 15 }
代码3.8
在代码3.8 中,可以看到该类的几个属性,其中start、pos、end 表示文件中字节的位置,key 和value 表示从记录中解析出的键和值,in 是一个行内容的读取器。
继续分析其中的initialize 方法,initialize(…)方法是该类的初始化方法,在调用其他方法前先调用该方法,并且只调用一次。从在代码3.9 中可以看到,该类对类FileSplit 的对象split 进行了分析,属性start 表示split的起始位置,属性end 表示split 的结束位置,属性in 表示split 的阅读器。
1 public void initialize(InputSplit genericSplit, 2 TaskAttemptContext context) throws IOException { 3 FileSplit split = (FileSplit) genericSplit; 4 Configuration job = context.getConfiguration(); 5 this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 6 Integer.MAX_VALUE); 7 start = split.getStart(); 8 end = start + split.getLength(); 9 final Path file = split.getPath(); 10 compressionCodecs = new CompressionCodecFactory(job); 11 codec = compressionCodecs.getCodec(file); 12 13 // open the file and seek to the start of the split 14 FileSystem fs = file.getFileSystem(job); 15 FSDataInputStream fileIn = fs.open(split.getPath()); 16 17 if (isCompressedInput()) { 18 decompressor = CodecPool.getDecompressor(codec); 19 if (codec instanceof SplittableCompressionCodec) { 20 final SplitCompressionInputStream cIn = 21 ((SplittableCompressionCodec)codec).createInputStream( 22 fileIn, decompressor, start, end, 23 SplittableCompressionCodec.READ_MODE.BYBLOCK); 24 in = new LineReader(cIn, job); 25 start = cIn.getAdjustedStart(); 26 end = cIn.getAdjustedEnd(); 27 filePosition = cIn; 28 } else { 29 in = new LineReader(codec.createInputStream(fileIn, decompressor), 30 job); 31 filePosition = fileIn; 32 } 33 } else { 34 fileIn.seek(start); 35 in = new LineReader(fileIn, job); 36 filePosition = fileIn; 37 } 38 // If this is not the first split, we always throw away first record 39 // because we always (except the last split) read one extra line in 40 // next() method. 41 if (start != 0) { 42 start += in.readLine(new Text(), 0, maxBytesToConsume(start)); 43 } 44 this.pos = start; 45 } 46
代码 3.9
下面查看方法nextKeyValue 的源码,代码3.10
1 public boolean nextKeyValue() throws IOException { 2 if (key == null) { 3 key = new LongWritable(); 4 } 5 key.set(pos); 6 if (value == null) { 7 value = new Text(); 8 } 9 int newSize = 0; 10 // We always read one extra line, which lies outside the upper 11 // split limit i.e. (end - 1) 12 while (getFilePosition() <= end) { 13 newSize = in.readLine(value, maxLineLength, 14 Math.max(maxBytesToConsume(pos), maxLineLength)); 15 if (newSize == 0) { 16 break; 17 } 18 pos += newSize; 19 if (newSize < maxLineLength) { 20 break; 21 } 22 23 // line too long. try again 24 LOG.info("Skipped line of size " + newSize + " at pos " + 25 (pos - newSize)); 26 } 27 if (newSize == 0) { 28 key = null; 29 value = null; 30 return false; 31 } else { 32 return true; 33 } 34 }
代码3.10
在代码3.10 中,key 的值是pos 的值,那么这个pos 的值来自第13 行的in.readLine(…)方法的返回值。类LineReader 的方法readLine 是读取每一行的内容,把内容存放到第一个参数value 中,返回值表示读取的字节数。从这里可以看到,类LineRecordReader 的属性key表示InputSplit 中读取的字节位置,value 表示读取的文本行的内容。看一下代码3.11
1 @Override 2 public LongWritable getCurrentKey() { 3 return key; 4 } 5 6 @Override 7 public Text getCurrentValue() { 8 return value; 9 }
代码 3.11
在代码 3.11中,方法getCurrentKey()返回的是key 的值,方法getCurrentValue()返回的是value 的值。
综合以上的分析来看,该类中的getCurrentKeyValue()会被不断的调用,每次被调用后,会同时调用getCurrentKey()和getCurrentValue()。
3.5. 输出格式化类OutputFormat
3.5.1. FileOutputFormat
该类是对类FileSystem 操作执行输出的,会对运算的结果先写入到一个临时文件夹中,待运算结束后,再移动到最终的输出目录中。那么,输出的内容具体是什么格式?这是由TextOutputFormat 类负责的。
3.5.2. TextOutputFormat
该类专门输出普通文本文件的,如代码3.12
1 package org.apache.hadoop.mapreduce.lib.output; 2 3 import java.io.DataOutputStream; 4 import java.io.IOException; 5 import java.io.UnsupportedEncodingException; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FileSystem; 9 import org.apache.hadoop.fs.Path; 10 import org.apache.hadoop.fs.FSDataOutputStream; 11 12 import org.apache.hadoop.io.NullWritable; 13 import org.apache.hadoop.io.Text; 14 import org.apache.hadoop.io.compress.CompressionCodec; 15 import org.apache.hadoop.io.compress.GzipCodec; 16 import org.apache.hadoop.mapreduce.OutputFormat; 17 import org.apache.hadoop.mapreduce.RecordWriter; 18 import org.apache.hadoop.mapreduce.TaskAttemptContext; 19 import org.apache.hadoop.util.*; 20 21 /** An {@link OutputFormat} that writes plain text files. */ 22 public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> { 23 protected static class LineRecordWriter<K, V> 24 extends RecordWriter<K, V> { 25 private static final String utf8 = "UTF-8"; 26 private static final byte[] newline; 27 static { 28 try { 29 newline = "\n".getBytes(utf8); 30 } catch (UnsupportedEncodingException uee) { 31 throw new IllegalArgumentException("can‘t find " + utf8 + " encoding"); 32 } 33 } 34 35 protected DataOutputStream out; 36 private final byte[] keyValueSeparator; 37 38 public LineRecordWriter(DataOutputStream out, String keyValueSeparator) { 39 this.out = out; 40 try { 41 this.keyValueSeparator = keyValueSeparator.getBytes(utf8); 42 } catch (UnsupportedEncodingException uee) { 43 throw new IllegalArgumentException("can‘t find " + utf8 + " encoding"); 44 } 45 } 46 47 public LineRecordWriter(DataOutputStream out) { 48 this(out, "\t"); 49 }
代码3.12
在代码3.10 中,文本输出的时候使用UTF-8 编码,次第29行的代码可以看出,划分行的符号是“\n”。从第47行的构造方法可以看出,输出的键值对的默认分隔符是制表符“\t”。由此不难理解,为什么输出文件中是一行行的内容,为什么键值对使用制表符分隔了。