HBase 协处理器详解

一、简述

在使用HBase时,如果你的数据量达到了数十亿行或数百万列,此时能否在查询中返回大量数据将受制于网络的带宽,即便网络状况允许,但是客户端的计算处理也未必能够满足要求。在这种情况下,协处理器(Coprocessors)应运而生。它允许你将业务计算代码放入在RegionServer的协处理器中,将处理好的数据再返回给客户端,这可以极大地降低需要传输的数据量,从而获得性能上的提升。同时协处理器也允许用户扩展实现HBase目前所不具备的功能,如权限校验、二级索引、完整性约束等。

二、协处理器类型

2.1 Observer协处理器

1. 功能

Observer协处理器类似于关系型数据库中的触发器,当发生某些事件的时候这类协处理器会被Server端调用。通常可以用来实现下面功能:

  • 权限校验:在执行GetPut操作之前,您可以使用preGetprePut方法检查权限;
  • 完整性约束: HBase不支持关系型数据库中的外键功能,可以通过触发器在插入或者删除数据的时候,对关联的数据进行检查;
  • 二级索引: 可以使用协处理器来维护二级索引。

2. 类型

当前Observer协处理器有以下四种类型:

  • RegionObserver :
    允许您观察Region上的事件,例如Get和Put操作。
  • RegionServerObserver :
    允许您观察与RegionServer操作相关的事件,例如启动,停止或执行合并,提交或回滚。
  • MasterObserver :
    允许您观察与HBase Master相关的事件,例如表创建,删除或schema修改。
  • WalObserver :
    允许您观察与预写日志(WAL)相关的事件。

3. 接口

以上四种类型的Observer协处理器均继承自Coprocessor接口,这四个接口中分别定义了所有可用的钩子方法,以便在对应方法前后执行特定的操作。通常情况下,我们并不会直接实现上面接口,而是继承其Base实现类,Base实现类只是简单空实现了接口中的方法,这样我们在实现自定义的协处理器时,就不必实现所有方法,只需要重写必要方法即可。

这里以RegionObservers为例,其接口类中定义了所有可用的钩子方法,下面截取了部分方法的定义,多数方法都是成对出现的,有pre就有post

4. 执行流程

  • 客户端发出 put 请求
  • 该请求被分派给合适的 RegionServer 和 region
  • coprocessorHost 拦截该请求,然后在该表的每个 RegionObserver 上调用 prePut()
  • 如果没有被prePut()拦截,该请求继续送到 region,然后进行处理
  • region 产生的结果再次被 CoprocessorHost 拦截,调用postPut()
  • 假如没有postPut()拦截该响应,最终结果被返回给客户端

如果大家了解Spring,可以将这种执行方式类比于其AOP的执行原理即可,官方文档当中也是这样类比的:

If you are familiar with Aspect Oriented Programming (AOP), you can think of a coprocessor as applying advice by intercepting a request and then running some custom code,before passing the request on to its final destination (or even changing the destination).

如果您熟悉面向切面编程(AOP),您可以将协处理器视为通过拦截请求然后运行一些自定义代码来使用Advice,然后将请求传递到其最终目标(或者更改目标)。

2.2 Endpoint协处理器

Endpoint协处理器类似于关系型数据库中的存储过程。客户端可以调用Endpoint协处理器在服务端对数据进行处理,然后再返回。

以聚集操作为例,如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,然后在客户端上遍历扫描结果,这必然会加重了客户端处理数据的压力。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出来,仅仅将该 max 值返回给客户端。之后客户端只需要将每个 Region 的最大值进行比较而找到其中最大的值即可。

三、协处理的加载方式

要使用我们自己开发的协处理器,必须通过静态(使用HBase配置)或动态(使用HBase Shell或Java API)加载它。

  • 静态加载的协处理器称之为 System Coprocessor(系统级协处理器),作用范围是整个HBase上的所有表,需要重启HBase服务;
  • 动态加载的协处理器称之为 Table Coprocessor(表处理器),作用于指定的表,不需要重启HBase服务。

其加载和卸载方式分别介绍如下。

四、静态加载与卸载

4.1 静态加载

静态加载分以下三步:

  1. hbase-site.xml定义需要加载的协处理器。
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>
</property>

