读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor

Coprocessor是HBase 0.92.0引入的特性。使用Coprocessor,可以将一些计算逻辑下推到HBase节点,HBase由一个单纯的存储系统升级为分布式数据处理平台。

Coprocessor分为两种:Observer和Endpoint。Observer能修改扩展已有的客户端操作功能,而Endpoint能引入新的客户端操作。

Observer

Observer的作用类似于数据库的触发器或者AOP中的advice。下图为Put操作增加Observer,其中1-2-4-6是一次正常的Put操作RPC调用过程,而3和5属于Observer,可以在Put操作之前和之后加入自定义处理逻辑。

Observer包括三种,RegionObserver(针对数据访问和更新操作,运行在Region上)/WALObserver(针对WAL日志事件,运行在RegionServer上下文)/MasterObserver(针对DDL操作,运行在Master节点)。

Endpoint

Endpoint的作用则类似于数据库存储过程。实现机制是通过扩展HBase RPC协议,给客户端暴露新的操作接口。如下图,客户端负责发起调用和收集结果,服务端各节点负责并行计算。

实战

以上一章的follows表为例,通过Observer实现followedBy被关注表数据一致性维护,Endpoint实现关注人数量统计。

因为要实现在插入follows表时自动插入followedBy表,需要用到关注人/被关注人用户名信息,所以首先升级schema。

实现Observer

代码中有三处注释值得注意:

  1. postPut方法在put操作之后被调用。
  2. 如果通过hbase-site.xml安装Observer,会应用到全局所有表,所以这里判断put操作的是否follows表。
  3. 这里有点bad smell。Observer运行在服务器端,为了共用代码,又调用客户端代码,仅为演示作用。
packageHBaseIA.TwitBase.coprocessors;
//…
publicclass FollowsObserver extends BaseRegionObserver {
    private HTablePool pool = null;
    @Override
    public void start(CoprocessorEnvironment env)throws IOException {
        pool = newHTablePool(env.getConfiguration(), Integer.MAX_VALUE);
    }
    @Override
    public void stop(CoprocessorEnvironment env)throws IOException {
        pool.close();
    }
    @Override
    public void postPut(//1,在Put操作之后调用
            finalObserverContext<RegionCoprocessorEnvironment> e,
            final Put put,
            final WALEdit edit,
            final boolean writeToWAL) throws IOException {
        byte[] table=e.getEnvironment().getRegion().getRegionInfo().getTableName();
        if (!Bytes.equals(table,FOLLOWS_TABLE_NAME))
             return;  //2,判断表名
        KeyValue kv =put.get(RELATION_FAM, FROM).get(0);
        String from =Bytes.toString(kv.getValue());
        kv = put.get(RELATION_FAM,TO).get(0);
        String to =Bytes.toString(kv.getValue());
        RelationsDAO relations = newRelationsDAO(pool);
        relations.addFollowedBy(to,from);//3,插入followedBy表
    }
}

Observer的安装可以通过修改hbase-site.xml或者使用tableschema修改语句完成,前者需要重启HBase服务,后者只需要重新上下线对应表。

$ hbase shell
HBaseShell; enter 'help<RETURN>' for list of supported commands.
Type"exit<RETURN>" to leave the HBase Shell
Version0.92.0, r1231986, Mon Jan 16 13:16:35 UTC 2012
hbase(main):001:0>disable 'follows'
0 row(s) in 7.0560 seconds
hbase(main):002:0>alter 'follows', METHOD => 'table_att',
'coprocessor'=>'file:///Users/ndimiduk/repos/hbaseiatwitbase/
target/twitbase-1.0.0.jar
|HBaseIA.TwitBase.coprocessors.FollowsObserver|1001|'
Updatingall regions with the new schema...
1/1regions updated.
Done.
0 row(s) in 1.0770 seconds
hbase(main):003:0>enable 'follows'
0 row(s) in 2.0760 seconds

其中1001为优先级,当加载多个Observer时,按照优先级次序运行。

实现Endpoint

关注人数量统计可以通过客户端Scan实现,相比Endpoint方案,有两个待改进点:

  1. 传输所有关注人到客户端,不必要的网络I/O。
  2. 拿到所有关注人Result结果后,遍历实现计数是单线程的。

实现Endpoint包括三部分

定义PRC接口

publicinterface RelationCountProtocol extends CoprocessorProtocol {
    public long followedByCount(String userId) throwsIOException;
}

服务端实现

和客户端不同,InternalScanner运行在特定Region上,返回的是原始的KeyValue对象。

packageHBaseIA.TwitBase.coprocessors;
//…
publicclass RelationCountImpl extends BaseEndpointCoprocessor implementsRelationCountProtocol {
    @Override
    public longfollowedByCount(String userId) throws IOException {
        byte[]startkey = Md5Utils.md5sum(userId);
        Scan scan = newScan(startkey);
        scan.setFilter(newPrefixFilter(startkey));
        scan.addColumn(RELATION_FAM,FROM);
        scan.setMaxVersions(1);
        RegionCoprocessorEnvironmentenv= (RegionCoprocessorEnvironment)getEnvironment();
        InternalScanner scanner =env.getRegion().getScanner(scan);//1,服务器端
        long sum = 0;
        List<KeyValue> results= new ArrayList<KeyValue>();
        boolean hasMore = false;
        do {
            hasMore =scanner.next(results);
            sum += results.size();
            results.clear();
        } while (hasMore);
        scanner.close();
        return sum;
    }
}

客户端代码

参考注释:

  1. 定义Call实例
  2. 调用服务端Endpoint。
  3. 聚合所有RegionServer得到的结果
public long followedByCount (final String userId) throws Throwable {
    HTableInterface followed =pool.getTable(FOLLOWED_TABLE_NAME);
    final byte[] startKey = Md5Utils.md5sum(userId);
    final byte[] endKey =Arrays.copyOf(startKey, startKey.length);
    endKey[endKey.length-1]++;
    Batch.Call<RelationCountProtocol,Long> callable =
        newBatch.Call<RelationCountProtocol, Long>() {
        @Override
        public Longcall(RelationCountProtocol instance) throws IOException {
            returninstance.followedByCount(userId);
        }
    };//1 call instance
    Map<byte[], Long>results = followed.coprocessorExec(
                                   RelationCountProtocol.class,
                                   startKey,
                                   endKey,
                                   callable);//2 invoke endpoint
    long sum = 0;
    for(Map.Entry<byte[],Long> e : results.entrySet()) {
        sum +=e.getValue().longValue();
    }//3 aggreagte results
    return sum;
}

Endpoint只能通过配置文件部署,还需要将相关jar包加入到HBase classpath。

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>HBaseIA.TwitBase.coprocessors.RelationCountImpl</value>
</property>

读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor,布布扣,bubuko.com

时间: 2024-08-06 19:42:31

读书笔记-HBase in Action-第二部分Advanced concepts-(2)Coprocessor的相关文章

读书笔记-HBase in Action-第二部分Advanced concepts-(3)非Java客户端

HBase Shell HBase shell使用起来最方便,进入HBase shell控制台即可使用. $ $HBASE_HOME/bin/hbase shell 常见操作有create(创建表)/put(插入或更新数据)/get(根据rowkey查询)/scan(范围查询)/delete(删除列)/deleteAll(根据rowkey删除整行数据)/disable&drop(禁用表之后再删除). 基于数据库的项目,往往会在某个目录下存储专门的sql脚本,记录每次迭代数据库变更:同理,HBas

读书笔记-HBase in Action-第二部分Advanced concepts-(1)HBase table design

本章以山寨版Twitter为例介绍HBase Schema设计模式.广义的HBase Schema设计不只包括创建表时指定项,还应该综合考虑Column families/Column qualifier/Cell value/Versions/Rowkey等相关内容. 灵活的Schema&简单的存储视图 Schema设计和数据存储及访问模式关系密切,先回顾下HBase数据模型,有几个要点: 被索引的部分包括Row Key+Col Fam+Col Qual+Time Stamp 由于HBase的

