【转载】Hadoop自定义RecordReader

转自:http://www.linuxidc.com/Linux/2012-04/57831.htm

系统默认的LineRecordReader是按照每行的偏移量做为map输出时的key值,每行的内容作为map的value值,默认的分隔符是回车和换行。

现在要更改map对应的输入的<key,value>值,key对应的文件的路径(或者是文件名),value对应的是文件的内容(content)。

那么我们需要重写InputFormat和RecordReader,因为RecordReader是在InputFormat中调用的,当然重写RecordReader才是重点!

下面看代码InputFormat的重写:

  1. public class chDicInputFormat extends FileInputFormat<Text,Text>
  2. implements JobConfigurable{
  3. private CompressionCodecFactory compressionCodecs = null;
  4. public void configure(JobConf conf) {
  5. compressionCodecs = new CompressionCodecFactory(conf);
  6. }
  7. /**
  8. * @brief isSplitable 不对文件进行切分,必须对文件整体进行处理
  9. *
  10. * @param fs
  11. * @param file
  12. *
  13. * @return false
  14. */
  15. protected boolean isSplitable(FileSystem fs, Path file) {
  16. //  CompressionCodec codec = compressionCodecs.getCode(file);
  17. return false;//以文件为单位,每个单位作为一个split,即使单个文件的大小超过了64M,也就是Hadoop一个块得大小,也不进行分片
  18. }
  19. public RecordReader<Text,Text> getRecordReader(InputSplit genericSplit,
  20. JobConf job, Reporter reporter) throws IOException{
  21. reporter.setStatus(genericSplit.toString());
  22. return new chDicRecordReader(job,(FileSplit)genericSplit);
  23. }
  24. }