<name>标签的值必须是下面其中之一:

  • RegionObservers 和 Endpoints协处理器:hbase.coprocessor.region.classes
  • WALObservers协处理器: hbase.coprocessor.wal.classes
  • MasterObservers协处理器:hbase.coprocessor.master.classes

    <value>必须是协处理器实现类的全限定类名。如果为加载指定了多个类,则类名必须以逗号分隔。

  1. 将jar(包含代码和所有依赖项)放入HBase安装目录中的lib目录下;
  2. 重启HBase。

4.2 静态卸载

  1. 从hbase-site.xml中删除配置的协处理器的<property>元素及其子元素;
  2. 从类路径或HBase的lib目录中删除协处理器的JAR文件(可选);
  3. 重启HBase。

五、动态加载与卸载

使用动态加载协处理器,不需要重新启动HBase。但动态加载的协处理器是基于每个表加载的,只能用于所指定的表。
此外,在使用动态加载必须使表脱机(disable)以加载协处理器。动态加载通常有两种方式:Shell 和 Java API 。

以下示例基于两个前提:

  1. coprocessor.jar 包含协处理器实现及其所有依赖项。
  2. JAR 包存放在HDFS上的路径为:hdfs:// <namenode>:<port> / user / <hadoop-user> /coprocessor.jar

5.1 HBase Shell动态加载

  1. 使用HBase Shell禁用表
hbase > disable ‘tableName‘
  1. 使用如下命令加载协处理器
hbase > alter ‘tableName‘, METHOD => ‘table_att‘, ‘Coprocessor‘=>‘hdfs://<namenode>:<port>/
user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|
arg1=1,arg2=2‘

Coprocessor包含由管道(|)字符分隔的四个参数,按顺序解释如下:

  • JAR包路径:通常为JAR包在HDFS上的路径。关于路径以下两点需要注意:
  • 允许使用通配符,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar 来添加指定的JAR包;
  • 可以使指定目录,例如:hdfs://<namenode>:<port>/user/<hadoop-user>/ ,这会添加目录中的所有JAR包,但不会搜索子目录中的JAR包。
  • 类名:协处理器的完整类名。
  • 优先级:协处理器的优先级,遵循数字的自然序,即值越小优先级越高。可以为空,在这种情况下,将分配默认优先级值。
  • 可选参数 :传递的协处理器的可选参数。
  1. 启用表
hbase > enable ‘tableName‘
  1. 验证协处理器是否已加载
hbase > describe ‘tableName‘

协处理器出现在TABLE_ATTRIBUTES属性中则代表加载成功。

</br>

5.2 HBase Shell动态卸载

  1. 禁用表
hbase> disable ‘tableName‘
  1. 移除表协处理器
hbase> alter ‘tableName‘, METHOD => ‘table_att_unset‘, NAME => ‘coprocessor$1‘
  1. 启用表
hbase> enable ‘tableName‘

</br>

5.3 Java API 动态加载

TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
+ RegionObserverExample.class.getCanonicalName() + "|"
+ Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

在HBase 0.96及其以后版本中,HTableDescriptor的addCoprocessor()方法提供了一种更为简便的加载方法。

TableName tableName = TableName.valueOf("users");
Path path = new Path("hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar");
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path,
Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

5.4 Java API 动态卸载

卸载其实就是重新定义表但不设置协处理器。这会删除所有表上的协处理器。

TableName tableName = TableName.valueOf("users");
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);

六、协处理器案例

这里给出一个简单的案例,实现一个类似于Redis中append 命令的协处理器,当我们对已有列执行put操作时候,HBase默认执行的是update操作,这里我们修改为执行append操作。

# redis append 命令示例
redis>  EXISTS mykey
(integer) 0
redis>  APPEND mykey "Hello"
(integer) 5
redis>  APPEND mykey " World"
(integer) 11
redis>  GET mykey
"Hello World"

6.1 创建测试表

# 创建一张杂志表 有文章和图片两个列族
hbase >  create ‘magazine‘,‘article‘,‘picture‘

6.2 协处理器编程

完整代码可见本仓库:hbase-observer-coprocessor

新建Maven工程,导入下面依赖:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0</version>
</dependency>

继承BaseRegionObserver实现我们自定义的RegionObserver,对相同的article:content执行put命令时,将新插入的内容添加到原有内容的末尾,代码如下:

public class AppendRegionObserver extends BaseRegionObserver {

    private byte[] columnFamily = Bytes.toBytes("article");
    private byte[] qualifier = Bytes.toBytes("content");

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
                       Durability durability) throws IOException {
        if (put.has(columnFamily, qualifier)) {
            // 遍历查询结果,获取指定列的原值
            Result rs = e.getEnvironment().getRegion().get(new Get(put.getRow()));
            String oldValue = "";
            for (Cell cell : rs.rawCells())
                if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) {
                    oldValue = Bytes.toString(CellUtil.cloneValue(cell));
                }

            // 获取指定列新插入的值
            List<Cell> cells = put.get(columnFamily, qualifier);
            String newValue = "";
            for (Cell cell : cells) {
                if (CellUtil.matchingColumn(cell, columnFamily, qualifier)) {
                    newValue = Bytes.toString(CellUtil.cloneValue(cell));
                }
            }

            // Append 操作
            put.addColumn(columnFamily, qualifier, Bytes.toBytes(oldValue + newValue));
        }
    }
}

6.3 打包项目

使用maven命令进行打包,打包后的文件名为hbase-observer-coprocessor-1.0-SNAPSHOT.jar

# mvn clean package

6.4 上传JAR包到HDFS

# 上传项目到HDFS上的hbase目录
hadoop fs -put /usr/app/hbase-observer-coprocessor-1.0-SNAPSHOT.jar /hbase
# 查看上传是否成功
hadoop fs -ls /hbase

6.5 加载协处理器

  1. 加载协处理器前需要先禁用表
hbase >  disable ‘magazine‘
  1. 加载协处理器
hbase >   alter ‘magazine‘, METHOD => ‘table_att‘, ‘Coprocessor‘=>‘hdfs://hadoop001:8020/hbase/hbase-observer-coprocessor-1.0-SNAPSHOT.jar|com.heibaiying.AppendRegionObserver|1001|‘
  1. 启用表
hbase >  enable ‘magazine‘
  1. 查看协处理器是否加载成功
hbase >  desc ‘magazine‘

协处理器出现在TABLE_ATTRIBUTES属性中则代表加载成功,如下图:

6.6 测试加载结果

插入一组测试数据:

hbase > put ‘magazine‘, ‘rowkey1‘,‘article:content‘,‘Hello‘
hbase > get ‘magazine‘,‘rowkey1‘,‘article:content‘
hbase > put ‘magazine‘, ‘rowkey1‘,‘article:content‘,‘World‘
hbase > get ‘magazine‘,‘rowkey1‘,‘article:content‘

可以看到对于指定列的值已经执行了append操作:

插入一组对照数据:

hbase > put ‘magazine‘, ‘rowkey1‘,‘article:author‘,‘zhangsan‘
hbase > get ‘magazine‘,‘rowkey1‘,‘article:author‘
hbase > put ‘magazine‘, ‘rowkey1‘,‘article:author‘,‘lisi‘
hbase > get ‘magazine‘,‘rowkey1‘,‘article:author‘

可以看到对于正常的列还是执行update操作:

>

6.7 卸载协处理器

  1. 卸载协处理器前需要先禁用表
hbase >  disable ‘magazine‘
  1. 卸载协处理器
hbase > alter ‘magazine‘, METHOD => ‘table_att_unset‘, NAME => ‘coprocessor$1‘
  1. 启用表
hbase >  enable ‘magazine‘
  1. 查看协处理器是否卸载成功
hbase >  desc ‘magazine‘

6.8 测试卸载结果

依次执行下面命令可以测试卸载是否成功

hbase > get ‘magazine‘,‘rowkey1‘,‘article:content‘
hbase > put ‘magazine‘, ‘rowkey1‘,‘article:content‘,‘Hello‘
hbase > get ‘magazine‘,‘rowkey1‘,‘article:content‘

参考资料

  1. Apache HBase Coprocessors
  2. Apache HBase Coprocessor Introduction
  3. HBase高階知識

更多大数据系列文章可以参见个人 GitHub 开源项目: 大数据入门指南

原文地址:https://blog.51cto.com/14183932/2412387

时间: 2025-01-17 23:05:14

HBase 协处理器详解的相关文章

HBase RegionServer详解(未完)

