HBase 高性能加入数据 - 按批多“粮仓”式解决办法

摘要:如何从HBase中的海量数据中,以很快的速度的获取大批量数据,这一议题已经在《HBase 高性能获取数据》(http://www.cnblogs.com/wgp13x/p/4245182.html)一文中给出了解决办法。那么,如何向HBase中高性能的插入数据呢?经研究表明,光是批量写入也还是不行。网上没有现成的方法。本文针对这一问题,给出了一个解决方案。它采用了多线程按批“多粮仓”的方式,经过验证,能较好的达到高速度的效果。

关键词:hbase, 高性能, 多线?程, 算法

解决问题:如何向HBase中高性能的插入数据。



Solr和HBase专辑

1、“关于Solr的使用总结的心得体会”(http://www.cnblogs.com/wgp13x/p/3742653.html)

2、“中文分词器性能比较?”(http://www.cnblogs.com/wgp13x/p/3748764.html)

3、“Solr与HBase架构设计”(http://www.cnblogs.com/wgp13x/p/a8bb8ccd469c96917652201007ad3c50.html)

4、 “大数据架构: 使用HBase和Solr将存储与索引放在不同的机器上”(http://www.cnblogs.com/wgp13x/p/3927979.html)

5、“一个自定义 HBase Filter -通过RowKeys来高性能获取数据”(http://www.cnblogs.com/wgp13x/p/4196466.html)

6、“HBase 高性能获取数据 - 多线程批量式解决办法”(http://www.cnblogs.com/wgp13x/p/4245182.html



在实现向海量数据库(HBase + Solr)插入数据的过程中,发现存在这么一个问题:如果要插入的数据是一条一条传入的,每秒还能传入上千条那么多,那么入库的速度非常慢,以每秒数条记,此时面临着读写问题,读的速度远远跟不上写的速度,会造成大量数据积累,严重的会造成磁盘写爆掉。首先要说明的是,我们是实现了“按批入库”的接口的:public int importDataBatch(String dataTypeCode, List<Map<String, String>> datas),既然有这一接口,那么为什么还会写入速度如此慢呢?我们若是只对此接口进行性能测试的话,可以发现传入的批量数据的大小对性能的影响非常大,像这种一条一条调用importDataBatch的,简直是暴殄天物;当然,我们希望一批数据里有成千上万条数据一起提交入库才过瘾,如果配置的好的话,像《HBase 高性能获取数据》一文中“HBase基本说明与性能测试”所叙述的那样:“单台机器能够实现1w/s~3w/s之间的插入速度”。那么,对于这种实际情况中一条一条的数据插入,我们应该如何处理呢?

解决办法其实很简单,那就是“广积粮,深挖仓”。首先,它的传入速度还算快,能达到每秒上千条;其次,传入的数据是一条一条组织的;如果遇到每条数据都可能对应不同的数据类型,那会更麻烦。“广积粮,深挖仓”也得讲究方法,仓该如何组织,小米和大豆是否应该存放于不同的仓库里,积粮久了会不会发霉,粮应积多久就要吃掉了,这都是问题。我的方案是:“对于不同的数据类型粮,建造不同的粮仓;一旦粮仓满了,要立刻打开粮仓分了吃掉;对于积攒超过5秒的而没被吃掉的粮,要立即吃掉”,这样就保证了数据不会丢失,并且处理速度超快。

下面是这一算法的具体代码实现:

/**

* 数据处理

*

* @author wanganqi

* @version v1.0

* @since 2014年8月8日上午10:11:07

*/

public class DataProcessHandler

{

private static final int MULIPLE_CONT = 10;    // 10个多粮仓

private Random random = new Random();

private MultipleImportDatas[] multipleImportDatas = new MultipleImportDatas[MULIPLE_CONT];    // 10个多粮仓入库

public DataProcessHandler()

{

for (int i = 0; i < MULIPLE_CONT; i++)

{

multipleImportDatas[i] = new MultipleImportDatas();

}

}

private void 处理来的每一条数据(String dataTypeCode, List<Map<String, String>> datas)

{

int index = random.nextInt(MULIPLE_CONT);    // 随机选择1个多粮仓入库

multipleImportDatas[index].concurrentImportDatas(dataTypeCode, datas, false);    // 线程安全入数据。入库需满仓


public class MultipleImportDatas    // 多粮仓线程安全入库

{

private ConcurrentHashMap<String, List<Map<String, String>>> inputBuffer = new ConcurrentHashMap<String, List<Map<String, String>>>();    // 各粮仓,按数据类型划分

private ConcurrentHashMap<String, Date> inputBufferDate = new ConcurrentHashMap<String, Date>();    // 各粮仓,最近清仓时间

private final int BUFFER_SIZE = 1000;    // 各粮仓大小

private final int BUFFER_MONITOR_TIME = 5000;    // 对粮仓监控间隔

private Thread inputBufferMonitor = new InputBufferMonitor();    // 对粮仓进行监控

public MultipleImportDatas()

{

inputBufferMonitor.start();

}

class InputBufferMonitor extends Thread

{

public void run()

{

while (true)

{

try

{

Thread.sleep(BUFFER_MONITOR_TIME);

}

catch (InterruptedException e1)

{

e1.printStackTrace();

}

for (Map.Entry<String, List<Map<String, String>>> entity : inputBuffer

.entrySet())   // 各粮仓监管

{

String dataType = entity.getKey();

concurrentImportDatas(dataType, null, true);    // 线程安全入数据。入库需时间到

}

}

}

}

private void resetInputBufferDate(String dateTypeCode)   // 更新清仓时间

{

Date date = new Date();

if (!inputBufferDate.containsKey(dateTypeCode))

{

inputBufferDate.put(dateTypeCode, date);

}

else

{

inputBufferDate.replace(dateTypeCode, date);

}

}

public void concurrentImportDatas(String dataTypeCode,

List<Map<String, String>> datas, boolean timeUP)    // 线程安全入数据。入库需满仓或时间到

{

DataHBaseSolrBLL dataHBaseSolrBLL = (DataHBaseSolrBLL) SpringRabbitMQSupport

.getContext().getBean("dataHBaseSolrBLLImpl");

if (datas != null && datas.size() >= BUFFER_SIZE)    // 一次入粮多

{

try

{

dataHBaseSolrBLL.importDataBatch(dataTypeCode, datas);   // 批量入库

}

catch (DatabaseException | BusinessException e)

{

e.printStackTrace();

}

return;

}

synchronized (inputBuffer)

{

if (!inputBuffer.containsKey(dataTypeCode))     // 无仓

{

if (datas != null)

{

inputBuffer.put(dataTypeCode, datas);   // 建仓

}

return;

}

if (timeUP)   // 时间到

{

Date date = new Date();

if (((!inputBufferDate.containsKey(dataTypeCode)) || (inputBufferDate

.containsKey(dataTypeCode) && date.getTime()

- inputBufferDate.get(dataTypeCode).getTime() > BUFFER_MONITOR_TIME))

&& inputBuffer.get(dataTypeCode).size() > 0)

{

importInputBufferAndClear(dataTypeCode);   // 清仓

}

}

else

{

List<Map<String, String>> ldatas = inputBuffer

.get(dataTypeCode);

if (datas.size() + ldatas.size() > BUFFER_SIZE)   // 要满仓

{

importInputBufferAndClear(dataTypeCode);   // 清仓

}

ldatas.addAll(datas);    // 未满仓,入仓

}

}

}

private void importInputBufferAndClear(String dataTypeCode)   // 清仓

{

DataHBaseSolrBLL dataHBaseSolrBLL = (DataHBaseSolrBLL) SpringRabbitMQSupport

.getContext().getBean("dataHBaseSolrBLLImpl");

List<Map<String, String>> ldatas = inputBuffer.get(dataTypeCode);

try

{

dataHBaseSolrBLL.importDataBatch(dataTypeCode, ldatas);   // 批量入库

}

catch (DatabaseException | BusinessException e)

{

e.printStackTrace();

}

ldatas.clear();

resetInputBufferDate(dataTypeCode);   // 更新清仓时间

}

}

说明:这里起了多个“多粮仓”来进行入库,是为了避免线程同步造成的性能降低。测试表明,如果只起一个“多粮仓”来进行入库,每秒可以入库300多条数据,但若是起10个“多粮仓”来进行入库,每秒可以入库1200多条数据,这说明线程读写同步造成的时延现象还是蛮严重的。说明一下:这里的测试环境与《HBase 高性能获取数据》一文中“HBase基本说明与性能测试”环境不同,这里的加入了数据验证等过程,因此与其所描述的不具备可比性。

明天就是周末咯,偶和亲爱的老婆各请了15天的婚假,过几天就要离开这寒冷的冬季,去夏季巴厘岛度蜜月咯,好开心

来自王安琪

时间: 2024-10-26 00:02:44

HBase 高性能加入数据 - 按批多“粮仓”式解决办法的相关文章

HBase 高性能获取数据 - 多线程批量式解决办法

在前篇博客里已经讲述了通过一个自定义 HBase Filter来获取数据的办法,在末尾指出此办法的性能是不能满足应用要求的,很显然对于如此成熟的HBase来说,高性能获取数据应该不是问题.下面首先简单介绍了搜索引擎的性能,然后详细说明了HBase与MySQL的性能对比,这里的数据都是经过实际的测试获得的.最后,给出了采用多线程批量从HBase中取数据的方案,此方案经过测试要比通过自定义Filter的方式性能高出很多. Solr和HBase专辑 1.“关于Solr的使用总结的心得体会”(http:

chart.js插件生成折线图时数据普遍较大时Y轴数据不从0开始的解决办法[bubuko.com]

chart.js插件生成折线图时数据普遍较大时Y轴数据不从0开始的解决办法,原文: 默认情况下如下图 Y轴并不是从0开始,这样折现图的幅度会很大,不是正常的幅度,解决办法如下, 示例代码: window.onload = function () { var ctx = document.getElementById("canvas").getContext("2d"); window.myLine = new Chart(ctx).Line(lineChartDat

不同类型的数据跨表空间迁移的解决办法

http://blog.csdn.net/passion_wang/article/details/6541369 Oracle10g数据跨表空间迁移 因某些开发人员由于对oracle数据库理解的不够深入,往往在建表的时候指定了当前用户非默认的表空间,这样就导致了在exp及imp等操作时候问题很多,因此需要将这些表及相关的数据迁移回当前用户的默认表空间里.Oracle10g数据数据库提供了一个Move命令可以把这样的数据对象进行跨表空间的迁移,也可以对含有BLOB.CLOB这样的二进制大字段的表

Get请求携带数据量的各种限制及解决办法、Post请求说明

1.   Get请求携带数据量的各种限制及解决办法 Http Get方法提交的数据大小长度并没有限制,HTTP协议规范没有对URL长度进行限制.这个限制是特定的浏览器及服务器对它的限制. 到新公司处理的第一个线上问题是某个商品页,在某个人机器上访问失败,nginx返回400错误,但其它人机器上没有问题,即使用虚拟机重建了出问题机器的软硬件环境也不会出问题. 经过对出问题机器的http请求进行抓包,发现URL超长,cookie也很大,然后问题就很清楚了,因为大部分人用的是IE浏览器,IE浏览器限

关于MATLAB中xlswrite函数写数据出现服务器异常情况的解决办法

在网上找了半天解决办法,也没有找到,在此共享我自己的一个问题,不具有普遍性,仅作参考! 问题描述:MATLAB调用xlsread函数出现问题,第一次运行的时候,出现服务器异常,再次运行能够正常读取数据: 在调用xlswrite函数时,一直出现服务器异常状况,不能将数据写入excel表格中. 问题关键:不是MATLAB的原因,问题出在excel软件上! 问题解决:xlswrite函数在调用时会占用excel的com端口,所以要保证在调用时这个端口是开放的,也就是没有被其他程序占用.打开excel(

mysql delete数据 空间占用不减少的解决办法

今天空间商告诉我数据库空间满了,检查了一下,发现网站用户行为记录数据表竟然占了20多MB.积累了半年了,该删除释放一下空间了.果断delete之后发现数据库空间竟然没少,虽然数据记录数是零. 原来这是因为删除操作后在数据文件中留下碎片所致.DELETE只是将数据标识位删除,并没有整理数据文件,当插入新数据后,会再次使用这些被置为删除标识的记录空间.另外实际操作过程中还发现这个问题还存在两种情况. (1)当DELETE后面跟条件的时候,则就会出现这个问题.如: delete from table_

用PL/SQL Developer导出表数据的时候,窗口一闪而过解决办法

设置系统变量 变量名:ORACLE_HOME 变量值:为ORACLE安装bin目录的上一级目录,假如你的bin目录就在d:\oracle\bin 那么变量值就设置为d:\oracle

ajax 返回数据 无法得到其属性的解决办法

当我们用ajax无法 得到其属性.正常情况下是: <script type="text/javascript">        function useAjax(sendUrl,name){ $.ajax({                type: "POST",                url: sendUrl,                data: { txtCode: $("#txtCode").val() },  

jpa缓存导致无法查询到更新后的数据&amp;android出现ANR的一个解决办法

1. 向服务器更新记录后查询,始终查询不到更新后的信息 只能查到更新之前的,马上推断出是缓存的问题.网上搜索一番,将问题定位为jpa缓存,我们要设置jpa查询时不从缓存中取,直接从数据库中取,这样便能保证查询到的结果是最新的,但是性能可能会有所影响.参考资料:https://en.wikibooks.org/wiki/Java_Persistence/Caching#JPA_2.0_Cache_APIs 如上图提示,我在restful服务器做了如下设置后便可以了. 2. 安卓app注册界面出新了