MapReduce之RecordReader组件源码解析及实例

简述

无论我们以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类;

系统默认的RecordReader是LineRecordReaderTextInputFormat

LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value;

而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader;

应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名字而不是该行在文件中的偏移量。

TextInputFormat源码如下:

package org.apache.hadoop.mapreduce.lib.input;
/** An {@link InputFormat} for plain text files.  Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line.  Keys are
 * the position in the file, and values are the line of text.. */
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }
}

textinputformat.record.delimiter指的是读取一行的数据的终止符号,即遇到textinputformat.record.delimiter所包含的字符时,该一行的读取结束。

可以通过Configuration的set()方法来设置自定义的终止符,如果没有设置 textinputformat.record.delimiter,那么Hadoop就采用以CR,LF或者CRLF作为终止符,这一点可以查看LineReader的readDefaultLine方法 。

LineRecordReader源码如下:

package org.apache.hadoop.mapreduce.lib.input;

/**
 * Treats keys as offset in file and value as line.
 */
public class LineRecordReader extends RecordReader<LongWritable, Text> {
     public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
        ......
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();

        // open the file and seek to the start of the split
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        ......
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
        ......
    }
    ......
}

自定义RecordReader

1、继承抽象类RecordReader,实现RecordReader的一个实例;

2、实现自定义InputFormat类,重写InputFormat中createRecordReader()方法,返回值是自定义的RecordReader实例;

3、配置job.setInputFormatClass()设置自定义的InputFormat实例;

实例

数据:

10

20

30

40

50

60

70

……

要求:读取整个文件,分别计算奇数行与偶数行数据之和

奇数行之和:10+30+50+70=160

偶数行之和:20+40+60=120

package Recordreader;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader {

    private final static String INPUT_PATH = "hdfs://liguodong:8020/inputsum";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/outputsum";    

    public static class DefRecordReader extends RecordReader<LongWritable, Text>{

        private long start;//分片开始位置
        private long end;//分片结束位置
        private long pos;
        private FSDataInputStream fin = null;
        //自定义自己的key与value
        private LongWritable key = null;
        private Text value = null;
        //A class that provides a line reader from an input stream.
        private LineReader reader = null;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit)split;
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path path = fileSplit.getPath();//获取输入分片的路径
            Configuration conf = context.getConfiguration();
            //Return the FileSystem that owns this Path.
            FileSystem fs = path.getFileSystem(conf);
            fin = fs.open(path);
            reader = new LineReader(fin);
            pos = 1;
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {

            if(key == null){
                key = new LongWritable();
            }
            key.set(pos);//设置key
            if(value == null){
                value = new Text();
            }
            //并没有跨块,跨文件,而是一个文件作为不可分割的
            if(reader.readLine(value)==0){//一次读取行的内容,并设置值
                return false;
            }
            pos++;
            return true;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,
                InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        /**
          * Get the progress within the split
          */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void close() throws IOException {
            fin.close();
        }

    }

    public static class MyFileInputFormat extends FileInputFormat<LongWritable, Text>{

        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {

            return new DefRecordReader();
        }

        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }
    }

    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            context.write(key, value);
        }
    }

    public static class DefPartitioner extends Partitioner<LongWritable,Text>{

        @Override
        public int getPartition(LongWritable key, Text value, int numPartitions) {
            //判断奇数行还是偶数行,确定分区
            if(key.get()%2==0){
                key.set(1);//偶数行key通通改为1
                return 1;
            }else {
                key.set(0);//奇数行key通通改为0
                return 0;
            }
        }

    }

    //接收来自不同分区的数据
    public static class MyReducer extends Reducer<LongWritable, Text,Text, IntWritable>{
        Text write_key = new Text();
        IntWritable write_value = new IntWritable();

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values,
                Context context)
                throws IOException, InterruptedException {
            int sum=0;
            for (Text val : values) {
                sum += Integer.parseInt(val.toString());
            }
            if(key.get()==0){
                write_key.set("奇数行之和");
            }else {
                write_key.set("偶数行之和");
            }
            write_value.set(sum);
            context.write(write_key, write_value);
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        //1、配置
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);

        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }
        Job job = Job.getInstance(conf, "Define RecordReader"); 

        //2、打包运行必须执行的方法
        job.setJarByClass(MyRecordReader.class);

        //3、输入路径
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));  

        //设置输入格式
        job.setInputFormatClass(MyFileInputFormat.class);

        //4、Map
        job.setMapperClass(MyMapper.class);
        //指定map的输出的<k,v>类型,如果<k3,v3>的类型与<k2,v2>的类型一致,那么可以省略。
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        //5、Combiner
        //job.setCombinerClass(MyReducer.class);
        job.setPartitionerClass(DefPartitioner.class);

        //6、Reducer
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(2);//reduce个数默认是1

        //如果<k3,v3>的类型与<k2,v2>的类型不一致,要么都省略,要么都要写。
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //7、 输出路径
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //8、提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[[email protected] file]# vi inputsum
[[email protected] file]# hdfs dfs -put inputsum /inputsum
[[email protected] file]# hdfs dfs -cat /inputsum
10
20
30
40
50
60
70
[[email protected] file]# yarn jar MyRecordReader.jar
[[email protected] file]# hdfs dfs -ls /outputsum
Found 3 items
-rw-r--r--   1 root supergroup          0 2015-06-14 21:19 /outputsum/_SUCCESS
-rw-r--r--   1 root supergroup         20 2015-06-14 21:19 /outputsum/part-r-00000
-rw-r--r--   1 root supergroup         20 2015-06-14 21:19 /outputsum/part-r-00001
[[email protected] file]# hdfs dfs -cat /outputsum/part-r-00000
奇数行之和      160
[[email protected] file]# hdfs dfs -cat /outputsum/part-r-00001
偶数行之和      120
时间: 2024-07-29 17:30:16

