Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime

转载请注明出处:Import博客园http://www.cnblogs.com/thinkpad

注意:本文批处理只是Storm到Hbase批处理入库操作,并非Storm的API的批处理!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

  1 package com.storm.hbaseTest;
  2
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Iterator;
  6 import java.util.List;
  7 import java.util.Map;
  8
  9 import org.apache.commons.lang.StringUtils;
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.hbase.HBaseConfiguration;
 12 import org.apache.hadoop.hbase.client.HConnection;
 13 import org.apache.hadoop.hbase.client.HConnectionManager;
 14 import org.apache.hadoop.hbase.client.HTableInterface;
 15 import org.apache.hadoop.hbase.client.Put;
 16
 17 import backtype.storm.Config;
 18 import backtype.storm.Constants;
 19 import backtype.storm.task.OutputCollector;
 20 import backtype.storm.task.TopologyContext;
 21 import backtype.storm.topology.BasicOutputCollector;
 22 import backtype.storm.topology.IRichBolt;
 23 import backtype.storm.topology.OutputFieldsDeclarer;
 24 import backtype.storm.topology.base.BaseBasicBolt;
 25 import backtype.storm.tuple.Fields;
 26 import backtype.storm.tuple.Tuple;
 27
 28 import org.slf4j.Logger;
 29 import org.slf4j.LoggerFactory;
 30
 31 import com.google.common.collect.Lists;
 32
 33 /**
 34  * @ClassName: HbaseBout.java
 35  * @Description:自定义批入库,定时批量和大小批量
 36  * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
 37  * @date 2016年8月27日 下午4:43:19
 38  * @version V1.0
 39  */
 40 @SuppressWarnings("all")
 41 public class HbaseBout implements IRichBolt{
 42
 43       private static final long serialVersionUID = 1L;
 44       private static final Logger LOG = LoggerFactory.getLogger(HbaseBout.class);
 45
 46       protected OutputCollector collector;
 47       protected HbaseClient hbaseClient;
 48       protected String tableName;
 49       protected String configKey ="hbase.conf";
 50
 51       //批处理大小
 52       protected int batchSize = 15000;
 53       List<Put> batchMutations;
 54       List<Tuple> tupleBatch;
 55       //tick Time
 56       int flushIntervalSecs = 1;
 57
 58   /**
 59    * @Description:Storm初始化
 60    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
 61    */
 62     public void prepare(Map map, TopologyContext context,
 63             OutputCollector collector) {
 64
 65         this.collector = collector;
 66         Configuration hbConfig = HBaseConfiguration.create();
 67         Map<String,String> conf = (Map)map.get(this.configKey);
 68
 69         //获取Hbase配置
 70         if (conf == null) {
 71           throw new IllegalArgumentException("HBase configuration not found using key ‘" + this.configKey + "‘");
 72         }
 73
 74         //批大小
 75         if(map.get("batchSize") != null){
 76             this.batchSize = new Integer(map.get("batchSize").toString());
 77         }
 78         //Tick Time
 79         if(map.get("flushIntervalSecs") != null){
 80             this.flushIntervalSecs = Integer.valueOf(map.get("flushIntervalSecs").toString());
 81         }
 82
 83         //Hbase 配置
 84         for (String key : conf.keySet()) {
 85           hbConfig.set(key, String.valueOf(conf.get(key)));
 86         }
 87         this.hbaseClient = new HbaseClient(hbConfig, this.tableName);
 88     }
 89
 90   /**
 91    * @Description:每次调用
 92    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
 93    */
 94     @Override
 95     public void execute(Tuple tuple) {
 96
 97          boolean flush = false;
 98
 99          try {
100              //Tick
101              if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){
102                 LOG.debug("TICK received! current batch status ", Integer.valueOf(this.tupleBatch.size()), Integer.valueOf(this.batchSize));
103                 flush = true;
104             }else{
105
106                 Put put = new Put(tuple.getStringByField("rowKey").getBytes());
107                 put.add("cf".getBytes(), "name".getBytes(), tuple.getStringByField("name").getBytes());
108                 put.add("cf".getBytes(), "sex".getBytes(), tuple.getStringByField("sex").getBytes());
109                 this.batchMutations.add(put);
110                 this.tupleBatch.add(tuple);
111
112                 //当前tuple批大小
113                 if (this.tupleBatch.size() >= this.batchSize) {
114                   flush = true;
115                 }
116             }
117              //持久化操作
118              if ((flush) && (!this.tupleBatch.isEmpty())) {
119                   this.hbaseClient.batchMutate(this.batchMutations);
120                   LOG.debug("acknowledging tuples after batchMutate");
121                   for (Iterator<Tuple> tuples = this.tupleBatch.iterator(); tuples.hasNext(); ) {
122                       Tuple t = (Tuple)tuples.next();
123                         this.collector.ack(t);
124                   }
125                   this.tupleBatch.clear();
126                   this.batchMutations.clear();
127                }
128         } catch (Exception e) {
129               LOG.debug("inser batch fail");
130               this.collector.reportError(e);
131               for (Tuple t : this.tupleBatch) {
132                   this.collector.fail(t);
133                 }
134               this.tupleBatch.clear();
135               this.batchMutations.clear();
136         }
137     }
138
139   /**
140    * @Description:字段声明
141    * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
142    */
143     public void declareOutputFields(OutputFieldsDeclarer declarer) {
144
145     }
146
147     /**
148        * @Description:配置
149        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
150      */
151     @Override
152     public Map<String, Object> getComponentConfiguration() {
153         Config conf = new Config();
154         conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,flushIntervalSecs);
155         return conf;
156     }
157
158
159     /**
160        * @Description:清理方法
161        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
162       */
163     @Override
164     public void cleanup() {
165
166     }
167 }
168
169 /**
170  *
171  * @ClassName: HbaseBout.java
172  * @Description:Hbase操作相关类
173  * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
174  * @date 2016年8月27日 下午6:06:34
175  * @version V1.0
176  */
177 class HbaseClient{
178
179      private static final Logger LOG = LoggerFactory.getLogger(HbaseClient.class);
180       HConnection hTablePool = null;
181       HTableInterface table = null;
182
183       /**
184        * @Description:获取连接
185        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
186        * @param configuration:Hbase表配置
187        * @param tableName:表名
188        */
189       public HbaseClient(final Configuration configuration, final String tableName)
190       {
191         try
192         {
193             hTablePool =  HConnectionManager.createConnection(configuration) ;
194             this.table =  hTablePool.getTable(tableName);
195         }
196         catch (Exception e) {
197           throw new RuntimeException("HBase create failed: " + e.getMessage(), e);
198         }
199       }
200
201       /**
202        *
203        * @ClassName: HbaseBout.java
204        * @Description:批入库
205        * @author Import QQ:99123550  博客 [ http://www.cnblogs.com/thinkpad ]
206        * @date 2016年8月27日 下午6:05:39
207        * @version V1.0
208        */
209       public void batchMutate(List<Put> puts) throws Exception {
210
211           try {
212             this.table.put(puts);
213         } catch (Exception e) {
214               LOG.warn("Error insert batch to HBase.", e);
215               throw e;
216         }
217       }
218 }
时间: 2024-12-26 05:51:07

Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime的相关文章

数据库写库基础组件设计思想与实现

码农一定会遇到写库的繁琐操作,字段少的话数据访问层的SQL语句封装还好实现,可是字段一旦多起来,比如十多个二十多个字段的话,SQL的封装将会是一个巨大的难题,并不是说难度有多大,而是这样的操作很繁琐,况且极容易出错,SQL语句一旦出错很难排查.我也是在开发中遇到了相同的问题,这样的问题总会浪费很多不必要的时间,所以我就想能不能提供一个公共的基础组件来实现繁琐的底层SQL语句操作,我们只需要调用一些简单的借口就能实现数据库的快捷的写库.首先,写库时必要的信息包含:要写入的列名,还有就是数据实体.(

WinCE 调试某手写输入法时遇到的加载手写库失败的问题

调试某手写输入法时遇到的加载手写库失败的问题 在 WinCE6.0 下使用此手写输入法 SDK 做了一个单独的手写输入程序A(MFC Dialog 框架).正常情况下,可以正常使用. 此 A 程序,采用 LIB 方式加载此的手写 SDK. 在运行某一带手写输入的程序 B 后,A 程序无法运行.首先怀疑 B 程序中也使用了此手写 SDK,导致加载冲突. 但仔细想想程序 B 和程序 A 应该运行在不同的进程空间,且最后确定程序 B 并未使用此手写. 进一步测试发现,如果程序 A 先运行,再程序 B 

storm写redis问题小结

最近一直在跟进storm的问题,从storm集群的稳定性到监控到升级到bolt写redis的问题,因为公司目前没有专业运维redis的,只能我们数据部门自己搞了..下面记录下遇到的几个问题: 总结下目前storm写redis问题: 1.redis高峰写入异常,增加redis监控,发现cpu性能瓶颈(redis单线程,最高10w/s的处理量) 2.之前redis bolt的并发在200以上,过多的并发对redis的性能造成比较大的影响,现在已经减少为5 3.关闭了redis的monitor监控,常

为什么用C++写库 但是导出接口时 却定义了C的接口(李大哥告诉我的,我还没有理解,先记着吧。为以后查询方便,哈哈)

导出C接口 使其拥有使用范围最广的接口 和多方式支持.比如操作系统,用C++写,但是接口申明了#ifdef C plus plus,判断如果是C++代码 就导出C接口,windows 下微软的几乎所有接口都是这样导出的,linux也一样.举例说明,我们导出一个C++接口 接口函数如下:KERNEL_USER_API int UserLogin(LPCTSTR lpszUserName,LPCTSTR lpszPassword);   假设这是一个内核库 封装了所有方法   然后提供给界面程序调用

WebAPI 用ExceptionFilterAttribute实现错误(异常)日志的记录(log4net做写库操作)

WebAPI 用ExceptionFilterAttribute实现错误(异常)日志的记录(log4net做写库操作) 好吧,还是那个社区APP,非管理系统,用户行为日志感觉不是很必要的,但是,错误日志咱还是得记录则个.总不能上线后报bug了让自己手足无措吧,虽然不管有木有错误日志报bug都是件很头疼的事... 我们知道webAPI也有好几个Filter,上篇文章我们做token与权限用到了ActionFilterAttribute,这次我们用ExceptionFilterAttribute来做

第8章 自己写库-构建库函数雏形—零死角玩转STM32-F429系列

第8章 ????自己写库-构建库函数雏形 全套200集视频教程和1000页PDF教程请到秉火论坛下载:www.firebbs.cn 野火视频教程优酷观看网址:http://i.youku.com/firege ? 本章参考资料:<STM32F4xx 中文参考手册>.<STM32F429规格书> 虽然我们上面用寄存器点亮了 LED,乍看一下好像代码也很简单,但是我们别侥幸以后就可以一直用寄存器开发.在用寄存器点亮 LED 的时候,我们会发现 STM32 的寄存器都是 32 位的,每次

HBase 高性能加入数据 - 按批多“粮仓”式解决办法

摘要:如何从HBase中的海量数据中,以很快的速度的获取大批量数据,这一议题已经在<HBase 高性能获取数据>(http://www.cnblogs.com/wgp13x/p/4245182.html)一文中给出了解决办法.那么,如何向HBase中高性能的插入数据呢?经研究表明,光是批量写入也还是不行.网上没有现成的方法.本文针对这一问题,给出了一个解决方案.它采用了多线程按批“多粮仓”的方式,经过验证,能较好的达到高速度的效果. 关键词:hbase, 高性能, 多线?程, 算法 解决问题:

小白写linux环境下的定时监测

本人做了多年的java,但是对于linux环境基本上还是小白一个,之前做了一个java的定时处理,但是好像会过了10来天就会挂掉. 但是java也没有日志.所以也不清楚怎么去调查.所以想写一个定时监测程序,如果监测到java程序挂掉的话就再次启动java程序. 于是在linux的crontab中加了一段定时监测代码. 文件etc/crontab中加入了以下代码: # /etc/crontab: system-wide crontab# Unlike any other crontab you d

(小菜鸟写库)个人为0.95寸OLED写的图形界面库(连载)

名称: OLED_Window.h OLED_Window.c 功能:对外资提供三个接口:1添加图标控件                                    2光标移动(选择图标)函数                                    3光标确定(执行图标)函数 4显示函数 OLED_Window.h代码: #ifndef _OLED_WINDOW_H #define _OLED_WINDOW_H #include "OLED.h" //OLED驱