MapReduce自定义RecordReader

一:背景

RecordReader表示以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类,系统默认的RecordReader是LineRecordReader,它是TextInputFormat对应的RecordReader;而SequenceFileInputFormat对应的RecordReader是SequenceFileRecordReader。LineRecordReader是每行的偏移量作为读入map的key,每行的内容作为读入map的value。很多时候hadoop内置的RecordReader并不能满足我们的需求,比如我们在读取记录的时候,希望Map读入的Key值不是偏移量而是行号或者是文件名,这时候就需要我们自定义RecordReader。

二:技术实现

(1):继承抽象类RecordReader,实现RecordReader的一个实例。

(2):实现自定义InputFormat类,重写InputFormat中的CreateRecordReader()方法,返回值是自定义的RecordReader实例。

(3):配置job.setInputFormatClass()为自定义的InputFormat实例。

#需求:统计data文件中奇数行和偶数行的和:

[java] view plain copy

  1. 10
  2. 20
  3. 50
  4. 15
  5. 30
  6. 100

实现代码如下:

MyRecordReader.java:

[java] view plain copy

  1. public class MyRecordReader extends RecordReader<LongWritable, Text>{
  2. //起始位置(相对整个分片而言)
  3. private long start;
  4. //结束位置(相对整个分片而言)
  5. private long end;
  6. //当前位置
  7. private long pos;
  8. //文件输入流
  9. private FSDataInputStream fin = null;
  10. //key、value
  11. private LongWritable key = null;
  12. private Text value = null;
  13. //定义行阅读器(hadoop.util包下的类)
  14. private LineReader reader = null;
  15. @Override
  16. public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  17. //获取分片
  18. FileSplit fileSplit = (FileSplit) split;
  19. //获取起始位置
  20. start = fileSplit.getStart();
  21. //获取结束位置
  22. end = start + fileSplit.getLength();
  23. //创建配置
  24. Configuration conf = context.getConfiguration();
  25. //获取文件路径
  26. Path path = fileSplit.getPath();
  27. //根据路径获取文件系统
  28. FileSystem fileSystem = path.getFileSystem(conf);
  29. //打开文件输入流
  30. fin = fileSystem.open(path);
  31. //找到开始位置开始读取
  32. fin.seek(start);
  33. //创建阅读器
  34. reader = new LineReader(fin);
  35. //将当期位置置为1
  36. pos = 1;
  37. }
  38. @Override
  39. public boolean nextKeyValue() throws IOException, InterruptedException {
  40. if (key == null){
  41. key = new LongWritable();
  42. }
  43. key.set(pos);
  44. if (value == null){
  45. value = new Text();
  46. }
  47. if (reader.readLine(value) == 0){
  48. return false;
  49. }
  50. pos ++;
  51. return true;
  52. }
  53. @Override
  54. public LongWritable getCurrentKey() throws IOException, InterruptedException {
  55. return key;
  56. }
  57. @Override
  58. public Text getCurrentValue() throws IOException, InterruptedException {
  59. return value ;
  60. }
  61. @Override
  62. public float getProgress() throws IOException, InterruptedException {
  63. return 0;
  64. }
  65. @Override
  66. public void close() throws IOException {
  67. fin.close();
  68. }
  69. }

MyInputFormat.java

[java] view plain copy

  1. public class MyInputFormat extends FileInputFormat<LongWritable, Text>{
  2. @Override
  3. public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  4. //返回自定义的RecordReader
  5. return new MyRecordReader();
  6. }
  7. /**
  8. * 为了使得切分数据的时候行号不发生错乱
  9. * 这里设置为不进行切分
  10. */
  11. protected boolean isSplitable(FileSystem fs, Path filename) {
  12. return false;
  13. }
  14. }

MyPartitioner.java

[java] view plain copy

  1. public class MyPartitioner extends Partitioner<LongWritable, Text>{
  2. @Override
  3. public int getPartition(LongWritable key, Text value, int numPartitions) {
  4. //偶数放到第二个分区进行计算
  5. if (key.get() % 2 == 0){
  6. //将输入到reduce中的key设置为1
  7. key.set(1);
  8. return 1;
  9. } else {//奇数放在第一个分区进行计算
  10. //将输入到reduce中的key设置为0
  11. key.set(0);
  12. return 0;
  13. }
  14. }
  15. }

主类 RecordReaderTest.java

