Hbase Java API包括协处理器统计行数

package com.zy;
import java.io.IOException;

import org.apache.commons.lang.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseTable {
    // 声明静态配置
    private static Configuration conf = HBaseConfiguration.create();
    // 创建表(tableName 表名; family 列族列表)
    public static void createTable(String tableName, String[] familys)
    throws IOException{
      HBaseAdmin admin = new HBaseAdmin(conf);
    if (admin.tableExists(tableName)){
    System.out.println(tableName+" already exists!");
    }
    else {
    HTableDescriptor descr = new HTableDescriptor(TableName.valueOf(tableName));
    for (String family:familys) {
    descr.addFamily(new HColumnDescriptor(family)); //添加列族
    }
    admin.createTable(descr); //建表
    System.out.println(tableName+" created successfully!");
    }
    }
    //插入数据(rowKey rowKey;tableName 表名;family 列族;qualifier 限定名;value 值)
    public static void addData(String tableName, String rowKey, String familyName, String
    columnName, String value)
    throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));//HTable负责跟记录相关的操作如增删改查等//
    Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey
    put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value));
    table.put(put);
    System.out.println("Add data successfully!rowKey:"+rowKey+", column:"+familyName+":"+columnName+", cell:"+value);
    }
    //遍历查询hbase表(tableName 表名)
    public static void getResultScann(String tableName) throws IOException {
    Scan scan = new Scan();
    ResultScanner rs = null;
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    try {
    rs = table.getScanner(scan);
    for (Result r : rs) {
    for (KeyValue kv : r.list()) {
    System.out.println("row:" + Bytes.toString(kv.getRow()));
    System.out.println("family:" + Bytes.toString(kv.getFamily()));
    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
    System.out.println("value:" + Bytes.toString(kv.getValue()));
    System.out.println("timestamp:" + kv.getTimestamp());
    System.out.println("-------------------------------------------");
    }
    }
    } finally {
    rs.close();
    }
    }
    //查询表中的某一列(
    public static void getResultByColumn(String tableName, String rowKey, String familyName, String
    columnName) throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Get get = new Get(Bytes.toBytes(rowKey));
    get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); //获取指定列族和列修饰符对应的列
    Result result = table.get(get);
    for (KeyValue kv : result.list()) {
    System.out.println("family:" + Bytes.toString(kv.getFamily()));
    System.out.println("qualifier:" + Bytes.toString(kv.getQualifier()));
    System.out.println("value:" + Bytes.toString(kv.getValue()));
    System.out.println("Timestamp:" + kv.getTimestamp());
    System.out.println("-------------------------------------------");
    }
    }
    //更新表中的某一列(tableName 表名;rowKey rowKey;familyName 列族名;columnName 列名;value 更新后的值)
    public static void updateTable(String tableName, String rowKey,
    String familyName, String columnName, String value)
    throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Put put = new Put(Bytes.toBytes(rowKey));
    put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
    Bytes.toBytes(value));
    table.put(put);
    System.out.println("update table Success!");
    }
    //删除指定单元格
    public static void deleteColumn(String tableName, String rowKey,
    String familyName, String columnName) throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
    deleteColumn.deleteColumns(Bytes.toBytes(familyName),
    Bytes.toBytes(columnName));
    table.delete(deleteColumn);
    System.out.println("rowkey:"+rowKey+",column:"+familyName+":"+columnName+" deleted!");
    }
    //删除指定的行
    public static void deleteAllColumn(String tableName, String rowKey)
    throws IOException {
    HTable table = new HTable(conf, Bytes.toBytes(tableName));
    Delete deleteAll = new Delete(Bytes.toBytes(rowKey));
    table.delete(deleteAll);
    System.out.println("rowkey:"+rowKey+" are all deleted!");
    }
    //删除表(tableName 表名)
    public static void deleteTable(String tableName) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf);
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
    System.out.println(tableName + " is deleted!");
    }

    //统计行数
    public void RowCount(String tablename) throws Exception,Throwable{
        //提前创建conf
        HBaseAdmin admin = new HBaseAdmin(conf);
        TableName name=TableName.valueOf(tablename);
        //先disable表,添加协处理器后再enable表
        admin.disableTable(name);
        HTableDescriptor descriptor = admin.getTableDescriptor(name);
        String coprocessorClass = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        if (! descriptor.hasCoprocessor(coprocessorClass)) {
            descriptor.addCoprocessor(coprocessorClass);
        }
        admin.modifyTable(name, descriptor);
        admin.enableTable(name);

        //计时
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        //提高RPC通信时长
        conf.setLong("hbase.rpc.timeout", 600000);
        //设置Scan缓存
        conf.setLong("hbase.client.scanner.caching", 1000);
        Configuration configuration = HBaseConfiguration.create(conf);
        AggregationClient aggregationClient = new AggregationClient(configuration);
        Scan scan = new Scan();
        long rowCount = aggregationClient.rowCount(name, new LongColumnInterpreter(), scan);
        System.out.println(" rowcount is " + rowCount);
        System.out.println("统计耗时:"+stopWatch.getTime());
        }

    public static void main(String[] args) throws Exception {
    // 创建表
    String tableName = "test";
    String[] family = { "f1", "f2" };
    createTable(tableName, family);
    // 为表插入数据
    String[] rowKey = {"r1", "r2"};
    String[] columnName = { "c1", "c2", "c3" };
    String[] value = {"value1", "value2", "value3", "value4", "value5", "value6",};
    addData(tableName,rowKey[0],family[0],columnName[0],value[0]);
    addData(tableName,rowKey[0],family[0],columnName[1],value[1]);
    addData(tableName,rowKey[0],family[1],columnName[2],value[2]);
    addData(tableName,rowKey[1],family[0],columnName[0],value[3]);
    addData(tableName,rowKey[1],family[0],columnName[1],value[4]);
    addData(tableName,rowKey[1],family[1],columnName[2],value[5]);
    // 扫描整张表
    getResultScann(tableName);
    // 更新指定单元格的值
    updateTable(tableName, rowKey[0], family[0], columnName[0], "update value");
    // 查询刚更新的列的值
    getResultByColumn(tableName, rowKey[0], family[0], columnName[0]);
    // 删除一列
    deleteColumn(tableName, rowKey[0], family[0], columnName[1]);
    // 再次扫描全表
    getResultScann(tableName);
    // 删除整行数据
    deleteAllColumn(tableName, rowKey[0]);
    // 再次扫描全表
    getResultScann(tableName);
    // 删除表
    deleteTable(tableName);
    }

}

