对于HBase的MapReduce性能提升方案之BulkLoad

我们知道,在第一次海量数据批量入库时,我们会选择使用BulkLoad的方式。

简介一下BulkLoad原理方式:(1)通过MapReduce的方式,在Map或者Reduce端将输出格式化为HBase的底层存储文件HFile。(2)调用BulkLoad将第一个Job生成的HFile导入到对应的HBase表中。

ps:请注意(1)HFile方式是所有的加载方案里面是最快的,前提是:数据必须第一个导入,表示空的!如果表中已经有数据,HFile再次导入的时候,HBase的表会触发split分割操作。(2)最终输出结果,无论是Map还是Reduce,输出建议只使用<ImmutableBytesWritable, KeyValue>。

现在我们开始正题:BulkLoad固然是写入HBase最快的方式,但是,如果我们在做业务分析的时候,而数据又已经在HBase的时候,我们采用普通的针对HBase的方式,如下demo所示:

import com.yeepay.bigdata.bulkload.TableCreator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger;

import java.io.IOException;

public class HBaseMapReduceDemo {

    static Logger LOG = Logger.getLogger(HBaseMapReduceDemo.class);

    static class Mapper1 extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {

            try {
            //  context.write(key, value);
            } catch (Exception e) {
                LOG.error(e);
            }
        }
    }

    public static class Reducer1 extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {

        public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
            try {

                Put put = new Put(key.get());
                // put.add();
                context.write(key, put);

            } catch (Exception e) {
                LOG.error(e);
                return ;
            }  // catch
        }  // reduce function
    }  // reduce class

    public static void main(String[] args) throws Exception {

        HBaseConfiguration conf = new HBaseConfiguration();
        conf.set("hbase.zookeeper.quorum", "yp-name02,yp-name01,yp-data01");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // conf.set(TableInputFormat.INPUT_TABLE,"access_logs");
        Job job = new Job(conf, "HBaseMapReduceDemo");
        job.setJarByClass(HBaseMapReduceDemo.class);
//        job.setNumReduceTasks(2);
        Scan scan = new Scan();
        scan.setCaching(2500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob("srcHBaseTableName", scan, Mapper1.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
//        TableCreator.createTable(20, true, "OP_SUM");
        TableMapReduceUtil.initTableReducerJob("destHBasetableName", Reducer1.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

这个时候在对海量数据的插入过程中,会放生Spliter,写入速度非常的,及其的慢。但是此种情况适合,对已有的HBase表进行修改时候的使用。

针对如下情况HBase -> MapReduce 分析 -> 新表,我们采用 (HBase -> MapReduce 分析 -> bulkload -> 新表)方式。

demo如下:

Mapper如下:

public class MyReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {

    static Logger LOG = Logger.getLogger(MyReducer.class);

    public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
        try {
            context.write(key, kv);
        } catch (Exception e) {
            LOG.error(e);
            return;
        }  // catch
    }  // reduce function

}

Reducer如下:

public class MyReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {

    static Logger LOG = Logger.getLogger(MyReducer.class);

    public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
        try {
            context.write(key, kv);
        } catch (Exception e) {
            LOG.error(e);
            return;
        }  // catch
    }  // reduce function

}

Job and BulkLoad:

public abstract class JobBulkLoad {

    public void run(String[] args) throws Exception {
        try {
            if (args.length < 1) {
                System.err.println("please set input dir");
                System.exit(-1);
                return;
            }

            String srcTableName = args[0];
            String destTableName = args[1];
            TableCreator.createTable(20, true, destTableName);

            // 设置 HBase 参数
            HBaseConfiguration conf = new HBaseConfiguration();
            conf.set("hbase.zookeeper.quorum", "yp-name02,yp-name01,yp-data01");
//          conf.set("hbase.zookeeper.quorum", "nn01, nn02, dn01");
            conf.set("hbase.zookeeper.property.clientPort", "2181");

            // 设置 Job 参数
            Job job = new Job(conf, "hbase2hbase-bulkload");
            job.setJarByClass(JobBulkLoad.class);
            HTable htable = new HTable(conf, destTableName);  // 根据region的数量来决定reduce的数量以及每个reduce覆盖的rowkey范围

            // ----------------------------------------------------------------------------------------
            Scan scan = new Scan();
            scan.setCaching(2500);
            scan.setCacheBlocks(false);
            TableMapReduceUtil.initTableMapperJob(srcTableName, scan, MyMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
//          TableMapReduceUtil.initTableReducerJob(destTableName, Common_Reducer.class, job);

            job.setReducerClass(MyReducer.class);
            Date now = new Date();
            Path output = new Path("/output/" + destTableName + "/" + now.getTime());
            System.out.println("/output/" + destTableName + "/" + now.getTime());

            HFileOutputFormat.configureIncrementalLoad(job, htable);
            FileOutputFormat.setOutputPath(job, output);
            HFileOutputFormat.configureIncrementalLoad(job, htable);
            job.waitForCompletion(true);

//-----  执行BulkLoad  -------------------------------------------------------------------------------
            HdfsUtil.chmod(conf, output.toString());
            HdfsUtil.chmod(conf, output + "/" + YeepayConstant.COMMON_FAMILY);
            htable = new HTable(conf, destTableName);
            new LoadIncrementalHFiles(conf).doBulkLoad(output, htable);
            System.out.println("HFile data load success!");
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

}
时间: 2024-08-09 02:27:15

对于HBase的MapReduce性能提升方案之BulkLoad的相关文章

Ntop性能提升方案

Ntop性能提升方案 Ntop是一款Linux下常见的开源监控软件,它可以监测的数据包括:网络流量.使用协议.系统负载.端口情况.数据包发送时间等.正常情况下它工作的时候就像一部被动声纳,默默的接收看来自网络的各种信息,通过对这些数据的分析,网络管理员可以深入了解网络当前的运行状况,不过一旦超过Ntop数据包的处理能力,Ntop随即出现性能问题,从而导致Ntop无法准确分析网络流量和各种数据,对网络管理造成影响.下文介绍了一种提高Ntop性能的方法. 通常Ntop的抓包分析功能,是通过Ntop自

JavaScript执行效率与性能提升方案(转)

如何提升JavaScript执行效率与性能在前端开发中位于一个很重要的地方,这节来研究下如何在平时做项目过程中,提升JavaScript性能与运行效率,需要的朋友可以参考下如何提升JavaScript执行效率与性能在前端开发中位于一个很重要的地方,这节来研究下如何在平时做项目过程中,提升JavaScript性能与运行效率. 循环 循环是很常用的一个控制结构,大部分东西要依靠它来完成,在JavaScript中,我们可以使用for(;;),while(),for(in)三种循环,事实上,这三种循环中

JS执行效率与性能提升方案

如果是追加字符串,最好使用s+=anotherStr操作,而不是要使用s=s+anotherStr.如果要连接多个字符串,应该少使用+=,如 s+=a;s+=b;s+=c;应该写成s+=a + b + c:而如果是收集字符串,比如多次对同一个字符串进行+=操作的话,最好使用一个缓存.怎么用呢?使用JavaScript数组来收集,最后使用join方法连接起来,如下var buf = new Array();for(var i = 0; i < 100; i++){ buf.push(i.toStr

Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结

转自:http://blog.csdn.net/zhongwen7710/article/details/39577431 本blog的内容包含: 第一部分:Hbase框架原理理解 第二部分:Hbase调用MapReduce函数使用理解 第三部分:Hbase调用Java API使用理解 第四部分:Hbase Shell操作 第五部分:Hbase建表.读写操作方式性能优化总结 第一部分:Hbase框架原理理解 概述 HBase是一个构建在HDFS上的分布式列存储系统:HBase是基于Google

Atitit.h5 web webview性能提升解决方案-----fileStrore缓存离线存储+http方案

1. 业务场景 android+webview h5 css背景图性能提升1 2. 根据标准,到目前为止,H5 一共有6种缓存机制,有些是之前已有,有些是 H5 才新加入的.1 2.1. 各种方案的比较,如下图2 3. Attilax的解决之道 file 缓存+http3 3.1. 图片的下载3 3.2. Jsbridge 4android5 3.3. http协议6 4. 参考8 1. 业务场景 android+webview h5 css背景图性能提升 图片的缓存大概儿需要500m的规模..

【转载】HBase 数据库检索性能优化策略

转自:http://www.ibm.com/developerworks/cn/java/j-lo-HBase/index.html 高性能 HBase 数据库 本文首先介绍了 HBase 数据库基本原理及专用术语,然后介绍了 HBase 数据库发布的操作 API 及部分示例,重点介绍了 Scan 方法的操作方式,接着介绍了检索 HBase 数据库时的优化方案,最后通过一个示例总结了实际项目中遇到的检索速度慢的解决方案. HBase 数据表介绍 HBase 数据库是一个基于分布式的.面向列的.主

Web 应用性能提升 10 倍的 10 个建议

转载自http://blog.jobbole.com/94962/ 提升 Web 应用的性能变得越来越重要.线上经济活动的份额持续增长,当前发达世界中 5 % 的经济发生在互联网上(查看下面资源的统计信息). 我们现在所处的时代要求一直在线和互联互通,这意味着用户对性能有更高的期望.如果网站响应不及时,或者应用有明显的延迟,用户很快就会跑到竞争者那边去. 例如,Amazon 十年前做的一项研究表明,网页加载时间减少 100 毫秒,收入就会增加  1%.最近另一项研究凸显了一个事实,就是有一半以上

kvm性能优化方案

kvm性能优化方案 kvm性能优化,主要集中在cpu.内存.磁盘.网络,4个方面,当然对于这里面的优化,也是要分场景的,不同的场景其优化方向也是不同的,下面具体聊聊这4个方面的优化细节. cpu 在介绍cpu之前,必须要讲清楚numa的概念,建议先参考如下两篇文章 CPU Topology 玩转cpu-topology 查看cpu信息脚本: #!/bin/bash # Simple print cpu topology # Author: kodango function get_nr_proc

优化临时表使用,SQL语句性能提升100倍

[问题现象] 线上mysql数据库爆出一个慢查询,DBA观察发现,查询时服务器IO飙升,IO占用率达到100%, 执行时间长达7s左右.SQL语句如下:SELECT DISTINCT g.*, cp.name AS cp_name, c.name AS category_name, t.name AS type_name FROMgm_game g LEFT JOIN gm_cp cp ON cp.id = g.cp_id AND cp.deleted = 0 LEFT JOIN gm_cate