Java往hbase写数据

接上篇读HDFS

上面读完了HDFS,当然还有写了。

先上代码:

WriteHBase

public class WriteHBase {

    public static void writeHbase(String content){
     // HDFS 数据是一行一条记录
        String[] lines = content.split("\n");
        int userSize = 0;
        List<Put> puts = new ArrayList<Put>();
        Put put;
        for(String line : lines){        //只有两列,以#号分割,一列rowkey,一列value,一个value是很多列数据拼接起来的。
            if(line.contains("#")){
                String[] arr = line.split("#");
//              添加一行,
                put = new Put(Bytes.toBytes(arr[0]));         // 给行添加列 cf column value
                put.add(Bytes.toBytes(Constant.CF), Bytes.toBytes(Constant.COLUMN), Bytes.toBytes(arr[1]));
                puts.add(put);
            }else{
                continue;
            }
            lines[userSize] = null;
            ++userSize;
            // write when list have 1000 没1000 条提交一次,已经改的 5000
            if (userSize % Constant.BATCH ==0){
                writeDate(userSize, puts);
            }
        }
        writeDate(userSize, puts);
        HDFSReadLog.writeLog("analysis " +userSize +" users");
    }

    private static void writeDate(int userSize, List<Put> puts) {
        try {
            table.put(puts);
            HDFSReadLog.writeLog("write "+userSize + " item.");
        } catch (IOException e) {
            e.printStackTrace();
            HDFSReadLog.writeLog("write "+userSize + " error.");
            HDFSReadLog.writeLog(e.getMessage());
        }
    }

    static HTable table = null;
//    static HTablePool pool = null;
    static{
        try {        // 创建HTable对象,对应hbase 的table
            table = new HTable(HBaseConf.getConf(),Constant.HBASE_TABLE);        // 如果表不存在就创建一个
            fitTable(Constant.HBASE_TABLE);
        } catch (IOException e) {
            e.printStackTrace();
            HDFSReadLog.writeLog("create table error.");
            HDFSReadLog.writeLog(e.getMessage());
        }
    }

    /**
     * if table is not exists, create it
     * @param tab
     * @throws IOException
     */
    private static void fitTable(String tab) throws IOException {

        HBaseAdmin admin = new HBaseAdmin(HBaseConf.getConf());
        if (admin.tableExists(tab)) {
            HDFSReadLog.writeLog(tab + " exists");
        } else {        
            HTableDescriptor tableDesc = new HTableDescriptor(tab);        // 建表的使用要指定 column family
            tableDesc.addFamily(new HColumnDescriptor("cf"));
            admin.createTable(tableDesc);
            HDFSReadLog.writeLog(tab + " create success");
        }

    }

}

HBaseConfig(z这个必须,不然会卡在table.put 上面,没有报错,就是卡)

public class HBaseConf {

    public static Configuration conf = null;
    public static Configuration getConf(){
        if (conf == null){
            conf = new Configuration();
            String path  = Constant.getSysEnv("HBASE_HOME") +"/conf/";
            HDFSReadLog.writeLog("Get HBase home : " + path);

            // hbase conf
            conf.setClassLoader(HBaseConf.class.getClassLoader());
            conf.addResource(path + "hbase-default.xml");
            conf.addResource(path + "hbase-site.xml");
            conf = HBaseConfiguration.create(conf);
            HDFSReadLog.writeLog("hbase.zookeeper.quorum : " + conf.get("hbase.zookeeper.quorum"));
        }
    // 如果配置文件读不到,set这两个参数,也可以读
        /*conf.set("hbase.zookeeper.quorum", "ip,ip,ip");
        conf.set("hbase.zookeeper.property.clientPort", "port");*/
        return conf;
    }

}  

注: hbase的配置文件很重要,如果读不到 “hbase.zookeeper.quorum” 会默认到 localhost,然后在table.put 的时候,卡住。

table.put(),不止可以put 一个Put,也可以put 一个Put的list,这样算是到底批量提交了。