读书笔记-HBase in Action-第二部分Advanced concepts-(3)非Javaclient

HBase Shell HBase shell使用起来最方便,进入HBase shell控制台就可以使用. $ $HBASE_HOME/bin/hbase shell 常见操作有create(创建表)/put(插入或更新数据)/get(依据rowkey查询)/scan(范围查询)/delete(删除列)/deleteAll(依据rowkey删除整行数据)/disable&drop(禁用表之后再删除). 基于数据库的项目,往往会在某个文件夹下存储专门的sql脚本,记录每次迭代数据库变更:同理,HB

[读书笔记]算法(Sedgewick著)·第二章.初级排序算法

本章开始学习排序算法 1.初级排序算法 先从选择排序和插入排序这两个简单的算法开始学习排序算法.选择排序就是依次找到当前数组中最小的元素,将其和第一个元素交换位置,直到整个数组有序. 1 public static void sort(Comparable a[]){ 2 int N = a.length; 3 for(int i = 0; i < N; i ++){ 4 int min = i; //最小元素索引 5 for(int j = i + 1; j < N; j++){ 6 if(

读书笔记-HBase in Action-第三部分应用-(1)OpenTSDB

OpenTSDB是基于HBase的开源监控系统,可以支持上万规模集群监控和上亿数据点采集.其中TSDB代表Time Series Database,OpenTSDB在时间序列数据的存储和查询上都做了相当多的优化工作. 架构Overview 概念上OpenTSDB由三部分组成:tcollector数据采集.tsd数据服务和HBase数据存储. 数据采集流程 如上图,tcollector后台进程运行在每台被监控的服务器上,管理数据收集脚本,定期执行,失败时重启,确保所有的监控数据发送给OpenTSD

读书笔记-HBase in Action-第一部分 HBase fundamentals

新项目准备上HBase.HBase眼下由组里某牛负责.本着学会使用HBase的目标,先阅读下HBase in Action,一共十章组织成三部分,须要学习的内容包含HBase基本实现原理,用法,Schema设计原则和实战等.借用Michael Stack(HBase Chair)的话,"At a highlevel, HBase is like theatomic bomb. Its basic operation can be explained onthe back of a napkin

读书笔记-HBase in Action-第三部分应用-(2)GIS系统

本章介绍用HBase存储.高效查询地理位置信息. Geohash空间索引 考虑LBS应用中常见的两个问题:1)查找离某地最近的k个地点:2)查找某区域内地点.如果要用HBase实现高效查找,首先要考虑的是空间局部性(Spatial Locality),即位置上相近的点得物理存储在一起.最简单的地理位置数据由两个维度组成:经度X和纬度Y,那么相对应最简单的Rowkey也可以由X和Y组成.Rowkey的有序性决定了数据首先按照经度X排序,再按照纬度Y排序,这种方式最大的问题是经度值相等的A地点和B地

软件工程读书笔记(2)——第二章 软件过程

第二章 软件过程 软件工程的目标是在规定的时间和预算内开发出高质量软件. 软件项目失败的主要原因几乎与技术和工具没有任何关系,更多的是由于缺少过程规范,只有建立规范的软件开发过程,并持续不断地加以改进,才能管理和控制软件产品的质量. 一.软件过程的概念 1.任务思维与过程思维 软件发展的前期阶段:强调软件开发结果,忽略软件开发过程.(类似于黑盒子) Watts Humphery首先将过程管理的原则和思想引入软件开发过程中,将软件开发任务看做是一个可控的,可度量的和可改进的过程. 2.软件过程的定

读书笔记-HBase in Action-第四部分-(1)部署

最后一部分了...分两章吧.HBase和Hadoop紧密相关,更为具体的部署和运维内容推荐Hadoop Operations和HBase Administration Cookbook.本文粗粒度列出一些HBase部署运维的最佳实践和基本原则. 集群规划 一个完整的HBase集群包含HBase Master,ZooKeeper,RegionServers和Hadoop相关组件.生产集群按照规模大小可分为小型(10-20个节点).中型(50个节点)和大型(超过50个节点).集群规划需要为这些组件选