如果想要在本地成功运行上述的API Demo,必须满足如下几个条件:

1. 新建项目

本小节使用Intellij IDEA作为HBase的开发环境。安装好工具后需新建一个名为 hbase-test 的maven项

目,并在项目目录下的 ~/src/main/java/ 目录下将新建一个 HtableTest.java 文件,内容为上述

的API Demo。

2. 导入jar包

将上一章节中获取的jar包下载到本地,并将上一步新建的项目 hbase-test 与其建立依赖,也就是设定

新建项目 hbase-test 的 classpath ,用于API运行时查找jar包和配置文件。

3. 导入配置文件

若您要在本地进行开发还需要 hbase-site.xml 文件,将配置文件移入resources目录下。这个文件在集群中任意一台服务器上的

/etc/hbase/conf/ 目录下。

12. HBase API运行教程

本地的 hbase-site.xml 文件应放在上一步中与项目 hbase-test 建立了依赖的路径

下。

满足上述条件后,你就可以运行上述的API Demo了。

原文地址:https://www.cnblogs.com/qfdy123/p/12128454.html

时间: 2024-10-01 02:31:22

Hbase Java API包括协处理器统计行数的相关文章

HBase 协处理器统计行数

环境:cdh5.1.0 启用协处理器方法1. 启用协处理器 Aggregation(Enable Coprocessor Aggregation) 我们有两个方法:1.启动全局aggregation,能过操纵所有的表上的数据.通过修改hbase-site.xml这个文件来实现,只需要添加如下代码: <property> <name>hbase.coprocessor.user.region.classes</name> <value>org.apache.h

