提高 MapReduce 价值,自定义输入和输出。
比如跳过存储到 HDFS 中这个耗时的布置。 而只是从原始数据源接受数据,或者直接将数据发送给某些处理程序。 这些处理程序在 MapReduce 作业完成后使用这些数据。 有时由文件块和输入 split 组成的基础 Hadoop 范式并不能满足需求。 此时自定义 InputFormat 和 OutputFormat 。
三种处理输入的模式:
1 生成数据(generating data)
2 外部源输入(external source input)
3 分区裁剪(partition pruning)
Map 特性 在拿到输入的输入对之前完全不知道这个复杂的事情是如何发生的。
一种处理输出的模式
1 外部源输出。
在 Hadoop 中自定义输入和输出
Hadoop 允许用户修改从磁盘加载数据的方式: 配置如何根据 HDFS 的块生成连续的输入分块;配置记录在 map 阶段如何出现;为做到这点将用到两个类: RecordReader 和 InputFormat。 在 Hadoop MapReduce 框架中。使用这两个类的方式与添加 mapper 和 reducer 类似。
Hadoop 允许用户通过类似的方式修改数据的存储方式:通过 OutputFirnat 和 RecordWriter 类。
InputFormat Hadoop 依赖作业的输入格式完成三个任务。
1)验证作业的输入配置(检查数据是否存在
2)按 InputSplit 类型将输入块和文件拆分成逻辑分块,并将每个数据分块分配给一个 map 任务处理。
3) 创建 RecordReader 的实现,用它来根据原始的 InputSplit 生成键值对。 对这些键值对将捉个发送给他们对应的 mapper
OutputFormat Hadoop 依赖作业的输出格式完成以下两个主要任务:
1)验证作业输入配置
2)创建 RecordWriter 的实现,他负责写作业的输出
InputFormat 接口
Hadoop 中最常见的输入格式都是 FileInputFormat 的子类。 Hadoop 默认的输入格式是 TextInputFormat。输入格式首先验证作业的输入,确保所有的输入路径都存在。 然后根据文件的总字节大小逻辑的拆分每个输入文件并将块大小作为逻辑 split 的上界。
举个栗子: HDFS 块大小设置为 64M 。 现在又一个 200MB 的文件将生成 4个输入 split , 他的字节范围是:0~64MB、64~128MB、128~172MB、 172B~200MB。每个 Map 任务只能分配到一个输入 split ,然后 RecordReader 负责将其分配到所有字节生成键值对。
RecordReader 还有一个额外的固定边界作用。因为输入 split 的边界是任意的,并且很可能落不到记录的边界上。 例如:TextInputFormat 使用 LineRecordReader 读取文本文件,为每个 map 任务的每行文本(换行符分割)创建键值对。见识到目前为止从文件中读取的字节数,值是每行的换行符之前所有字符组成的字符串。由于每个输入 split 的字节块不太可能加号和换行符对齐,因此为确保读取到完整的一行数据, LineRecordReader 将度过其给定的“结尾”。这一小部分数据来自一个不同的数据库,因此没有存储在同一个节点上,所以他是通过流式方式从存储该快的 DataNode 上获取。改数据流是由 FSDataImputStream 类的一个实例处理,我们不需要关心这些块在哪里。 不用怕你自己格式的 split 边界,只要进行了完整的测试,就不会再重复或丢失任何数据
InputFormat 抽象类包含两个抽象方法:
1) getSplits
getSplits 在实现时,通常是通过给定 JobContext 的对象获取配置的输入并返回一个 InputSplit 对象的 List。 输入 split 有一个方法, 能够返回与数据在集群中存储位置相关的机器组成的数组,这为框架决定那个 TaskTracker 应该执行那个 map 任务提供了依据。因为该方法也在前端使用(即作业提交到 JobTracker 之前),所以也很适合验证作业配置并抛出任何必要的异常。
2) createRecordReader
该方法用于在后端生成一个 RecordReader 的实现. Record reader 有一个会被框架调用的 initialize 的方法
RecordReader 抽象类
根据给定的 InputSplit 创造 键值对 InputSplit 是面向字节的 split 视图。 而 RecordReader 能够解析 InputSplit 并使其可以被 mapper 处理。 这也是为什么 Hadoop 和 MapReduce 被认为是读时模式(schema on read)。模式(schema)在 RecordReader 中定义。它仅仅依赖于 record reader 的实现。 他的改变是基于作业希望获得什么样的输入。RecordReader 从 输入源中读取字节并将其转换成 WritableComparable 的键以及 Writable 的值。当穿件自定义输入格式时,自定义数据类型也十分常见,这是将信息呈献给mapper 的一种很好的面向对象方式。
ReduceReader 使用创建输入 split 时产生的边界内的数据生成键值对, 读取数据”开始"的位置是文件中 RecordReader 能够开始生成键值对的地方。 而读取数据的结束的位置是文件中 RecordReader 应该停止读取的地方。
必须重载的一些方法
initialize
初始化 record reader 基于文件输入格式,适合在这里寻找文件开始读取的字节位置。
getCurrentKey 和 getCurrentValue
框架使用这两个方法将生成的键值对传递给 mapper 的实现。
nextKeyValue
方法读取了一个键值对后返回 true 直到所有数据消费完。 与 inputFormat 同 。
getProgerss
框架使用该方法收集度量信息。 与 inputFormat 同 。
close
木有键值后,框架用它做清理工作。
OutputFormat
Hadoop 默认输出格式是 TextOutputFormat, 他将所有的键值对写入到配置文件的默认 HDFS 输出目录,并且键和值用 tab 分隔,每个 reduce 任务将单独的输出一个 part 文件 并写入到配置的输出目录中。
TextOutputFormat 使用 LineRecordWriter 为每个 map 任务或 reduce 任务写键值对, 使用 toString 方法将每个键值对序列化到 HDFS 上输出的 part 文件中, 并且键值用 tab 分隔。 键值tab可以通过作业配置修改。
OutputFormat 抽象类
checkOutputSpecs
确保输出目录不存在,否则输出目录将会被覆盖。
getRecordWriter
负责将键值对序列化到一个输出(通常是一个FileSystem 对象)
getOutputCommiter
初始化时,作业的提交者设置每个任务,当成功完成任务时,并且当每个任务结束时(不管成功或失败)后执行清理, 基于文件输出的 FileOutputCommiter 可以用于处理这些繁重的事情。
RecordWriter 抽象类
负责将键值对写入到文件系统, RecordWriter 不包含初始化阶段, 但在需要时构造函数既能够用于完成 record writer 的设置。
writer
在写每个键值对时,框架将会调用该方法。该方法实现更多打的依赖于你的使用场景。 可以写入一个外部的内存 键值对中(如 riduce)
close
没有键值对时调用此方法