Hadoop InputFormat

Hadoop可以处理不同数据格式(数据源)的数据,从文本文件到(非)关系型数据库,这很大程度上得益于Hadoop InputFormat的可扩展性设计,InputFormat层次结构图如下:

InputFormat(org.apache.hadoop.mapreduce.InputFormat)被设计为一个抽象类,代码如下:

public abstract class InputFormat<K, V> {

public abstract List<InputSplit> getSplits(JobContext context)

throws IOException, InterruptedException;

public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)

throws IOException, InterruptedException;

}

InputFormat为我们提供两个很重要的功能:

(1)如何将数据源中的数据(不一定是文件)形成切片(InputSplit,可能有多个),数据只有被合理切分,我们才可以分布式处理;

(2)如何读取并解析切片中的数据(RecordReader),数据源中的数据形式各异,只有转换为通用的Key-Value Pair,我们才可以利用MapReduce框架完成计算过程。

InputSplit

InputSplit表示数据源中的部分数据,以MapReduce为例,每一个InputSplit会被一个Map Task负责处理,亦即有多少个InputSplit便会形成多少个Map Task。InputSplit并不实际存储数据,仅仅关联数据,类似于索引的作用,对于文件而言,InputSplit中保存着文件名称、文件起始偏移量、数据长度;对于数据库而言,InputSplit中保存着数据表名称、数据行范围。

InputSplit也被设计为一个抽象类,可以看出不同的InputFormat通常会对应着不同的InputSplit,代码如下:

public abstract class InputSplit {

public abstract long getLength()

throws IOException, InterruptedException;

public abstract String[] getLocations()

throws IOException, InterruptedException;

}

InputSplit中保存着两个部分信息:

(1)数据大小:InputSplit所关联数据的大小,MapReduce根据此值对所有InputSplit进行排序,使“大”InputSplit得以优先运行,进而缩短整个Job的运行时间;

(2)数据存储位置:MapReduce根据此值完成Map Task的调度,即对于某个InputSplit来说,选取哪个计算节点运行Map Task完成这个InputSplit的数据处理,旨在拉近计算与数据之间的“距离”,移动计算而非移动数据,减少集群网络带宽资源。

大多数时候我们并不需要关心InputSplit的生成过程,它由MapReduce框架通过InputFormat getSplits()生成。

RecordReader

Map Task实际处理的是InputSplit的一条条“记录”(Record),我们需要一种方式能够将InputSplit关联的数据转换为一条条的“记录”,即Key-Value Pairs。对于文本文件的一条Record而言,key为数据在文本文件的起始偏移量,value为对应的一行字符串数据。

RecordReader就是被设计用来完成这项工作的。

RecordReader实际就是一个迭代器,不断地将InputSplit关联的数据传送给我们自定义的Map函数,代码如下:

public void run(Context context) throws IOException, InterruptedException {

setup(context);

while (context.nextKeyValue()) {

map(context.getCurrentKey(), context.getCurrentValue(), context);

}

cleanup(context);

}

RecordReader被封闭于Context内部,相关方法调用会由Context转交至RecordReader完成。

注:RecordReader为提高实现效率,每次迭代返回(getCurrentKey、getCurrentValue)的key、value实例都是相同的(重复使用实例),仅仅两者实例的内容不同而已。某些情况下我们可能需要缓存这些key、value以便后续处理,此时我们需要“复制”这些值,比如new Text(key),new Text(value)。Mapper和Reducer的数据处理过程均需要注意这个问题。

综上所述,我们可以根据实际情况,自定义扩展InputFormat完成特殊数据格式(数据源)的分析,主要涉及到切片(InputSplit)的形成以及数据的解析(RecordReader)。

时间: 2024-08-04 10:25:14

Hadoop InputFormat的相关文章

Hadoop InputFormat浅析

本文转载:http://hi.baidu.com/_kouu/item/dc8d727b530f40346dc37cd1 在执行一个Job的时候,Hadoop会将输入数据划分成N个Split,然后启动相应的N个Map程序来分别处理它们. 数据如何划分?Split如何调度(如何决定处理Split的Map程序应该运行在哪台TaskTracker机器上)?划分后的数据又如何读取?这就是本文所要讨论的问题. 先从一张经典的MapReduce工作流程图出发: 1.运行mapred程序: 2.本次运行将生成

hadoop InputFormat详解

1. 概述 我们在设置MapReduce输入格式的时候,会调用这样一条语句: job.setInputFormatClass(KeyValueTextInputFormat.class); 这条语句保证了输入文件会按照我们预设的格式被读取.KeyValueTextInputFormat即为我们设定的数据读取格式. 所有的输入格式类都继承自InputFormat,这是一个抽象类.其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等.相

Hadoop InputFormat源码分析

平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class)来保证输入文件按照我们想要的格式被读取.所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等. 不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的MapT

hadoop InputFormat getSplits

/** Splits files returned by {@link #listStatus(JobConf)} when * they're too big.*/ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { //计时器, StopWatch sw = new StopWatch().start(); // FileStatus[] files = listStatus(job);

关于hadoop中的DBInputFormat试验

1.注意,需要声明为静态内部类,否则会报java.lang.NoSuchMethodException...<init>的错误public static class MySqlWritable implements Writable, DBWritable { 2.如果输出目录存在,需要先删除 3.由于需要从mysql数据取值,则需要有mysql数据库驱动包,hadoop classpath查看hadoop类加载路径,将驱动包拷贝到其中一个目录下即可: 4.解决mysql"Acces

Hadoop OutputFormat浅析

问题:reduce输出时,如果不是推测任务写结果时会先写临时目录最后移动到输出目录吗? 下面部分转自Hadoop官网说明 OutputFormat 描述Map/Reduce作业的输出样式. Map/Reduce框架根据作业的OutputFormat来: 检验作业的输出,例如检查输出路径是否已经存在. 提供一个RecordWriter的实现,用来输出作业结果. 输出文件保存在FileSystem上. TextOutputFormat是默认的 OutputFormat. 任务的Side-Effect

Hadoop与Spark比较

先看这篇文章:http://www.huochai.mobi/p/d/3967708/?share_tid=86bc0ba46c64&fmid=0 直接比较Hadoop和Spark有难度,因为它们处理的许多任务都一样,但是在一些方面又并不相互重叠. 比如说,Spark没有文件管理功能,因而必须依赖Hadoop分布式文件系统(HDFS)或另外某种解决方案. Hadoop框架的主要模块包括如下: Hadoop Common Hadoop分布式文件系统(HDFS) Hadoop YARN Hadoop

spark VS Hadoop 两大大数据分析系统深度解读

大数据,无论是从产业上,还是从技术上来看,都是目前的发展热点.在中国,政府控制着80%的数据,剩下的多由"BAT"这样的大公司拥有,中小企业如何构建自己的大数据系统?其他企业如何建设自己的大数据系统? 推荐两大应用最广泛.国人认知最多的Apache开源大数据框架系统:spark  Hadoop   Spark:速度快.易于使用Spark以性能见长,但是它也因易用性而小有名气,原因是它随带易于使用的API,支持Scala(原生语言).Java.Python和Spark SQL.Spark

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal