之前介绍了关于Mapreduce是进行输入处理的。这一篇将会介绍如何从Hfile中获取内容。这里和一般获取hbase的数据过程不太一样,不会去创建HTable对象而是直接去读取HFile文件。闲话不多说,直接上代码。先写一个单进程读取HFile的程序
1 public class HFileReaderUtil { 2 3 private Configuration conf ; 4 5 private Path path ; 6 7 private HFile.Reader reader; 8 9 private HFileScanner scanner; 10 11 public HFileReaderUtil() { 12 if(conf==null){ 13 conf= HBaseConfiguration.create(); 14 } 15 16 } 17 18 public void scanHfile(String pathstr)throws IOException{ 19 path = new Path(pathstr); 20 reader = HFile.createReader(FileSystem.get(conf),path ,new CacheConfig(conf),conf); 21 scanner = reader.getScanner(false,false); 22 reader.loadFileInfo(); 23 scanner.seekTo(); 24 25 do{ 26 KeyValue kv = scanner.getKeyValue(); 27 System.out.println("rowkey = "+Bytes.toString(CellUtil.cloneRow(kv))); 28 System.out.println("cf = "+Bytes.toString(CellUtil.cloneFamily(kv))); 29 System.out.println("column value = "+Bytes.toString(CellUtil.cloneValue(kv))); 30 System.out.println("column name = "+CellUtil.cloneQualifier(kv)); 31 32 }while (scanner.next()); 33 34 } 35 36 37 38 }
接着实现一个从HFile中获取数据的RecordReader,看一下RecordReader的描述,会不断从HFile中读取数据并返回键值对数据交给map去处理
The record reader breaks the data into key/value pairs for input
1 package spdbccc.mapreduce.inputformat; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.FileSystem; 5 import org.apache.hadoop.fs.Path; 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 7 import org.apache.hadoop.hbase.io.hfile.CacheConfig; 8 import org.apache.hadoop.hbase.io.hfile.HFile; 9 import org.apache.hadoop.hbase.io.hfile.HFileScanner; 10 import org.apache.hadoop.mapreduce.InputSplit; 11 import org.apache.hadoop.mapreduce.RecordReader; 12 import org.apache.hadoop.mapreduce.TaskAttemptContext; 13 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 14 15 import java.io.IOException; 16 17 18 public class HFileRecordReader<K,V>extends RecordReader{ 19 20 21 private HFile.Reader reader; 22 private final HFileScanner scanner; 23 private int entryNumber = 0; 24 25 public HFileRecordReader(FileSplit split, Configuration conf) 26 throws IOException { 27 final Path path = split.getPath(); 28 reader = HFile.createReader(FileSystem.get(conf), path,new CacheConfig(conf), conf); 29 scanner = reader.getScanner(false, false); 30 reader.loadFileInfo(); 31 scanner.seekTo(); 32 } 33 34 35 36 public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 37 38 } 39 40 public boolean nextKeyValue() throws IOException, InterruptedException { 41 entryNumber++; 42 return scanner.next(); 43 } 44 45 public Object getCurrentKey() throws IOException, InterruptedException { 46 // TODO Auto-generated method stub 47 return new ImmutableBytesWritable(scanner.getKeyValue().getRow()); 48 } 49 50 public Object getCurrentValue() throws IOException, InterruptedException { 51 return scanner.getKeyValue(); 52 } 53 54 55 /** 56 * 返回运行进度 57 * @return 58 * @throws IOException 59 * @throws InterruptedException 60 */ 61 public float getProgress() throws IOException, InterruptedException { 62 if (reader != null) { 63 return (entryNumber / reader.getEntries()); 64 } 65 return 1; 66 } 67 68 69 /** 70 * 关闭读取资源 71 * @throws IOException 72 */ 73 public void close() throws IOException { 74 if (reader != null) { 75 reader.close(); 76 } 77 } 78 }
最后实现FileInputFormat,这样就实现了一个读取HFile的InputFormat。
1 package spdbccc.mapreduce.inputformat; 2 3 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.hbase.KeyValue; 6 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 7 import org.apache.hadoop.mapreduce.InputSplit; 8 import org.apache.hadoop.mapreduce.JobContext; 9 import org.apache.hadoop.mapreduce.RecordReader; 10 import org.apache.hadoop.mapreduce.TaskAttemptContext; 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 12 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 13 14 import java.io.IOException; 15 16 public class HFileInputFormat extends 17 FileInputFormat<ImmutableBytesWritable, KeyValue> { 18 19 @Override 20 protected boolean isSplitable(JobContext context, Path filename) { 21 return false; 22 } 23 24 @Override 25 public RecordReader<ImmutableBytesWritable, KeyValue> createRecordReader( 26 InputSplit split, TaskAttemptContext context) throws IOException, 27 InterruptedException { 28 return new HFileRecordReader((FileSplit) split, context 29 .getConfiguration()); 30 } 31 32 33 }
到这里是不是感觉自己蓄满了洪荒之力?从此HBase读取再没什么难度可以走上人生巅峰了,然而!!!
这样的实现方式并没有什么卵用!
这样的实现方式并没有什么卵用!
这样的实现方式并没有什么卵用!
重要的事情说三遍!
因为到这里,如果直接使用这个InputFormat会有两个问题。一,虽然可以读取到HFile中的数据,但是同一个kv会有不同时间戳的版本在不同的HFile,无法保证获取到的是最新的记录。 二,任务运行的时候会产生大量的Map任务,因为HBase数据落地时会产生大量的小文件,而FileInputFormat会对每个小文件生成一个map任务,最终结果时短时间占满集群资源。
而这两个问题都是由于HBase自身的原理有关,下一篇将会介绍这些背后的故事——————LSM。
时间: 2024-11-10 01:14:26