鲁春利的工作笔记,谁说程序员不能有文艺范?
一个最简单的MapReduce程序
package com.lucl.hadoop.mapreduce; public class MiniMRDriver extends Configured implements Tool { public static void main(String[] args) { try { ToolRunner.run(new MiniMRDriver(), args); } catch (Exception e) { e.printStackTrace(); } } @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName()); job.setJarByClass(MiniMRDriver.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } }
查看MapReduce任务的数据
[[email protected] code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log 视频网站 15 1527 信息安全 20 3156 站点统计 24 6960 搜索引擎 28 3659 站点统计 3 1938 综合门户 15 1938 搜索引擎 21 9531 搜索引擎 63 11058 [[email protected] code]$
打包运行该MapReduce程序
[[email protected] code]$ hadoop jar MiniMR.jar /data/HTTP_SITE_FLOW.log /201511302119 15/11/30 21:19:46 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032 15/11/30 21:19:48 INFO input.FileInputFormat: Total input paths to process : 1 15/11/30 21:19:48 INFO mapreduce.JobSubmitter: number of splits:1 15/11/30 21:19:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448889273221_0001 15/11/30 21:19:50 INFO impl.YarnClientImpl: Submitted application application_1448889273221_0001 15/11/30 21:19:50 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448889273221_0001/ 15/11/30 21:19:50 INFO mapreduce.Job: Running job: job_1448889273221_0001 15/11/30 21:20:26 INFO mapreduce.Job: Job job_1448889273221_0001 running in uber mode : false 15/11/30 21:20:26 INFO mapreduce.Job: map 0% reduce 0% 15/11/30 21:20:59 INFO mapreduce.Job: map 100% reduce 0% 15/11/30 21:21:30 INFO mapreduce.Job: map 100% reduce 100% 15/11/30 21:21:31 INFO mapreduce.Job: Job job_1448889273221_0001 completed successfully 15/11/30 21:21:31 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=254 FILE: Number of bytes written=213863 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=277 HDFS: Number of bytes written=194 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=30256 Total time spent by all reduces in occupied slots (ms)=27787 Total time spent by all map tasks (ms)=30256 Total time spent by all reduce tasks (ms)=27787 Total vcore-seconds taken by all map tasks=30256 Total vcore-seconds taken by all reduce tasks=27787 Total megabyte-seconds taken by all map tasks=30982144 Total megabyte-seconds taken by all reduce tasks=28453888 Map-Reduce Framework Map input records=8 Map output records=8 Map output bytes=232 Map output materialized bytes=254 Input split bytes=103 Combine input records=0 Combine output records=0 Reduce input groups=8 Reduce shuffle bytes=254 Reduce input records=8 Reduce output records=8 Spilled Records=16 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=182 CPU time spent (ms)=2000 Physical memory (bytes) snapshot=305459200 Virtual memory (bytes) snapshot=1697824768 Total committed heap usage (bytes)=136450048 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=174 File Output Format Counters Bytes Written=194 [[email protected] code]$
查看输出结果
[[email protected] code]$ hdfs dfs -ls /201511302119 Found 2 items -rw-r--r-- 2 hadoop hadoop 0 2015-11-30 21:21 /201511302119/_SUCCESS -rw-r--r-- 2 hadoop hadoop 194 2015-11-30 21:21 /201511302119/part-r-00000 [[email protected] code]$ hdfs dfs -text /201511302119/part-r-00000 0 视频网站 15 1527 22 信息安全 20 3156 44 站点统计 24 6960 66 搜索引擎 28 3659 88 站点统计 3 1938 109 综合门户 15 1938 131 搜索引擎 21 9531 153 搜索引擎 63 11058 [[email protected] code]$
在这里没有指定Mapper类、Reducer类,并通过FileInputFormat和FileOutputFormat指定了输入数据及输出结果存储路径,执行后把行偏移量和行内容保存到了指定的输出路径下。
FileInputFormat的默认实现为TextInputFormat,专门用来处理文本数据,以回车换行符作为一行的分割标记,其中key为该行的行偏移量,value为这一行内容。
类定义如下:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { // 略 return new LineRecordReader(recordDelimiterBytes); } @Override protected boolean isSplitable(JobContext context, Path file) { // 是否可切片 } }
在Job任务中可以通过public void setInputFormatClass(Class<? extends InputFormat> cls)方法设定希望使用的InputFormat格式。
public abstract class InputFormat<K, V> { public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException; public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
文件在HDFS上是以Block块的形式存储的,而在MapReduce计算中则是以划分的切片(split后称为split分片或chunk)进行读取的,每个split的就对应一个mapper task,split的数量决定了mappertask的数量。
注意:MapReduce是由Mapper和Reducer组成的,MapperTask由split决定,那么Reducer由什么来决定呢?后面会逐渐通过示例代码进行说明
List<InputSplit> getSplits(JobContext context)负责将一个大数据逻辑分成多片。比如数据库表有100条数据,按照主键ID升序存储,假设每20条分成一片,这个List的大小就是5,然后每个InputSplit记录两个参数,第一个为这个分片的起始ID,第二个为这个分片数据的大小(这里是20)。InputSplit并没有真正存储数据,只是提供了一个如何将数据分片的方法。
RecordReader<K, V) createRecordReader(InputSplit split, TaskAttemptContext context)根据InputSplit定义的分片方法,返回一个能够读取分片记录的RecordReader。
InputSplit类定义
public abstract class InputSplit { // Split分片的大小,用来实现输入的split的排序 public abstract long getLength() throws IOException, InterruptedException; // 用来获取存储分片的位置列表 public abstract String[] getLocations() throws IOException, InterruptedException; }
RecordReader类定义
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable { public abstract void initialize(InputSplit split,TaskAttemptContext context ) throws IOException, InterruptedException; public abstract boolean nextKeyValue() throws IOException, InterruptedException; public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; public abstract float getProgress() throws IOException, InterruptedException; public abstract void close() throws IOException; }
InputSplit描述了数据块的切分方式,RecordReader类则是实际用来加载split分片数据,并把数据转换为适合Mapper类里面map()方法处理的<key, value>形式。
RecordReader实例是由输入格式定义的,默认的输入格式为TextInputFormat,提供了一个LineRecordReader,把每一行的行偏移量作为key,把内容作为value。RecordReader会在输入块上被反复调用,直到整个输入块被处理完毕,每一次调用RecordReader都会调用Mapper类的map()函数。
TextInputFormat并没有getSplits的实现,而是其父类FileInputFormat进行了实现。
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> { // Generate the list of files and make them into FileSplits public List<InputSplit> getSplits(JobContext job) throws IOException { // 1. 通过JobContext中获取List<FileStatus>; // 2. 遍历文件属性数据 // 2.1. 如果是空文件,则初始化一个无主机信息的FileSplits实例; // 2.2. 非空文件,判断是否分片,默认是分片的 // 如果不分片则每个文件作为一个FileSplit // 计算分片大小splitSize // getFormatMinSplitSize()返回固定值1 // getMinSplitSize(job)通过Configuration获取,配置参数为(mapred-default.xml): // mapreduce.input.fileinputformat.split.minsize默认值为0 // minSize的值为1 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 实际调用context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE); // 通过Configuration获取,配置参数为(mapred-default.xml无该参数): // mapreduce.input.fileinputformat.split.maxsize // 未配置该参数,取Long.MAX_VALUE,maxSize的值为Long.MAX_VALUE long maxSize = getMaxSplitSize(job); // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); for (FileStatus file: files) { Path path = file.getPath(); // 在HDFS上的绝对路径 long length = file.getLen(); // 文件的实际大小 if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { // 这里取的是Block块的大小,在2.6里面默认是134217728(即128M) long blockSize = file.getBlockSize(); // 获取切片大小,computeSplitSize(blockSize, minSize, maxSize)实际调用: // 1 Long.MAX_VALUE 128M // Math.max(minSize, Math.min(maxSize, blockSize)); // split的大小刚好等于block块的大小,为128M long splitSize = computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); return splits; } }
说明:List<FileStatus>中FileStatus可能为LocatedFileStatus(a FileStatus that includes a file‘s block locations)。