一个一个写,太慢了。这边的结果:334403 条数据,写了112秒

 
时间: 2024-11-08 12:02:28

Java往hbase写数据的相关文章

Hbase写数据,存数据,读数据的详细过程

转自:http://www.aboutyun.com/thread-10886-1-1.html 附HBase 0.94之后Split策略: http://www.aboutyun.com/thread-11211-1-1.html 1.Client写入需要哪些过程?2.Hbase是如何读取数据的? Client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 出发Compact合并操作 -> 多个Sto

sparkStreaming向hbase写数据

在SparkStreaming中统计了数据之后,我们需要将结果写入外部文件系统. 本文,以向Hbase中写数据,为例,说一下,SparkStreaming怎么向Hbase中写数据. 首先,需要说一下,下面的这个方法. foreachRDD(func) 最通用的输出操作,把func作用于从stream生成的每一个RDD. 注意:这个函数是在 运行streaming程序的driver进程 中执行的. 下面跟着思路,看一下,怎么优雅的向Hbase中写入数据 向外部写数据 常见的错误: 向外部数据库写数

线上问题排查-HBase写数据出现NotServingRegionException(Region ... is not online)异常

今天线上遇到一个问题:有一台服务器的cpu持续冲高,排查发现是我们的一个java应用进程造成的,该进程在向hbase中写入数据时,日志不断地打印下面的异常: org.apache.hadoop.hbase.NotServingRegionException: Region iot_flow_cdr_201811,4379692584601-2101152593-20181115072326-355,1536703383699.82804f639798d0502dd64e6e47d75d84. i

在Windows下MyEclipse运行JAVA程序连接HBASE读取数据出错

运行环境:Hadoop-2.5.0+Hbase-0.98.6 问题描述: 15/06/11 15:35:50 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.     at org.apache.hadoop

【hbase】——HBase 写优化之 BulkLoad 实现数据快速入库

1.为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题? 我们先看下 HBase 的写流程: 通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成put对象写入HBase,该方式在大数据量写入时效率低下(HBase会block写入,频繁进行flush,split,compact等大量IO操作),并对HBase节点的稳定性造成一定的影响(GC时间过长,响应变慢,导致节点超时退出,

【原创】问题定位分享(16)spark写数据到hive外部表报错ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat

spark 2.1.1 spark在写数据到hive外部表(底层数据在hbase中)时会报错 Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat cannot be cast to org.apache.hadoop.hive.ql.io.HiveOutputFormat at org.apache.spark.sql.hive.SparkHiveWrit

mapreduce 只使用Mapper往多个hbase表中写数据

只使用Mapper不使用reduce会大大减少mapreduce程序的运行时间. 有时候程序会往多张hbase表写数据. 所以有如题的需求. 下面给出的代码,不是可以运行的代码,只是展示driver中需要进行的必要项设置,mapper类需要实现的接口,map函数需要的参数以及函数内部的处理方式. 实现过程比较曲折,只贴代码: class Qos2HbaseDriver extends Configured implements Tool { private static Logger logge

Spark 批量写数据入HBase

介绍 ??工作中常常会遇到这种情形,需要将hdfs中的大批量数据导入HBase.本文使用Spark+HBase的方式将RDD中的数据导入HBase中.没有使用官网提供的newAPIHadoopRDD接口的方式.使用本文的方式将数据导入HBase, 7000W条数据,花费时间大概20分钟左右,本文Spark可用核数量为20. 本文使用spark版本为1.3.0,hbase版本为0.98.1 hbase表结构为:表名table,列族Family,列为qualifier. 代码如下: val read

新浪微博数据解析与java操作Hbase实例

之前发过一篇开发新浪微博的文章,对于大家比较感兴趣的内容之一便是如何解析新浪微博的JSON. 其实一开始的时候,也遇过一些挫折,比如直接用JsonArray和JsonObject去解析JSON内容的话,是解析不了的. 因为JSON的格式比较固定,像新浪微博返回的JSON内容则是多了一个中括号及statues标签,如下: { "statuses": [ { "created_at": "Tue May 31 17:46:55 +0800 2011"