写一个读取hfile的mapreduce之获取HFile内容

  之前介绍了关于Mapreduce是进行输入处理的。这一篇将会介绍如何从Hfile中获取内容。这里和一般获取hbase的数据过程不太一样,不会去创建HTable对象而是直接去读取HFile文件。闲话不多说,直接上代码。先写一个单进程读取HFile的程序

 1 public class HFileReaderUtil {
 2
 3     private Configuration conf ;
 4
 5     private Path path ;
 6
 7     private HFile.Reader reader;
 8
 9     private HFileScanner scanner;
10
11     public HFileReaderUtil()  {
12         if(conf==null){
13             conf= HBaseConfiguration.create();
14         }
15
16     }
17
18 public void scanHfile(String pathstr)throws IOException{
19     path = new Path(pathstr);
20     reader = HFile.createReader(FileSystem.get(conf),path ,new CacheConfig(conf),conf);
21     scanner = reader.getScanner(false,false);
22     reader.loadFileInfo();
23     scanner.seekTo();
24
25     do{
26         KeyValue kv = scanner.getKeyValue();
27         System.out.println("rowkey = "+Bytes.toString(CellUtil.cloneRow(kv)));
28         System.out.println("cf = "+Bytes.toString(CellUtil.cloneFamily(kv)));
29         System.out.println("column value = "+Bytes.toString(CellUtil.cloneValue(kv)));
30         System.out.println("column name = "+CellUtil.cloneQualifier(kv));
31
32     }while (scanner.next());
33
34 }
35
36
37
38 }

  接着实现一个从HFile中获取数据的RecordReader,看一下RecordReader的描述,会不断从HFile中读取数据并返回键值对数据交给map去处理

The record reader breaks the data into key/value pairs for input
 1 package spdbccc.mapreduce.inputformat;
 2
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.FileSystem;
 5 import org.apache.hadoop.fs.Path;
 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 7 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 8 import org.apache.hadoop.hbase.io.hfile.HFile;
 9 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
10 import org.apache.hadoop.mapreduce.InputSplit;
11 import org.apache.hadoop.mapreduce.RecordReader;
12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
13 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
14
15 import java.io.IOException;
16
17
18 public class HFileRecordReader<K,V>extends RecordReader{
19
20
21     private HFile.Reader reader;
22     private final HFileScanner scanner;
23     private int entryNumber = 0;
24
25     public HFileRecordReader(FileSplit split, Configuration conf)
26             throws IOException {
27         final Path path = split.getPath();
28         reader = HFile.createReader(FileSystem.get(conf), path,new CacheConfig(conf), conf);
29         scanner = reader.getScanner(false, false);
30         reader.loadFileInfo();
31         scanner.seekTo();
32     }
33
34
35
36     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
37
38     }
39
40     public boolean nextKeyValue() throws IOException, InterruptedException {
41         entryNumber++;
42         return scanner.next();
43     }
44
45     public Object getCurrentKey() throws IOException, InterruptedException {
46         // TODO Auto-generated method stub
47         return new ImmutableBytesWritable(scanner.getKeyValue().getRow());
48     }
49
50     public Object getCurrentValue() throws IOException, InterruptedException {
51         return scanner.getKeyValue();
52     }
53
54
55     /**
56      * 返回运行进度
57      * @return
58      * @throws IOException
59      * @throws InterruptedException
60      */
61     public float getProgress() throws IOException, InterruptedException {
62         if (reader != null) {
63             return (entryNumber / reader.getEntries());
64         }
65         return 1;
66     }
67
68
69     /**
70      * 关闭读取资源
71      * @throws IOException
72      */
73     public void close() throws IOException {
74         if (reader != null) {
75             reader.close();
76         }
77     }
78 }

最后实现FileInputFormat,这样就实现了一个读取HFile的InputFormat。

 1 package spdbccc.mapreduce.inputformat;
 2
 3
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.hbase.KeyValue;
 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 7 import org.apache.hadoop.mapreduce.InputSplit;
 8 import org.apache.hadoop.mapreduce.JobContext;
 9 import org.apache.hadoop.mapreduce.RecordReader;
10 import org.apache.hadoop.mapreduce.TaskAttemptContext;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
13
14 import java.io.IOException;
15
16 public class HFileInputFormat extends
17         FileInputFormat<ImmutableBytesWritable, KeyValue> {
18
19     @Override
20     protected boolean isSplitable(JobContext context, Path filename) {
21         return false;
22     }
23
24     @Override
25     public RecordReader<ImmutableBytesWritable, KeyValue> createRecordReader(
26             InputSplit split, TaskAttemptContext context) throws IOException,
27             InterruptedException {
28         return new HFileRecordReader((FileSplit) split, context
29                 .getConfiguration());
30     }
31
32
33 }

  到这里是不是感觉自己蓄满了洪荒之力?从此HBase读取再没什么难度可以走上人生巅峰了,然而!!!

  这样的实现方式并没有什么卵用!

  这样的实现方式并没有什么卵用!

  这样的实现方式并没有什么卵用!

  重要的事情说三遍!

  因为到这里,如果直接使用这个InputFormat会有两个问题。一,虽然可以读取到HFile中的数据,但是同一个kv会有不同时间戳的版本在不同的HFile,无法保证获取到的是最新的记录。  二,任务运行的时候会产生大量的Map任务,因为HBase数据落地时会产生大量的小文件,而FileInputFormat会对每个小文件生成一个map任务,最终结果时短时间占满集群资源。

  而这两个问题都是由于HBase自身的原理有关,下一篇将会介绍这些背后的故事——————LSM。

