MapReduce 重要组件——Recordreader组件

(1)以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类;

(2)系统默认的RecordReader是LineRecordReader,如TextInputFormat;而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader;

(3)LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value;

(4)应用场景:自定义读取每一条记录的方式;自定义读入key的类型,如希望读取的key是文件的路径或名字而不是该行在文件中的偏移量。

自定义RecordReader:

(1)继承抽象类RecordReader,实现RecordReader的一个实例;

(2)实现自定义InputFormat类,重写InputFormat中createRecordReader()方法,返回值是自定义的RecordReader实例;

(3)配置job.setInputFormatClass()设置自定义的InputFormat实例;

源码见org.apache.mapreduce.lib.input.TextInputFormat类;

RecordReader例子:

应用场景:

数据:

1

2

3

4

5

6

7

......

要求:分别计算奇数行与偶数行数据之和

奇数行综合:10+30+50+70=160

偶数行综合:20+40+60=120

新建项目TestRecordReader,包com.recordreader,

源代码MyMapper.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper {

@Override

protected void map(LongWritable key, Text value,Context context)

throws IOException, InterruptedException {

// TODO Auto-generated method stub

context.write(key, value);

}

}

源代码MyPartitioner.java:

package com.recordreader;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartitioner extends Partitioner {

@Override

public int getPartition(LongWritable key, Text value, int numPartitions) {

// TODO Auto-generated method stub

if(key.get() % 2 == 0){

key.set(1);

return 1;

}

else {

key.set(0);

return 0;

}

}

}

源代码MyReducer.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer {

@Override

protected void reduce(LongWritable key, Iterable value,Context context)

throws IOException, InterruptedException {

// TODO Auto-generated method stub

int sum = 0;

for(Text val: value){

sum += Integer.parseInt(val.toString());

}

Text write_key = new Text();

IntWritable write_value = new IntWritable();

if(key.get() == 0)

write_key.set("odd:");

else

write_key.set("even:");

write_value.set(sum);

context.write(write_key, write_value);

}

}

源代码MyRecordReader.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.util.LineReader;

public class MyRecordReader extends RecordReader {

private long start;

private long end;

private long pos;

private FSDataInputStream fin = null;

private LongWritable key = null;

private Text value = null;

private LineReader reader = null;

@Override

public void close() throws IOException {

// TODO Auto-generated method stub

fin.close();

}

@Override

public LongWritable getCurrentKey() throws IOException,

InterruptedException {

// TODO Auto-generated method stub

return key;

}

@Override

public Text getCurrentValue() throws IOException, InterruptedException {

// TODO Auto-generated method stub

return value;

}

@Override

public float getProgress() throws IOException, InterruptedException {

// TODO Auto-generated method stub

return 0;

}

@Override

public void initialize(InputSplit inputSplit, TaskAttemptContext context)

throws IOException, InterruptedException {

// TODO Auto-generated method stub

FileSplit fileSplit = (FileSplit)inputSplit;

start = fileSplit.getStart();

end = start + fileSplit.getLength();

Configuration conf = context.getConfiguration();

Path path = fileSplit.getPath();

FileSystem fs = path.getFileSystem(conf);

fin = fs.open(path);

fin.seek(start);

reader = new LineReader(fin);

pos = 1;

}

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

// TODO Auto-generated method stub

if(key == null)

key = new LongWritable();

key.set(pos);

if(value == null)

value = new Text();

if(reader.readLine(value) == 0)

return false;

pos++;

return true;

}

}

源代码MyFileInputFormat.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hadoop.mapreduce.JobContext;

import org.apache.hadoop.mapreduce.RecordReader;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class MyFileInputFormat extends FileInputFormat {

@Override

public RecordReader createRecordReader(InputSplit arg0,

TaskAttemptContext arg1) throws IOException, InterruptedException {

// TODO Auto-generated method stub

return new MyRecordReader();

}

@Override

protected boolean isSplitable(JobContext context, Path filename) {

// TODO Auto-generated method stub

return false;

}

}

源代码TestRecordReader.java:

