【hbase】——HBase 写优化之 BulkLoad 实现数据快速入库

1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题?

我们先看下 HBase 的写流程:

通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成put对象写入HBase,该方式在大数据量写入时效率低下(HBase会block写入,频繁进行flush,split,compact等大量IO操作),并对HBase节点的稳定性造成一定的影响(GC时间过长,响应变慢,导致节点超时退出,并引起一系列连锁反应),而HBase支持 bulk load 的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对HBase节点的写入压力。
通过使用先生成HFile,然后再BulkLoad到Hbase的方式来替代之前直接调用HTableOutputFormat的方法有如下的好处:
(1)消除了对HBase集群的插入压力
(2)提高了Job的运行速度,降低了Job的执行时间
目前此种方式仅仅适用于只有一个列族的情况,在新版 HBase 中,单列族的限制会消除。

2、bulkload 流程与实践

bulkload 方式需要两个Job配合完成: 
(1)第一个Job还是运行原来业务处理逻辑,处理的结果不直接调用HTableOutputFormat写入到HBase,而是先写入到HDFS上的一个中间目录下(如 middata) 
(2)第二个Job以第一个Job的输出(middata)做为输入,然后将其格式化HBase的底层存储文件HFile 
(3)调用BulkLoad将第二个Job生成的HFile导入到对应的HBase表中

下面给出相应的范例代码:

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;

import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;

import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class GeneratePutHFileAndBulkLoadToHBase {

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

    {

        private Text wordText=new Text();

        private IntWritable one=new IntWritable(1);

        @Override

        protected void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            // TODO Auto-generated method stub

            String line=value.toString();

            String[] wordArray=line.split(" ");

            for(String word:wordArray)

            {

                wordText.set(word);

                context.write(wordText, one);

            }

            

        }

    }

    

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>

    {

        private IntWritable result=new IntWritable();

        protected void reduce(Text key, Iterable<IntWritable> valueList,

                Context context)

                throws IOException, InterruptedException {

            // TODO Auto-generated method stub

            int sum=0;

            for(IntWritable value:valueList)

            {

                sum+=value.get();

            }

            result.set(sum);

            context.write(key, result);

        }

        

    }

    

    public static class ConvertWordCountOutToHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>

    {

        @Override

        protected void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            // TODO Auto-generated method stub

            String wordCountStr=value.toString();

            String[] wordCountArray=wordCountStr.split("\t");

            String word=wordCountArray[0];

            int count=Integer.valueOf(wordCountArray[1]);

            

            //创建HBase中的RowKey

            byte[] rowKey=Bytes.toBytes(word);

            ImmutableBytesWritable rowKeyWritable=new ImmutableBytesWritable(rowKey);

            byte[] family=Bytes.toBytes("cf");

            byte[] qualifier=Bytes.toBytes("count");

            byte[] hbaseValue=Bytes.toBytes(count);

            // Put 用于列簇下的多列提交,若只有一个列,则可以使用 KeyValue 格式

            // KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);

            Put put=new Put(rowKey);

            put.add(family, qualifier, hbaseValue);

            context.write(rowKeyWritable, put);

            

        }

        

    }

    

    public static void main(String[] args) throws Exception {

        // TODO Auto-generated method stub

        Configuration hadoopConfiguration=new Configuration();

        String[] dfsArgs = new GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();

        

        //第一个Job就是普通MR,输出到指定的目录

        Job job=new Job(hadoopConfiguration, "wordCountJob");

        job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);

        job.setMapperClass(WordCountMapper.class);

        job.setReducerClass(WordCountReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(dfsArgs[0]));

        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));

        //提交第一个Job

        int wordCountJobResult=job.waitForCompletion(true)?0:1;

        

        //第二个Job以第一个Job的输出做为输入,只需要编写Mapper类,在Mapper类中对一个job的输出进行分析,并转换为HBase需要的KeyValue的方式。

        Job convertWordCountJobOutputToHFileJob=new Job(hadoopConfiguration, "wordCount_bulkload");

        

        convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.class);

        convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.class);

        //ReducerClass 无需指定,框架会自行根据 MapOutputValueClass 来决定是使用 KeyValueSortReducer 还是 PutSortReducer

        //convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);

        convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.class);

        convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.class);

        

        //以第一个Job的输出做为第二个Job的输入

        FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[1]));

        FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, new Path(dfsArgs[2]));

        //创建HBase的配置对象

        Configuration hbaseConfiguration=HBaseConfiguration.create();

        //创建目标表对象

        HTable wordCountTable =new HTable(hbaseConfiguration, "word_count");

        HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob,wordCountTable);

       

        //提交第二个job

        int convertWordCountJobOutputToHFileJobResult=convertWordCountJobOutputToHFileJob.waitForCompletion(true)?0:1;

        

        //当第二个job结束之后,调用BulkLoad方式来将MR结果批量入库

        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConfiguration);

        //第一个参数为第二个Job的输出目录即保存HFile的目录,第二个参数为目标表

        loader.doBulkLoad(new Path(dfsArgs[2]), wordCountTable);

        

        //最后调用System.exit进行退出

        System.exit(convertWordCountJobOutputToHFileJobResult);

        

    }

}

比如原始的输入数据的目录为:/rawdata/test/wordcount/20131212

中间结果数据保存的目录为:/middata/test/wordcount/20131212 
最终生成的HFile保存的目录为:/resultdata/test/wordcount/20131212 
运行上面的Job的方式如下: 
hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212

3、说明与注意事项:

(1)HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作。

(2)最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
否则报这样的错误:

?


1

2

3

java.lang.IllegalArgumentException: Can‘t read partitions file

...

Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable

(3)最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer,这个 SorterReducer 可以不指定,因为源码中已经做了判断:

?


1

2

3

4

5

6

7

if (KeyValue.class.equals(job.getMapOutputValueClass())) {

    job.setReducerClass(KeyValueSortReducer.class);

} else if (Put.class.equals(job.getMapOutputValueClass())) {

    job.setReducerClass(PutSortReducer.class);

} else {

    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());

}

(4) MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件,多列簇需要起多个 job,不过新版本的 Hbase 已经解决了这个限制。

(5) MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。

(6)最后一个 Reduce 没有 setNumReduceTasks 是因为,该设置由框架根据region个数自动配置的。

(7)下边配置部分,注释掉的其实写不写都无所谓,因为看源码就知道configureIncrementalLoad方法已经把固定的配置全配置完了,不固定的部分才需要手动配置。

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

public class HFileOutput {

        //job 配置

    public static Job configureJob(Configuration conf) throws IOException {

        Job job = new Job(configuration, "countUnite1");

        job.setJarByClass(HFileOutput.class);

                //job.setNumReduceTasks(2); 

        //job.setOutputKeyClass(ImmutableBytesWritable.class);

        //job.setOutputValueClass(KeyValue.class);

        //job.setOutputFormatClass(HFileOutputFormat.class);

 

        Scan scan = new Scan();

        scan.setCaching(10);

        scan.addFamily(INPUT_FAMILY);

        TableMapReduceUtil.initTableMapperJob(inputTable, scan,

                HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job);

        //这里如果不定义reducer部分,会自动识别定义成KeyValueSortReducer.class 和PutSortReducer.class

                job.setReducerClass(HFileOutputRedcuer.class);

        //job.setOutputFormatClass(HFileOutputFormat.class);

        HFileOutputFormat.configureIncrementalLoad(job, new HTable(

                configuration, outputTable));

        HFileOutputFormat.setOutputPath(job, new Path());

                //FileOutputFormat.setOutputPath(job, new Path()); //等同上句

        return job;

    }

 

    public static class HFileOutputMapper extends

            TableMapper<ImmutableBytesWritable, LongWritable> {

        public void map(ImmutableBytesWritable key, Result values,

                Context context) throws IOException, InterruptedException {

            //mapper逻辑部分

            context.write(new ImmutableBytesWritable(Bytes()), LongWritable());

        }

    }

 

    public static class HFileOutputRedcuer extends

            Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {

        public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,

                Context context) throws IOException, InterruptedException {

                        //reducer逻辑部分

            KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(),

                    Bytes.toBytes(count));

            context.write(key, kv);

        }

    }

}

4、Refer:

1、Hbase几种数据入库(load)方式比较

http://blog.csdn.net/kirayuan/article/details/6371635

2、MapReduce生成HFile入库到HBase及源码分析

http://blog.pureisle.net/archives/1950.html

3、MapReduce生成HFile入库到HBase

http://shitouer.cn/2013/02/hbase-hfile-bulk-load/

时间: 2024-08-27 02:58:22

【hbase】——HBase 写优化之 BulkLoad 实现数据快速入库的相关文章

HBase数据快速导入之ImportTsv&amp;Bulkload

导入数据最快的方式,可以略过WAL直接生产底层HFile文件 (环境:centos6.5.Hadoop2.6.0.HBase0.98.9) 1.SHELL方式 1.1 ImportTsv直接导入 命令:bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir> 测试: 1.1.1在HBase中创建好表 c

HBase读写性能优化

一个系统上线之后,开发和调优将会一直伴随在系统的整个生命周期中,HBase也不例外.下面我们要学习如何进行HBase读写性能调优,以获取最大的读写效率. HBase写入优化客户端优化批量写采用批量写,可以减少客户端到RegionServer之间的RPC的次数,提高写入性能.批量写请求要么全部成功返回,要么抛出异常. HTable.put(List<Put>); 异步批量提交如果业务可以接受异常情况下丢失少量数据,可以使用异步批量提交方式提交请求. 用户提交写请求之后,数据会先写入客户端缓存,并

hbase表设计优化原则 ***** 生产环境中使用小结

2019/2/28 星期四 hbase表设计优化原则 https://www.cnblogs.com/qingyunzong/p/8696962.html表设计1.列簇设计 追求的原则是:在合理范围内能尽量少的减少列簇就尽量减少列簇. 最优设计是:将所有相关性很强的 key-value 都放在同一个列簇下,这样既能做到查询效率 最高,也能保持尽可能少的访问不同的磁盘文件. 以用户信息为例,可以将必须的基本信息存放在一个列族,而一些附加的额外信息可以放在 另一列族.2.RowKey 设计 HBas

一个自定义 HBase Filter -“通过RowKeys来高性能获取数据”

大家在使用HBase和Solr搭建系统中经常遇到的一个问题就是:“我通过SOLR得到了RowKeys后,该怎样去HBase上取数据”.使用现有的Filter性能差劲,网上也没有现成的解决方案,我在这里把这个问题的解决办法分享给大家,抛砖引玉一下. Solr和HBase专辑 1.“关于Solr的使用总结的心得体会”(http://www.cnblogs.com/wgp13x/p/3742653.html) 2.“中文分词器性能比较?”(http://www.cnblogs.com/wgp13x/p

hbase G1 GC优化

本文借鉴之前HBaseConAsia2017,小米公司对hbase g1 gc的优化分享.此外还可以参考apache官方博客对于hbase g1 gc优化的一篇文章(Tuning G1GC For Your HBase Cluster) g1 gc的优化主要是对一些重要的参数进行调整,然后执行压力测试,分析G1的日志.G1日志处理可以使用HubSpot开发的一个Python工具, 叫做 gc_log_visualizer , 这个工具通过正则提取日志数据, 然后绘制成监控图, 比较方便查看G1的

使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hba

利用hive-hbase表做hive表快速入库hbase功能,纬度表的查询

有个需求是纬度表需要秒出数据,首先表刚好也是1-1批对的情况,首先想到了利用hbase的索引机制.(表数据大概在4--30E之间) 虽然网上有很多直接建立hive表,自动创建hbase表的例子,但是这种情况不能进行hbase表的region预分区,导致热点问题比较严重.所以hive和hbase表要分开建立再关联起来. 1.建立hbase表 create 'xxxxx', {NAME => 'info',COMPRESSION => 'SNAPPY'},SPLITS => ['xxx|',

三种DSO(标准DSO、写优化DSO、直接更新DSO)

声明:原创作品,转载时请注明文章来自SAP师太技术博客:www.cnblogs.com/jiangzhengjun,并以超链接形式标明文章原始出处,否则将追究法律责任!原文链接:http://www.cnblogs.com/jiangzhengjun/p/4296211.html 标准Standard DSO 标准DSO有三张表: 数据从源抽取到标准DSO中时,在同一抽取请求中,相同业务主键的数据会合并(合并的方式有覆盖与合计,合计又可为MIN.MAX.SUM中的一种,具体转换规则中可为哪一种合

『数据库』随手写了一个 跨数据库 数据迁移工具

随手写了一个 跨数据库 的 数据迁移工具:>目前支持 SQLServer,MySql,SQLite: >迁移工具 可以自动建表,且 保留 主键,自增列: >迁移工具 基于 Laura.Source  ORM框架 开发: >迁移工具 支持 崩溃恢复(重启迁移工具,将会继续 未完成的 数据迁移): >每张表一个事务(即使  表中有 >100W 的数据,也是一个事务完成): >迁移后 的 自增列 和 原数据库 保持一致: 只是展示一下,直接上图片: 操作工具: 迁移工具