HBase RegionServer详解 RegionServer组件介绍 RegionServer是HBase集群运行在每个工作节点上的服务.它是整个HBase系统的关键所在,一方面它维护了Region的状态,提供了对于Region的管理和服务:另一方面,它与Master交互,参与Master的分布式协调管理. MemStoreFlusher MemStoreFlusher主要功能是将MemStore刷新到文件中,当满足一下条件时会出发MemStore执行flush操作,最小的flush单元是

Mac下安装HBase及详解

Mac下安装HBase及详解 1. 千篇一律的HBase简介 HBase是Hadoop的数据库, 而Hive数据库的管理工具, HBase具有分布式, 可扩展及面向列存储的特点(基于谷歌BigTable). HBase可以使用本地文件系统和HDFS文件存储系统, 存储的是松散的数据(key-value的映射关系). HBase位于HDFS的上层, 向下提供存储, 向上提供运算 2. HBase安装 HBase有单机, 伪分布式, 全分布式运行模式 依赖: 匹配HBase的Hadoop版本 Jav

Hbase存储详解

Hbase存储详解 started by chad walters and jim 2006.11 G release paper on BigTable 2007.2 inital Hbase prototype created as Hadoop contrib 2007.10 First useable Hbase 2008.1 Hadoop become Apache top-level project and Hbase becomes subproject 2008.10 Hbase

大数据学习系列之五 ----- Hive整合HBase图文详解

引言 在上一篇 大数据学习系列之四 ----- Hadoop+Hive环境搭建图文详解(单机) 和之前的大数据学习系列之二 ----- HBase环境搭建(单机) 中成功搭建了Hive和HBase的环境,并进行了相应的测试.本文主要讲的是如何将Hive和HBase进行整合. Hive和HBase的通信意图 Hive与HBase整合的实现是利用两者本身对外的API接口互相通信来完成的,其具体工作交由Hive的lib目录中的hive-hbase-handler-*.jar工具类来实现,通信原理如下图

Hbase安装详解

一.简介 HBase – Hadoop Database,是一个高可靠性.高性能.面向列.可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群.HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具. 1.HBase(NoSQL)的数据模型 1)        表(table),是存储管理数据的. 2)        行键(row key),类似于My

NoSQL选型及HBase案例详解(转)

从 NOSQL的类型到 常用的产品,我们已经做过很多关于NoSQL的文章,今天我们从国内著名的互联网公司及科研机构的实战谈一下NoSQL数据库. NoSQL一定程度上是基于一个很重要的原理—— CAP原理提出来的.传统的SQL数据库(关系型数据库)都具有ACID属性,对一致性要求很高,因此降低了A(availability)和P(partion tolerance).为了提高系统性能和可扩展性,必须牺牲C(consistency). 依据CAP理论,从应用的需求不同,数据库的选择可从三方面考虑:

HBase配置项详解

hbase.tmp.dir:本地文件系统的临时目录,默认是java.io.tmpdir/hbase−java.io.tmpdir/hbase−{user.name}: hbase.rootdir:hbase持久化的目录,被所有regionserver共享,默认${hbase.tmp.dir}/hbase,一般设置为hdfs://namenode.example.org:9000/hbase类似,带全限定名: hbase.cluster.distributed:hbase集群模式运作与否的标志,默

Sqoop import加载HBase案例详解

简单写一下如何将订单表sqoop到hbase表中的步骤. 下表: 1.通过hbase shell 打开hbase. 2.创建一个hbase表 create 'so','o' 3.将so表的数据导入到hbase中. opt文件: --connect:数据库 --username :数据库用户名 --password :数据库密码 --table :需要sqoop的表 --columns :表中的列 --hbase-table:hbase中的table --column-family:列族 --hb

暑期——第八周总结(Hbase命令详解)

所花时间:7天 代码行:800(Java) 博客量:1篇 了解到知识点 : 看了hbase的操作,因为老师开学要实现用hbase实现网页版增删改查,根据林子雨老师的教程,很显然是有点差强人意,后来我又在网上找了个例子,运行了时候还是不能用 ,目前正在找原因,下面来分享下看到的hbase命令总结, 很感谢这篇博文: https://www.cnblogs.com/fenghuoliancheng/p/10690022.html 名称 命令表达式 创建表 create '表名', '列族名1','列