时间: 2024-11-10 01:14:26

写一个读取hfile的mapreduce之获取HFile内容的相关文章

写一个读取Excel表格的接口

# -*- coding: gbk -*-import xlrd class Canshu: def __init__(self,filepath): """ 创建文件对象 :param filepath: 文件路径 """ self.workbook = xlrd.open_workbook(filepath) def get_canshu(self,sheetname,row,col): """ 获取某一个单元格

PHP ftp获取目录内容为空

使用PHP的ftp函数获取目录内容,ftp_nlist()和ftp_rawlist()返回都为空. 查了一圈资料找不到答案,然后用Python写了一个,一样的操作就可以获取目录内容. 抓包发现,Python在获取目录内容之前自动改变为被动模式了,在PHP中ftp_rawlist()前也手动ftp_pasv($con, TRUE),就能获取到内容了. 总结:也许是对ftp协议不理解,导致了这样的问题,在此写下,防止各位再踩坑^_^

python 学习笔记 12 -- 写一个脚本获取城市天气信息

最近在玩树莓派,前面写过一篇在树莓派上使用1602液晶显示屏,那么能够显示后最重要的就是显示什么的问题了.最容易想到的就是显示时间啊,CPU利用率啊,IP地址之类的.那么我觉得呢,如果能够显示当前时间.温度也是甚好的,作为一个桌面小时钟还是很精致的. 1. 目前有哪些工具 目前比较好用的应该是 weather-util, 之前我获取天气信息一般都是通过它. 使用起来也很简单: (1) Debian/Ubuntu 用户使用 sudo apt-get install weather-util 安装

【Linux学习】 写一个简单的Makefile编译源码获取当前系统时间

打算学习一下Linux,这两天先看了一下gcc的简单用法以及makefile的写法,今天是周末,天气闷热超市,早晨突然发现住处的冰箱可以用了,于是先出去吃了点东西,然后去超市买了一坨冰棍,老冰棍居多,5毛钱一根,还有几根1.5的. 嗯 接着说gcc的事 先把源代码贴上来 //gettime.h #ifndef _GET_TIME_H_ #define _GET_TIME_H_ void PrintCurrentTime(); #endif //gettime.c #include <stdio.

写一个Windows上的守护进程(8)获取进程路径

写一个Windows上的守护进程(8)获取进程路径 要想守护某个进程,就先得知道这个进程在不在.我们假设要守护的进程只会存在一个实例(这也是绝大部分情形). 我是遍历系统上的所有进程,然后判断他们的路径和要守护的进程是否一致,以此来确定进程是否存在. 遍历进程大家都知道用CreateToolhelp32Snapshot系列API,但是他们最后取得的是进程exe名称,不是全路径,如果仅依靠名称就可以达到目的也就罢了,但是有的时候还是得取到全路径,这样会更靠谱一些. 那么问题来了,如何取到进程全路径

大数据之如何利用自己写的jar包在mapreduce的使用

一:首先要将linux 和winodws的exlipse关联起来 第一步:在windows中部署hadoop包:解压一个hadoop压缩文件 第二步:将解压后的hadoop文件目录下的bin文件中的文件全部被替换成下面文件夹下的文件:该文件已经压缩并上传:bin.rar 第三步:将替换后的文件夹下的一个hadoop.dll复制到windows-->system32文件夹下 第四步:配置hadoop的环境变量: 需要配置的环境变量包括: 这里的root 是虚拟机的管理员账户 第五步:验证是否部署成

如何给Ionic写一个cordova插件

写一个cordova插件 之前由javaWeb转html5开发,由于面临新技术,遂在适应的过程中极为挣扎,不过还好~,这个过程也极为短暂:现如今面临一些较为复杂的需求还会有一丝丝头痛,却没有一开始那么强烈了... 在正式写下文之前,我先感谢公司大boss:王总,感谢他让我进入了一个有挑战性的技术公司 并在这个过程中一直鼓励我不断汲取新技术,同时也指正了我在开发中的一些不太好的习惯,十分感谢! 再~,感谢在开发中给予我太多帮助的杜勇以及孙金~,不论是需求讨论还是具体开发阶段都会给予一些十分有用的思

【转载】如何写一个框架:步骤(下)

说明:写本文的时候作者完全是把脑子里的东西写了出来,没有参考任何的资料,所以对于每一项内容可能都是不完整的,不能作为一个完整的参考.有一些方法学的东西每个人都有自己的喜好,没有觉得的对和错. 单元测试 在这之前我们写的框架只能说是一个在最基本的情况下可以使用的框架,作为一个框架我们无法预测开发人员将来会怎么使用它,所以我们需要做大量的工作来确保框架不但各种功能都是正确的,而且还是健壮的.写应用系统的代码,大多数项目是不会去写单元测试的,原因很多: 项目赶时间,连做一些输入验证都没时间搞,哪里有时

写一个方法进行文件的复制

java中实现文件内的复制,需要新建文件的方法: File file=new File("wubin.txt"); 并且没有这个文件,那么需要将这个文件,创造出来: file.createNewFile(); 当然也可以直接在文件流里面直接创造: FileInputStream  fis=new FileInputStream("wubin.txt"); 意思是在本目录下创建一个wubin.txt的文件,之后创造一个inputstreamreader去获取内容,再通