摘要:如何从HBase中的海量数据中,以很快的速度的获取大批量数据,这一议题已经在《HBase 高性能获取数据》(http://www.cnblogs.com/wgp13x/p/4245182.html)一文中给出了解决办法。那么,如何向HBase中高性能的插入数据呢?经研究表明,光是批量写入也还是不行。网上没有现成的方法。本文针对这一问题,给出了一个解决方案。它采用了多线程按批“多粮仓”的方式,经过验证,能较好的达到高速度的效果。
关键词:hbase, 高性能, 多线?程, 算法
解决问题:如何向HBase中高性能的插入数据。
Solr和HBase专辑
1、“关于Solr的使用总结的心得体会”(http://www.cnblogs.com/wgp13x/p/3742653.html)
2、“中文分词器性能比较?”(http://www.cnblogs.com/wgp13x/p/3748764.html)
3、“Solr与HBase架构设计”(http://www.cnblogs.com/wgp13x/p/a8bb8ccd469c96917652201007ad3c50.html)
4、 “大数据架构: 使用HBase和Solr将存储与索引放在不同的机器上”(http://www.cnblogs.com/wgp13x/p/3927979.html)
5、“一个自定义 HBase Filter -通过RowKeys来高性能获取数据”(http://www.cnblogs.com/wgp13x/p/4196466.html)
6、“HBase 高性能获取数据 - 多线程批量式解决办法”(http://www.cnblogs.com/wgp13x/p/4245182.html)
在实现向海量数据库(HBase + Solr)插入数据的过程中,发现存在这么一个问题:如果要插入的数据是一条一条传入的,每秒还能传入上千条那么多,那么入库的速度非常慢,以每秒数条记,此时面临着读写问题,读的速度远远跟不上写的速度,会造成大量数据积累,严重的会造成磁盘写爆掉。首先要说明的是,我们是实现了“按批入库”的接口的:public int importDataBatch(String dataTypeCode, List<Map<String, String>> datas),既然有这一接口,那么为什么还会写入速度如此慢呢?我们若是只对此接口进行性能测试的话,可以发现传入的批量数据的大小对性能的影响非常大,像这种一条一条调用importDataBatch的,简直是暴殄天物;当然,我们希望一批数据里有成千上万条数据一起提交入库才过瘾,如果配置的好的话,像《HBase 高性能获取数据》一文中“HBase基本说明与性能测试”所叙述的那样:“单台机器能够实现1w/s~3w/s之间的插入速度”。那么,对于这种实际情况中一条一条的数据插入,我们应该如何处理呢?
解决办法其实很简单,那就是“广积粮,深挖仓”。首先,它的传入速度还算快,能达到每秒上千条;其次,传入的数据是一条一条组织的;如果遇到每条数据都可能对应不同的数据类型,那会更麻烦。“广积粮,深挖仓”也得讲究方法,仓该如何组织,小米和大豆是否应该存放于不同的仓库里,积粮久了会不会发霉,粮应积多久就要吃掉了,这都是问题。我的方案是:“对于不同的数据类型粮,建造不同的粮仓;一旦粮仓满了,要立刻打开粮仓分了吃掉;对于积攒超过5秒的而没被吃掉的粮,要立即吃掉”,这样就保证了数据不会丢失,并且处理速度超快。
下面是这一算法的具体代码实现:
/** * 数据处理 * * @author wanganqi * @version v1.0 * @since 2014年8月8日上午10:11:07 */ public class DataProcessHandler { private static final int MULIPLE_CONT = 10; // 10个多粮仓 private Random random = new Random(); private MultipleImportDatas[] multipleImportDatas = new MultipleImportDatas[MULIPLE_CONT]; // 10个多粮仓入库 public DataProcessHandler() { for (int i = 0; i < MULIPLE_CONT; i++) { multipleImportDatas[i] = new MultipleImportDatas(); } } private void 处理来的每一条数据(String dataTypeCode, List<Map<String, String>> datas) { int index = random.nextInt(MULIPLE_CONT); // 随机选择1个多粮仓入库 multipleImportDatas[index].concurrentImportDatas(dataTypeCode, datas, false); // 线程安全入数据。入库需满仓 } |
public class MultipleImportDatas // 多粮仓线程安全入库 { private ConcurrentHashMap<String, List<Map<String, String>>> inputBuffer = new ConcurrentHashMap<String, List<Map<String, String>>>(); // 各粮仓,按数据类型划分 private ConcurrentHashMap<String, Date> inputBufferDate = new ConcurrentHashMap<String, Date>(); // 各粮仓,最近清仓时间 private final int BUFFER_SIZE = 1000; // 各粮仓大小 private final int BUFFER_MONITOR_TIME = 5000; // 对粮仓监控间隔 private Thread inputBufferMonitor = new InputBufferMonitor(); // 对粮仓进行监控 public MultipleImportDatas() { inputBufferMonitor.start(); } class InputBufferMonitor extends Thread { public void run() { while (true) { try { Thread.sleep(BUFFER_MONITOR_TIME); } catch (InterruptedException e1) { e1.printStackTrace(); } for (Map.Entry<String, List<Map<String, String>>> entity : inputBuffer .entrySet()) // 各粮仓监管 { String dataType = entity.getKey(); concurrentImportDatas(dataType, null, true); // 线程安全入数据。入库需时间到 } } } } private void resetInputBufferDate(String dateTypeCode) // 更新清仓时间 { Date date = new Date(); if (!inputBufferDate.containsKey(dateTypeCode)) { inputBufferDate.put(dateTypeCode, date); } else { inputBufferDate.replace(dateTypeCode, date); } } public void concurrentImportDatas(String dataTypeCode, List<Map<String, String>> datas, boolean timeUP) // 线程安全入数据。入库需满仓或时间到 { DataHBaseSolrBLL dataHBaseSolrBLL = (DataHBaseSolrBLL) SpringRabbitMQSupport .getContext().getBean("dataHBaseSolrBLLImpl"); if (datas != null && datas.size() >= BUFFER_SIZE) // 一次入粮多 { try { dataHBaseSolrBLL.importDataBatch(dataTypeCode, datas); // 批量入库 } catch (DatabaseException | BusinessException e) { e.printStackTrace(); } return; } synchronized (inputBuffer) { if (!inputBuffer.containsKey(dataTypeCode)) // 无仓 { if (datas != null) { inputBuffer.put(dataTypeCode, datas); // 建仓 } return; } if (timeUP) // 时间到 { Date date = new Date(); if (((!inputBufferDate.containsKey(dataTypeCode)) || (inputBufferDate .containsKey(dataTypeCode) && date.getTime() - inputBufferDate.get(dataTypeCode).getTime() > BUFFER_MONITOR_TIME)) && inputBuffer.get(dataTypeCode).size() > 0) { importInputBufferAndClear(dataTypeCode); // 清仓 } } else { List<Map<String, String>> ldatas = inputBuffer .get(dataTypeCode); if (datas.size() + ldatas.size() > BUFFER_SIZE) // 要满仓 { importInputBufferAndClear(dataTypeCode); // 清仓 } ldatas.addAll(datas); // 未满仓,入仓 } } } private void importInputBufferAndClear(String dataTypeCode) // 清仓 { DataHBaseSolrBLL dataHBaseSolrBLL = (DataHBaseSolrBLL) SpringRabbitMQSupport .getContext().getBean("dataHBaseSolrBLLImpl"); List<Map<String, String>> ldatas = inputBuffer.get(dataTypeCode); try { dataHBaseSolrBLL.importDataBatch(dataTypeCode, ldatas); // 批量入库 } catch (DatabaseException | BusinessException e) { e.printStackTrace(); } ldatas.clear(); resetInputBufferDate(dataTypeCode); // 更新清仓时间 } } |
说明:这里起了多个“多粮仓”来进行入库,是为了避免线程同步造成的性能降低。测试表明,如果只起一个“多粮仓”来进行入库,每秒可以入库300多条数据,但若是起10个“多粮仓”来进行入库,每秒可以入库1200多条数据,这说明线程读写同步造成的时延现象还是蛮严重的。说明一下:这里的测试环境与《HBase 高性能获取数据》一文中“HBase基本说明与性能测试”环境不同,这里的加入了数据验证等过程,因此与其所描述的不具备可比性。
明天就是周末咯,偶和亲爱的老婆各请了15天的婚假,过几天就要离开这寒冷的冬季,去夏季巴厘岛度蜜月咯,好开心。