一:背景
RecordReader表示以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类,系统默认的RecordReader是LineRecordReader,它是TextInputFormat对应的RecordReader;而SequenceFileInputFormat对应的RecordReader是SequenceFileRecordReader。LineRecordReader是每行的偏移量作为读入map的key,每行的内容作为读入map的value。很多时候hadoop内置的RecordReader并不能满足我们的需求,比如我们在读取记录的时候,希望Map读入的Key值不是偏移量而是行号或者是文件名,这时候就需要我们自定义RecordReader。
二:技术实现
(1):继承抽象类RecordReader,实现RecordReader的一个实例。
(2):实现自定义InputFormat类,重写InputFormat中的CreateRecordReader()方法,返回值是自定义的RecordReader实例。
(3):配置job.setInputFormatClass()为自定义的InputFormat实例。
#需求:统计data文件中奇数行和偶数行的和:
[java] view plain copy
- 10
- 20
- 50
- 15
- 30
- 100
实现代码如下:
MyRecordReader.java:
[java] view plain copy
- public class MyRecordReader extends RecordReader<LongWritable, Text>{
- //起始位置(相对整个分片而言)
- private long start;
- //结束位置(相对整个分片而言)
- private long end;
- //当前位置
- private long pos;
- //文件输入流
- private FSDataInputStream fin = null;
- //key、value
- private LongWritable key = null;
- private Text value = null;
- //定义行阅读器(hadoop.util包下的类)
- private LineReader reader = null;
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- //获取分片
- FileSplit fileSplit = (FileSplit) split;
- //获取起始位置
- start = fileSplit.getStart();
- //获取结束位置
- end = start + fileSplit.getLength();
- //创建配置
- Configuration conf = context.getConfiguration();
- //获取文件路径
- Path path = fileSplit.getPath();
- //根据路径获取文件系统
- FileSystem fileSystem = path.getFileSystem(conf);
- //打开文件输入流
- fin = fileSystem.open(path);
- //找到开始位置开始读取
- fin.seek(start);
- //创建阅读器
- reader = new LineReader(fin);
- //将当期位置置为1
- pos = 1;
- }
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (key == null){
- key = new LongWritable();
- }
- key.set(pos);
- if (value == null){
- value = new Text();
- }
- if (reader.readLine(value) == 0){
- return false;
- }
- pos ++;
- return true;
- }
- @Override
- public LongWritable getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
- @Override
- public Text getCurrentValue() throws IOException, InterruptedException {
- return value ;
- }
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
- @Override
- public void close() throws IOException {
- fin.close();
- }
- }
MyInputFormat.java
[java] view plain copy
- public class MyInputFormat extends FileInputFormat<LongWritable, Text>{
- @Override
- public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- //返回自定义的RecordReader
- return new MyRecordReader();
- }
- /**
- * 为了使得切分数据的时候行号不发生错乱
- * 这里设置为不进行切分
- */
- protected boolean isSplitable(FileSystem fs, Path filename) {
- return false;
- }
- }
MyPartitioner.java
[java] view plain copy
- public class MyPartitioner extends Partitioner<LongWritable, Text>{
- @Override
- public int getPartition(LongWritable key, Text value, int numPartitions) {
- //偶数放到第二个分区进行计算
- if (key.get() % 2 == 0){
- //将输入到reduce中的key设置为1
- key.set(1);
- return 1;
- } else {//奇数放在第一个分区进行计算
- //将输入到reduce中的key设置为0
- key.set(0);
- return 0;
- }
- }
- }
主类 RecordReaderTest.java
[java] view plain copy
- public class RecordReaderTest {
- // 定义输入路径
- private static String IN_PATH = "";
- // 定义输出路径
- private static String OUT_PATH = "";
- public static void main(String[] args) {
- try {
- // 创建配置信息
- Configuration conf = new Configuration();
- // 获取命令行的参数
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- // 当参数违法时,中断程序
- if (otherArgs.length != 2) {
- System.err.println("Usage:wordcount<in> <out>");
- System.exit(1);
- }
- // 给路径赋值
- IN_PATH = otherArgs[0];
- OUT_PATH = otherArgs[1];
- // 创建文件系统
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 如果输出目录存在,我们就删除
- if (fileSystem.exists(new Path(new URI(OUT_PATH)))) {
- fileSystem.delete(new Path(new URI(OUT_PATH)), true);
- }
- // 创建任务
- Job job = new Job(conf, RecordReaderTest.class.getName());
- // 打成jar包运行,这句话是关键
- job.setJarByClass(RecordReaderTest.class);
- // 1.1 设置输入目录和设置输入数据格式化的类
- FileInputFormat.setInputPaths(job, IN_PATH);
- job.setInputFormatClass(MyInputFormat.class);
- // 1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
- job.setMapperClass(RecordReaderMapper.class);
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(Text.class);
- // 1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
- job.setPartitionerClass(MyPartitioner.class);
- job.setNumReduceTasks(2);
- // 1.4 排序
- // 1.5 归约
- // 2.1 Shuffle把数据从Map端拷贝到Reduce端。
- // 2.2 指定Reducer类和输出key和value的类型
- job.setReducerClass(RecordReaderReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // 2.3 指定输出的路径和设置输出的格式化类
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
- // 提交作业 退出
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static class RecordReaderMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws IOException,
- InterruptedException {
- // 直接将读取的记录写出去
- context.write(key, value);
- }
- }
- public static class RecordReaderReducer extends Reducer<LongWritable, Text, Text, LongWritable> {
- // 创建写出去的key和value
- private Text outKey = new Text();
- private LongWritable outValue = new LongWritable();
- protected void reduce(LongWritable key, Iterable<Text> values, Reducer<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,
- InterruptedException {
- System.out.println("奇数行还是偶数行:" + key);
- // 定义求和的变量
- long sum = 0;
- // 遍历value求和
- for (Text val : values) {
- // 累加
- sum += Long.parseLong(val.toString());
- }
- // 判断奇偶数
- if (key.get() == 0) {
- outKey.set("奇数之和为:");
- } else {
- outKey.set("偶数之和为:");
- }
- // 设置value
- outValue.set(sum);
- // 把结果写出去
- context.write(outKey, outValue);
- }
- }
- }
程序运行结果:
注:分区数大于2的MR程序要打成jar包才能运行!