Java关于条件判断练习--统计一个src文件下的所有.java文件内的代码行数(注释行、空白行不统计在内)

要求:统计一个src文件下的所有.java文件内的代码行数(注释行.空白行不统计在内) 分析:先封装一个静态方法用于统计确定的.java文件的有效代码行数.使用字符缓冲流读取文件,首先判断是否是块注释开头,接着判断是否是块注释结尾,再判断是否是单行注释或者空白行,若都不是则是有效代码,统计行数+1. 对于文件夹路径,采用递归的方法判断子条目是文件还是文件夹,是文件就调用静态统计方法.源代码: public class CalculateRow { public static void main(

统计java方法(函数)的代码行数

今天想对一个java项目超过100行的方法进行一些代码优化.需要统计一下项目中的java类有哪些方法的代码超过了100行.在网上没找到类似的统计工具,就自己写了段代码进行统计. 编码思路:因为一个java类,最外层的{}可以标识类,次外层的{}就是方法或内部类了.为了便于编码,我把内部类也当作方法处理了.只要把次外层的{和}配对就是一个完整的方法了.因此我用先进后出的栈存储一个数组.数组的第一个元素是某个方法起始行,第二个元素是该行的行号.这样既能通过行号相减得到方法的行数,又能记录方法的位置.

HBase Java API使用(一)

前言 1. 创建表:(由master完成) 首先需要获取master地址(master启动时会将地址告诉zookeeper)因而客户端首先会访问zookeeper获取master的地址 client和master通信,然后有master来创建表(包括表的列簇,是否cache,设置存储的最大版本数,是否压缩等). 2. 读写删除数据 client与regionserver通信,读写.删除数据 写入和删除数据时讲数据打上不同的标志append,真正的数据删除操作在compact时发生 3. 版本信息

HBase Java API使用

概括 1. 创建.删除及启用禁用表.添加列等都需用到HBaseAdmin,另外需要注意删除,添加列等操作都需要禁用表 2. 表中添加数据,查询等都是和HTable相关,如果是多线程的情况下注意用HTablePool 3.  插入数据使用Put,可以单行添加也可批量添加 4. 查询数据需使用Get,Result,Scan.ResultScanner等 一.HBaseConfiguration org.apache.hadoop.hbase.HBaseConfiguration 对HBase进行配置

Hbase java API 调用详解

Hbase java API 调用 一. hbase的安装 参考:http://blog.csdn.net/mapengbo521521/article/details/41777721 二.hbase访问方式 Native java api:最常规最高效的访问方式. Hbase shell:hbase的命令行工具,最简单的接口,适合管理员使用 Thrift gateway:利用thrift序列化结束支持各种语言,适合异构系统在线访问 Rest gateway:支持rest风格的http api

linux、WINDOWS命令行下查找和统计行数

linux : 例子: netstat -an | grep TIME_WAIT | wc -l |  管道符 grep 查找命令 wc 统计命令 windows: 例子: netstat -an | find /i /c "TIME_WAIT" find 查找命令  /i 选项 忽略大小写   /c 统计行数    “TIME_WAIT” 要查找的内容(记得双引号) 例子 netstat -an | findstr /N TIME_WAIT

Windows命令提示符中统计行数

使用内置工具FIND统计cmd.exe输出的行数非常方便! 在命令行环境中工作时,能够统计不同工具的输出结果的行数有时会非常有用.许多Unix/Linux操作系统都包含带有许多功能选项的wc 工具,Windows则没有内置一样的替代品,但是Windows命令提示符(cmd.exe)原生支持了部分相同功能. 本文将讲述在cmd.exe中我们可以如何使用FIND 工具来统计行数.工具find,有些类似于Unix上的grep,自MS-DOS以来就一直存在, 使用简单. 假设我们有一台Windows服务

oracle查询表统计行数与注释

SELECT TABLE_NAME,NUM_ROWS,(select COMMENTS from user_tab_comments WHERE TABLE_NAME=C.TABLE_NAME) FROM user_tables CWHERE NUM_ROWS>0 查询表统计行数与注释