下面来看RecordReader的重写:

  1. public class chDicRecordReader implements RecordReader<Text,Text> {
  2. private static final Log LOG = LogFactory.getLog(chDicRecordReader.class.getName());
  3. private CompressionCodecFactory compressionCodecs = null;
  4. private long start;
  5. private long pos;
  6. private long end;
  7. private byte[] buffer;
  8. private String keyName;
  9. private FSDataInputStream fileIn;
  10. public chDicRecordReader(Configuration job,FileSplit split) throws IOException{
  11. start = split.getStart(); //从中可以看出每个文件是作为一个split的
  12. end = split.getLength() + start;
  13. final Path path = split.getPath();
  14. keyName = path.toString();
  15. LOG.info("filename in hdfs is : " + keyName);
  16. final FileSystem fs = path.getFileSystem(job);
  17. fileIn = fs.open(path);
  18. fileIn.seek(start);
  19. buffer = new byte[(int)(end - start)];
  20. this.pos = start;
  21. }
  22. public Text createKey() {
  23. return new Text();
  24. }
  25. public Text createValue() {
  26. return new Text();
  27. }
  28. public long getPos() throws IOException{
  29. return pos;
  30. }
  31. public float getProgress() {
  32. if (start == end) {
  33. return 0.0f;
  34. else {
  35. return Math.min(1.0f, (pos - start) / (float)(end - start));
  36. }
  37. }
  38. public boolean next(Text key, Text value) throws IOException{
  39. while(pos < end) {
  40. key.set(keyName);
  41. value.clear();
  42. fileIn.readFully(pos,buffer);
  43. value.set(buffer);
  44. //      LOG.info("---内容: " + value.toString());
  45. pos += buffer.length;
  46. LOG.info("end is : " + end  + " pos is : " + pos);
  47. return true;
  48. }
  49. return false;
  50. }
  51. public void close() throws IOException{
  52. if(fileIn != null) {
  53. fileIn.close();
  54. }
  55. }
  56. }

通过上面的代码,然后再在main函数中设置InputFormat对应的类,就可以使用这种新的读入格式了。

时间: 2024-08-19 23:30:20

【转载】Hadoop自定义RecordReader的相关文章

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

正文开始前 ,先介绍几个概念 序列化 所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储. 反序列化 是指将字节流转回到结构化对象的逆过程 序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储 在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 .RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息 Hadoop使用了自己写的序列

MapReduce自定义RecordReader

一:背景 RecordReader表示以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类,系统默认的RecordReader是LineRecordReader,它是TextInputFormat对应的RecordReader:而SequenceFileInputFormat对应的RecordReader是SequenceFileRecordReader.LineRecordReader是每行的偏移量作为读入map的key,每行的内容作为读入map的value.很多

[Hadoop] - 自定义Mapreduce InputFormat&amp;OutputFormat

在MR程序的开发过程中,经常会遇到输入数据不是HDFS或者数据输出目的地不是HDFS的,MapReduce的设计已经考虑到这种情况,它为我们提供了两个组建,只需要我们自定义适合的InputFormat和OutputFormat,就可以完成这个需求,这里简单的介绍一个从MongoDB中读数据,并写出数据到MongoDB中的一种情况,只是一个Demo,所以数据随便找的一个. 一.自定义InputFormat MapReduce中Map阶段的数据输入是由InputFormat决定的,我们查看org.a

Hadoop自定义类型处理手机上网日志

job提交源码分析 在eclipse中的写的代码如何提交作业到JobTracker中的哪?(1)在eclipse中调用的job.waitForCompletion(true)实际上执行如下方法 connect(); info = jobClient.submitJobInternal(conf); (2)在connect()方法中,实际上创建了一个JobClient对象. 在调用该对象的构造方法时,获得了JobTracker的客户端代理对象JobSubmissionProtocol. JobSu

Hadoop 自定义RPC protocol

RPC的全称为远程过程调用.由于Hadoop是一个分布式系统,因此底层的通信库也就必须实现RPC的基础功能.Hadoop RPC 在整个hadoop中扮演着底层通信模块的角色,举例而言NN和DN.AM和RM之间的通信和协调都是Hadoop RPC来完成的.熟悉使用Hadoop RPC可以加深我们对Hadoop各个模块之间通信过程的理解,也能让我们实现一些自己想要的分布式的小功能. 很多Hadoop相关书籍中都详细介绍了Hadoop RPC,其具体原理大家有兴趣的话可以去看源码加深理解.不过,我觉

【转载】自定义InputFormat

转自:http://blog.csdn.net/jackydai987/article/details/6226108 系统默认的TextInputFormat.Java [java] view plain copy public class TextInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRec

Hadoop自定义分区Partitioner

一:背景 为了使得MapReduce计算后的结果显示更加人性化,Hadoop提供了分区的功能,可以使得MapReduce计算结果输出到不同的分区中,方便查看.Hadoop提供的Partitioner组件可以让Map对Key进行分区,从而可以根据不同key来分发到不同的reduce中去处理,我们可以自定义key的分发规则,如数据文件包含不同的省份,而输出的要求是每个省份对应一个文件. 二:技术实现 自定义分区很简单,我们只需要继承抽象类Partitioner,实现自定义的getPartitione

Hadoop日记Day13---使用hadoop自定义类型处理手机上网日志

测试数据的下载地址为:http://pan.baidu.com/s/1gdgSn6r 一.文件分析 首先可以用文本编辑器打开一个HTTP_20130313143750.dat的二进制文件,这个文件的内容是我们的手机日志,文件的内容已经经过了优化,格式比较规整,便于学习研究,感兴趣的读者可以尝试一下. 我从中截取文件中的一行记录内容进行分析: 1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i

[转载]hadoop SecondNamenode详解

SecondNamenode名字看起来很象是对第二个Namenode,要么与Namenode一样同时对外提供服务,要么相当于Namenode的HA.真正的了解了SecondNamenode以后,才发现事实并不是这样的.下面这段是Hadoop对SecondNamenode的准确定义: * The Secondary Namenode is a helper to the primary Namenode.* The Secondary is responsible for supporting p