MapReduce之RecordReader组件源码解析及实例的相关文章

DRF-解析器组件源码解析

解析器组件源码解析 解析器组件源码解析 1 执行request.data 开始找重装的request中的data方法 2 在dispatch找到重装的request def dispatch(self, request, *args, **kwargs): request = self.initialize_request(request, *args, **kwargs) ***这里开始找data 3 在initialize_request中找到实例request的类Request() def

Hadoop RPC远程过程调用源码解析及实例

什么是RPC? 1.RPC(Remote Procedure Call)远程过程调用,它允许一台计算机程序远程调用另外一台计算机的子程序,而不用去关心底层的网络通信细节,对我们来说是透明的.经常用于分布式网络通信中. 2.Hadoop的进程间交互都是通过RPC来进行的,比如Namenode与Datanode之间,Jobtracker与Tasktracker之间等. RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中, RPC跨越了传输层和应用层

.Net Core 认证系统之基于Identity Server4 Token的JwtToken认证源码解析

介绍JwtToken认证之前,必须要掌握.Net Core认证系统的核心原理,如果你还不了解,请参考.Net Core 认证组件源码解析,且必须对jwt有基本的了解,如果不知道,请百度.最重要的是你还需要掌握identity server4的基本用法,关于identity server4因为设计到两个协议Oath2.0和openid connect协议,内容较多,不是本文重点,后续有时间我会写一片关于identity server4的源码分析.且为了保证整个系统的高度可控,我重写了整个id4,留

Flume-ng源码解析之Channel组件

如果还没看过Flume-ng源码解析之启动流程,可以点击Flume-ng源码解析之启动流程 查看 1 接口介绍 组件的分析顺序是按照上一篇中启动顺序来分析的,首先是Channel,然后是Sink,最后是Source,在开始看组件源码之前我们先来看一下两个重要的接口,一个是LifecycleAware ,另一个是NamedComponent 1.1 LifecycleAware @[email protected] interface LifecycleAware {  public void s

2015.07.20MapReducer源码解析(笔记)

MapReducer源码解析(笔记) ? 第一步,读取数据源,将每一行内容解析成一个个键值对,每个键值对供map函数定义一次,数据源由FileInputFormat:指定的,程序就能从地址读取记录,读取的记录每一行内容是如何转换成一个个键值对?Mapper函数是如何调用键值对?这是由InputFormatClass完成的,它在我们的例子中的具体实现类是TextInputFormat(Text是普通的文本,log日志,数据库中的数据就不是),总的来说:TextInputFormat把数据源中的数据

Android EventBus源码解析, 带你深入理解EventBus

上一篇带大家初步了解了EventBus的使用方式,详见:Android EventBus实战 没听过你就out了,本篇博客将解析EventBus的源码,相信能够让大家深入理解该框架的实现,也能解决很多在使用中的疑问:为什么可以这么做?为什么这么做不好呢? 1.概述 一般使用EventBus的组件类,类似下面这种方式: [java] view plain copy public class SampleComponent extends Fragment { @Override public vo

Dialog与FragmentDialog源码解析

<代码里的世界> -UI篇 用文字札记描绘自己 android学习之路 转载请保留出处 by Qiao http://blog.csdn.net/qiaoidea/article/details/46402845 [导航] - 弹出式对话框各种方案 从仿QQ消息提示框来谈弹出式对话框的实现方式 (Dialog,PopupWind,自定义View,Activity,FragmentDialog) - Dialog源码解析 从源码上看Dialog与DialogFragment 1.概述 前一篇写了

消息中间件 RocketMQ源码解析:事务消息

关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 概述 2. 事务消息发送 2.1 Producer 发送事务消息 2.2 Broker 处理结束事务请求 2.3 Broker 生成

ExcelReport第二篇:ExcelReport源码解析

导航 目   录:基于NPOI的报表引擎--ExcelReport 上一篇:使用ExcelReport导出Excel 下一篇:扩展元素格式化器 概述 针对上一篇随笔收到的反馈,在展开对ExcelReport源码解析之前,我认为把编写该组件时的想法分享给大家是有必要的. 编写该组件时,思考如下: 1)要实现样式.格式与数据的彻底分离. 为什么要将样式.格式与数据分离呢?恩,你不妨想一想在生成报表时,那些是变的而那些又是不变的.我的结论是:变的是数据. 有了这个想法,很自然的想到用模板去承载不变的部