package com.recordreader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class TestRecordReader {

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{

Configuration conf = new Configuration();

String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

if (otherArgs.length != 2) {

System.err.println("Usage: wordcount ");

System.exit(2);

}

Job job = new Job(conf, "word count");

job.setJarByClass(TestRecordReader.class);

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReducer.class);

job.setPartitionerClass(MyPartitioner.class);

job.setNumReduceTasks(2);

job.setInputFormatClass(MyFileInputFormat.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

时间: 2024-10-13 07:07:55

MapReduce 重要组件——Recordreader组件的相关文章

MapReduce 重要组件——Recordreader组件 [转]

(1)以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类: (2)系统默认的RecordReader是LineRecordReader,如TextInputFormat:而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader: (3)LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value: (4)应用场景:自定义读取每一条记录的方式:自定义读入key

[MapReduce_5] MapReduce 中的 Combiner 组件应用

0. 说明 Combiner 介绍 &&  在 MapReduce 中的应用 1. 介绍 Combiner: Map 端的 Reduce,有自己的使用场景 在相同 Key 过多的情况下,在 Map 端进行的预聚合,大大缓解了网络间的 K-V 全分发 Combiner 适用场景: 最大值 求和 最小值 Combiner 不适用平均值的计算 2. 结合 Combiner 实现 Word Count 在 [MapReduce_1] 运行 Word Count 示例程序 代码基础上在 WCApp.

Delphi IdHttp组件+IdHttpServer组件实现文件下载服务

http://blog.csdn.net/xxkku521/article/details/16864759 Delphi IdHttp组件+IdHttpServer组件实现文件下载服务 2013-11-21 18:15 2624人阅读 评论(0) 收藏 举报  分类: DELPHI(10)  版权声明:本文为博主原创文章,未经博主允许不得转载. [delphi] view plain copy uses idhttp,IdHTTPServer; //idhttp组件提交下载请求 procedu

12 Django组件-form组件

知识预览 forms组件 forms组件 校验字段功能 针对一个实例:注册用户讲解. 模型:models.py class UserInfo(models.Model): name=models.CharField(max_length=32) pwd=models.CharField(max_length=32) email=models.EmailField() tel=models.CharField(max_length=32) 模板:register.html: <!DOCTYPE h

django第13天(auth组件,forms组件)

django第13天(auth组件,forms组件) auth组件 -auth组件 -auth是什么? -django内置的用户认证系统,可以快速的实现,登录,注销,修改密码.... -怎么用? -(1)先创建超级用户: -python3 manage.py createsuperuser -输入用户名,邮箱(可以不输入),密码,敲回车,这样就创建出一个超级用户 -也就是在auth_user这个表中插入了一条数据(密码是加密的,所以我不能手动插入) -(2)验证用户: -from django.

Django---FORM组件.FORM组件的字段,FORM组件校验流程,FORM组件的全局和局部钩子,FORM和Model的组合

Django---FORM组件.FORM组件的字段,FORM组件校验流程,FORM组件的全局和局部钩子,FORM和Model的组合 一丶FORM的介绍 1.生成页面可用的HTML标签 2.对用户提交的数据进行校验 3.保留上次输入内容 二丶使用form组件实现注册功能 from django import forms # 导入forms组件 # 按照Django form组件的要求自己写一个类 class RegForm(forms.Form): # 继承Form name = forms.Ch

v-once指令、v-cloak指令、条件指令家族、原义指令、循环指令、todolist案例、实例成员-符号、实例成员-计算属性、实例成员-属性监听、监听的案例、局部组件、全局组件、组件交互(父传子、子传父)

v-once指令: v-once:单独使用,限制的标签内容一旦赋值,便不可被动更改(如果是输入框,可以主动修改) <div id="app"> <input type="text" v-model="msg"> <!-- 一旦赋值,只可主动更改 --> <input type="text" v-model="msg" v-once> <p>{{ m

django的RBAC认证z;自定义auth_user表;认证组件权限组件源码分析;认证组件;权限组件

一 RBAC 1.RBAC:全称(Role-Based Access Control):指的是基于用户权限访问控制的认证. 2.Django框架采用的是RBAC认证规则,RBAC认证规则通常会分为:三表规则,五表规则:Django采用的是六表规则. # 三表:用户表.角色表.权限表# 五表:用户表.角色表.权限表.用户角色关系表.角色权限关系表# 六表:用户表.角色表.权限表.用户角色关系表.角色权限关系表.用户权限关系表 3.在Django中六表之间是都是多对多的关系,可通过下面字段跨表访问

?DRF?-----三大认证组件--认证组件

认证组件 铺垫: 源码分析 入口: restframework 框架内的 views 下的 APIView 的 dispatch方法 组件的最下面 有三个方法   分别是 认证组件  权限组件 和 频率组件 perform_authentication (认证组件) 校验用户 - 游客 合法用户 非法用户 游客: 代表校验通过 进入下一步校验 (权限校验) 合法用户: 代表校验通过 将用户 存储在 request.user 中 再进行下一步校验 非法用户: 代表校验失败 抛出异常 返回403 权