[java] view plain copy

  1. public class RecordReaderTest {
  2. // 定义输入路径
  3. private static String IN_PATH = "";
  4. // 定义输出路径
  5. private static String OUT_PATH = "";
  6. public static void main(String[] args) {
  7. try {
  8. // 创建配置信息
  9. Configuration conf = new Configuration();
  10. // 获取命令行的参数
  11. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  12. // 当参数违法时,中断程序
  13. if (otherArgs.length != 2) {
  14. System.err.println("Usage:wordcount<in> <out>");
  15. System.exit(1);
  16. }
  17. // 给路径赋值
  18. IN_PATH = otherArgs[0];
  19. OUT_PATH = otherArgs[1];
  20. // 创建文件系统
  21. FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
  22. // 如果输出目录存在,我们就删除
  23. if (fileSystem.exists(new Path(new URI(OUT_PATH)))) {
  24. fileSystem.delete(new Path(new URI(OUT_PATH)), true);
  25. }
  26. // 创建任务
  27. Job job = new Job(conf, RecordReaderTest.class.getName());
  28. // 打成jar包运行,这句话是关键
  29. job.setJarByClass(RecordReaderTest.class);
  30. // 1.1 设置输入目录和设置输入数据格式化的类
  31. FileInputFormat.setInputPaths(job, IN_PATH);
  32. job.setInputFormatClass(MyInputFormat.class);
  33. // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
  34. job.setMapperClass(RecordReaderMapper.class);
  35. job.setMapOutputKeyClass(LongWritable.class);
  36. job.setMapOutputValueClass(Text.class);
  37. // 1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
  38. job.setPartitionerClass(MyPartitioner.class);
  39. job.setNumReduceTasks(2);
  40. // 1.4 排序
  41. // 1.5 归约
  42. // 2.1 Shuffle把数据从Map端拷贝到Reduce端。
  43. // 2.2 指定Reducer类和输出key和value的类型
  44. job.setReducerClass(RecordReaderReducer.class);
  45. job.setOutputKeyClass(Text.class);
  46. job.setOutputValueClass(LongWritable.class);
  47. // 2.3 指定输出的路径和设置输出的格式化类
  48. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  49. job.setOutputFormatClass(TextOutputFormat.class);
  50. // 提交作业 退出
  51. System.exit(job.waitForCompletion(true) ? 0 : 1);
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. public static class RecordReaderMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
  57. @Override
  58. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException,
  59. InterruptedException {
  60. // 直接将读取的记录写出去
  61. context.write(key, value);
  62. }
  63. }
  64. public static class RecordReaderReducer extends Reducer<LongWritable, Text, Text, LongWritable> {
  65. // 创建写出去的key和value
  66. private Text outKey = new Text();
  67. private LongWritable outValue = new LongWritable();
  68. protected void reduce(LongWritable key, Iterable<Text> values, Reducer<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
  69. InterruptedException {
  70. System.out.println("奇数行还是偶数行:" + key);
  71. // 定义求和的变量
  72. long sum = 0;
  73. // 遍历value求和
  74. for (Text val : values) {
  75. // 累加
  76. sum += Long.parseLong(val.toString());
  77. }
  78. // 判断奇偶数
  79. if (key.get() == 0) {
  80. outKey.set("奇数之和为:");
  81. } else {
  82. outKey.set("偶数之和为:");
  83. }
  84. // 设置value
  85. outValue.set(sum);
  86. // 把结果写出去
  87. context.write(outKey, outValue);
  88. }
  89. }
  90. }

程序运行结果:

注:分区数大于2的MR程序要打成jar包才能运行!

时间: 2024-10-14 16:48:12

MapReduce自定义RecordReader的相关文章

MapReduce之RecordReader组件源码解析及实例

简述 无论我们以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类: 系统默认的RecordReader是LineRecordReader,TextInputFormat: LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value: 而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader: 应用场景:自定义读取每一条记录的方式:自定义读入key的类型,如

Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客. 今天接着上次[Hadoop mapreduce自定义排序WritableComparable]文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始. 首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下: /** * Define the comparator that controls which keys are grouped toge

Hadoop初学指南(7)--MapReduce自定义计数器

本文主要介绍了MapReduce中的自定义计数器的相关内容. 在上次的单词统计例子中,我们可以看到MapReduce在执行过程中会有很多的控制台输出信息,其中有一个很关键的内容:计数器.如下图: 可以看到最上方的关键字:Counters,这就表示计数器. 在这里,只有一个制表符缩进的表示计数器组,有两个制表符缩进的表示计数器组下的计数器.如File Output Format Counters就表示文件输出的计数器组,里面的Bytes Written表示输出的字符数,在输出的文本中,hello,

hadoop mapreduce 自定义InputFormat

很久以前为了满足公司的需求写过一些自定义InputFormat,今天有时间拿出来记一下     需求是这样的,如果如果使用FileInputFormat作为输入,是按照行来读取日志的,也就是按照\n来区分每一条日志的,而由于一条日志中记录着一些错误信息,比如java的异常信息,这些信息本身就带有换行符,如果还是按照\n进行区分每一条日志的话明显是错误的,由于我们日志的特殊性,将以"]@\n"作为区分日志的标识.     接下来就来看看如何自定义InputFormat,还是不画类图了,我

mapreduce 自定义数据类型的简单的应用

本文以手机流量统计为例: 日志中包含下面字段 现在需要统计手机的上行数据包,下行数据包,上行总流量,下行总流量. 分析:可以以手机号为key 以上4个字段为value传传递数据. 这样则需要自己定义一个数据类型,用于封装要统计的4个字段,在map 与reduce之间传递和shuffle 注:作为key的自定义类型需要实现WritableComparable 里面的compareTo方法 作为value的自定义类 则只需实现Writable里面的方法 自定义代码如下: /*** * MapRedu

Hadoop读书笔记(十二)MapReduce自定义排序

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.说明: 对给出的两列数据首先按照第一列升序排列,当第一列相同时,第二列升序排列 数据格式: 3 3 3 2 3 1 2 2 2 1 1 1 2.代码 SortApp.java package sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOExc

[Hadoop] - Mapreduce自定义Counter

在Hadoop的MR程序开发中,经常需要统计一些map/reduce的运行状态信息,这个时候我们可以通过自定义Counter来实现,这个实现的方式是不是通过配置信息完成的,而是通过代码运行时检查完成的. 1.创建一个自己的Counter枚举类. enum PROCESS_COUNTER { BAD_RECORDS, BAD_GROUPS; } 2.在需要统计的地方,比如map或者reduce阶段进行下列操作. context.getCounter(PROCESS_COUNTER.BAD_RECO

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition MobileDriver主类 package Partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class MobileDriver { public static void main(String[] args) { String[] paths = {"F:\\mobile.txt", "F

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea