HBase-协处理器详解及实现

协处理器(coprocessor)

把一部分计算移动到数据的存放端。

简介

协处理器允许用户在region服务器上运行自己的代码,允许用户执行region级别的操作,并且可以使用与RDBMS中触发器(trigger)类似的功能。在客户端,用户不用关心操作具体在哪里执行,HBase的分布式框架会帮助用户把这些工作变得透明。

协处理器框架提供了一些类,用户可以通过继承这些类来扩展自己的功能。主要分为以下两大类

observer

这一类协处理器与触发器(trigger)类似:回调函数(也被称作钩子函数,hook)在一些特定事件发生时被执行。这些事件包括一些用户产生的事件,也包括服务器端内部自动产生的事件。

协处理器框架提供的接口如下

RegionObserver:用户可以用这种的处理器处理数据修改事件,它们与表的region联系紧密。

MasterObserver:可以被用作管理或DDL类型的操作,这些是集群级事件。

WALObserver:提供控制WAL的钩子函数

Observer提供了一些设计好的回调函数,每个操作在集群服务器端都可以被调用。

endpoint

除了事件处理之外还需要将用户自定义操作添加到服务器端。用户代码可以被部署到管理数据的服务器端,例如,做一些服务器端计算的工作。

Endpoint通过添加一下远程过程调用来动态扩展RPC协议。可以把它们理解为与RDBMS中类似的存储过程。Endpoint可以与observer的实现组合起来直接作用于服务器端的状态。

Coprocessor类

所有协处理器的类都必须实现这个接口。它定义了协处理器的基本约定,并使得框架本身的管理变得容易。

Coprocessor.Priority枚举类定义的优先级

SYSTEM:高优先级,定义最先被执行的协处理器

USER:定义其他的协处理器,按顺序执行

协处理器的优先级决定了执行的顺序:系统(SYSTEM)级协处理器在用户(USER)级协处理器之前执行。

在同一个优先级中还有一个序号(sequence number)的概念,用来维护协处理器的加载顺序。序号从0开始依次增加。

这个数字的作用并不大,但用户可以依靠它们来为同一优先级的协处理器排序:在同一优先级下,它们按照其序号递增的顺序执行,即定义了执行顺序。

在协处理器生命周期中,它们由框架管理。Coprocessor接口提供了以下两个方法

void start(CoprocessorEnvironment env) throws IOException;
void stop(CoprocessorEnvironment env) throws IOException;

这两个方法在协处理器开始和结束时被调用。CoprocessorEnvironment用来在协处理器的生命周期中保持其状态。协处理器实例一直被保存在提供的环境中。

CoprocessorEnvironment类提供的方法


int getVersion();


返回Coprocessor版本


String getHBaseVersion();


以字符串的格式返回HBase版本


Coprocessor getInstance();


返回加载的协处理器实例


int getPriority();


返回协处理器的优先级


int getLoadSequence();


协处理器的序号,当协处理器加载时被设置,这反映了它的执行顺序

协处理器应当只与提供给它们的环境进行交互。这样的好处是可以保证没有会被恶意代码用来破坏数据的后门。

注意:协处理器应当使用getTable方法访问表数据。注意这个方法实际上在默认的HTable类上添加了特定的安全措施。例如,协处理器不可以对一行数据加锁。

在协处理器实例的生命周期中,Coprocessor接口的start和stop方法会被框架隐式调用,处理过程中的每一步都有一个状态。

Coprocessor.State枚举类定义的状态


UNINSTALLED


协处理器最初的状态,没有环境,也没有被初始化


INSTALLED


实例装载了它的环境参数


STARTING


协处理器将要开始工作,也就是说start方法将要被调用


ACTIVE


一旦start方法被调用,当前状态设置为active


STOPPING


Stop方法被调用之前的状态


STOPPED


一旦stop方法将控制权 交给框架,协处理器会被设置为状态stopped

CoprocessorHost类,它维护所有协处理器实例和它们专用的环境。它有一些子类,这些子类应用在不同的使用环境,例如,master和region服务器等环境

Coprocessor、CoprocessorEnvironment和CoprocessorHost这3个类形成了协处理器类的基础,基于这三个类能够实现更高级的功能。它们支持协处理器的生命周期,管理协处理器的状态,同时提供了执行时的环境参数,以保证协处理器正确执行。此外,这些类也提供了一个抽象层方便用户更简单的构建自己的实现。

客户端的调用与一系列的协处理器进行交互

协处理器加载

从配置文件中加载

需要将协处理器jar包放入HBASE_CLASSPATH指定的路径下

在hbase-site.xml中配置,会对所有的表都加上

<property>
       <name>hbase.coprocessor.region.classes</name>
       <value>class1,class2</value>
</property>
<property>
       <name>hbase.coprocessor.master.classes</name>
       <value>class1,class2</value>
</property>
<property>
       <name>hbase.coprocessor.wal.classes</name>
       <value>class1,class2</value>
</property>

配置完成后重启HBase.

配置文件中配置项的顺序决定了执行顺序。所有协处理器都是以系统级优先级进行加载的。

配置文件首先在HBase启动时被检查。虽然还可以在其他地方增加系统级优先级的协处理器,但是在配置文件中配置的协处理器是被最先执行的。

从表描述符中加载

针对特定表,所以加载的协处理器只针对这个表的region,同时也只被这些region的region服务器使用。换句话说,用户只能在与region相关的协处理器上使用这种方法,而不能在master或WAL相关的协处理器上使用。

由于它们是使用表的上下文加载的,所以与配置文件中加载的协处理器影响所有表相比,这种方法加载的协处理器更具有针对性。用户需要在表描述符中利用HTableDescription.setValue方法定义它们。键必须以COPROCESSOR开头,值必须符合以下格式

<path-to-jar>|<classname>|<priority>

如下,一个使用系统优先级,一个使用用户优先级

alter ‘socialSecurityTest‘,METHOD=>‘table_att‘,‘coprocessor‘=>‘hdfs://cluster1/user/solr/hbase/observer/HBaseCoprocessor.jar|com.hbase.coprocessor.HbaseDataSyncSolrObserver|SYSTEM‘

alter ‘socialSecurityTest‘,METHOD=>‘table_att‘,‘coprocessor‘=>‘ /user/solr/hbase/observer/HBaseCoprocessor.jar|com.hbase.coprocessor.HbaseDataSyncSolrObserver|USER

path-to-jar可以是一个完整的HDFS地址或其他Hadoop FileSystem类支持的地址。第二个协处理器使用了本地路径。

Classname定义了具体的实现类。由于JAR可能包含许多协处理器类,但只能给一张表设定一个协处理器。用户应该使用标准的java包命名规则来命名指定类。

Priority只能是SYSTEM或USER

注意:不要在协处理器定义中添加空格。解释十分严格,添加头尾或间隔字符会使整个配置条目无效。

使用${number}后缀可以改变定义中的顺序,即协处理器的加载顺序。虽然只有COPROCESSOR的前缀会被检查,但是还是推荐使用数字后缀定义顺序。

package com.hbase.coprocessor;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.log4j.Logger;

/**
* @author:FengZhen
* @create:2018年8月30日
* 协处理器
*/
public class CoprocessorCreateTest {

	private static String addr="HDP233,HDP232,HDP231";
	private static  String port="2181";
	Logger logger = Logger.getLogger(getClass());

	private static Connection connection;

	public static void getConnection(){
		Configuration conf = HBaseConfiguration.create();

		conf.set("hbase.zookeeper.quorum",addr);
		conf.set("hbase.zookeeper.property.clientPort", port);
		try {
			connection = ConnectionFactory.createConnection(conf);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/*
	 * 关闭连接
	 *
	 */
	public static void close() {
		/**
		 * close connection
		 **/
		if (connection != null) {
			try {
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		getConnection();
		try {
			HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("test_hbase"));
			Path path = new Path("/user/hdfs/coprocessor/test.jar");
			hTableDescriptor.setValue("COPROCESSOR", path.toString() + "|" + "class" + "|" + Coprocessor.PRIORITY_SYSTEM);
			Admin admin = connection.getAdmin();
			admin.createTable(hTableDescriptor);
		} catch (TableNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		close();
	}

}

一旦表被启用,且region被打开,框架会首先加载配置文件中定义的协处理器,然后再加载表描述符中的协处理器。

RegionObserver类

属于observer协处理器:当一个特定的region级别的操作发生时,它们的钩子函数会被触发。

这些操作可以被分为两类:region生命周期变化和客户端API调用。

1.处理region生命周期事件

这些observer可以与pending open、open和pending close状态通过钩子链接。每一个钩子都被框架隐式地调用。

状态:pending open。Region将要被打开时会处于这个状态。监听的协处理器可以搭载这个过程或阻止这个过程。

void preOpen(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c);

这些方法会在region被打开前或刚刚打开后被调用。用户可以在自己的协处理器实现中使用这两个方法,例如,使用preOpen方法告知框架这次打开操作应当被放弃,或勾住postOpen方法来触发一次缓存预热或其它一些操作。

Region经过pending open,且在打开状态之前,region服务器可能需要从WAL(Write-Ahead-logging,预写系统日志)中应用一些记录到region中,这时会触发以下方法。

void preWALRestore(final ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
      HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException;
void postWALRestore(final ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
      HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException;

这些方法让用户可以细粒度地控制在WAL重做时那些修改需要被实施。用户可以访问修改记录,因此用户就可以监督那些记录被实施了。

状态:open。当一个region被部署到一个region服务器中,并可以正常工作时,这个region会被认为处于open状态。

可用的钩子函数如下

InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
      final InternalScanner scanner) throws IOException;
void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
      final StoreFile resultFile) throws IOException;
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
      final Store store, final InternalScanner scanner, final ScanType scanType,
      CompactionRequest request) throws IOException;
void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
      StoreFile resultFile, CompactionRequest request) throws IOException;
void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
      throws IOException;
void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
      throws IOException;

pre方法在事件执行前被调用,post方法在事件执行后被调用。例如:使用preSplit钩子函数可以有效地禁止region拆分,然后手动执行这些操作。

状态:pending close。最后一组region监听器的钩子函数可以监听pending close状态。这个状态在region状态从open到closed转变时发生。在region被关闭之前和之后,一下钩子函数将被执行。

void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
      boolean abortRequested) throws IOException;
void postClose(final ObserverContext<RegionCoprocessorEnvironment> c,
      boolean abortRequested);

abortRequested参数包含了region被关闭的原因。通常情况下,region会在正常操作中被关闭,例如,region由于负载均衡被移动到其它region服务器时被关闭。也有可能是由于region服务器被撤销,且需避免一些副作用。当这些情况发生时,所有它管理的region都会被撤销,同时用户从这个参数中可以看到是否符合这种情况。

2.处理客户端API事件

与生命周期事件相比,所有的客户端API调用都显示地从客户端应用中传输到region服务器。用户可以在这些调用执行前或刚刚执行后拦截它们。


void preGet()

void postGet()


在客户端Table.get请求之前和之后调用


void prePut()

void postPut()


在客户端Table.put请求之前和之后调用


void preDelete()

void postDelete ()


在客户端Table.delete请求之前和之后调用


boolean preCheckAndPut()

boolean postCheckAndPut ()


在客户端Table. checkAndPut请求之前和之后调用


boolean preCheckAndDelete()

boolean postCheckAndDelete ()


在客户端Table. checkAndDelete请求之前和之后调用


void preGetClosestRowBefore()

void postGetClosestRowBefore ()


在客户端Table.getClosestRowBefore请求之前和之后调用


boolean preExists()

boolean postExists ()


在客户端Table.exists请求之前和之后调用


long preIncrementColumnValue()

long postIncrementColumnValue ()


在客户端Table.incrementColumnValue请求之前和之后调用


void preIncrement()

void postIncrement ()


在客户端Table. increment请求之前和之后调用


InternalScanner preScannerOpen()

InternalScanner postScannerOpen ()


在客户端Table.getScannerOpen请求之前和之后调用


boolean preScannerNext()

void postScannerNext ()


在客户端ResultScanner.next请求之前和之后调用


void preScannerClose()

void postScannerClose ()


在客户端ResultScanner.close请求之前和之后调用

3.RegionCoprocessorEnvironment

实现RegionObserver类的协处理器环境的实例是基于RegionCoprocessorEnvironment类的,RegionCoprocessorEnvironment实现了CoprocessorEnvironment接口。

RegionCoprocessorEnvironment类提供的方法及子类方法


Region getRegion();


返回监听器监听的region的应用


RegionServerServices getRegionServerServices();


返回共享的RegionServerServices实例

getRegion方法可以用于得到目前正在管理的HRegion实例,同时可以执行这个类提供的方法。

RegionServerServices类提供的方法


boolean isStopping();


当region服务正在停止服务时,返回true


WAL getWAL(HRegionInfo regionInfo) throws IOException;


提供访问WAL实例的功能


CompactionRequestor getCompactionRequester();


提供访问共享的CompactionRequestor实例的功能,可以在协处理器内部发起合并


FlushRequester getFlushRequester();


提供访问共享的FlushRequester,可以用于发起memstore刷写


RegionServerAccounting getRegionServerAccounting();


提供访问共享RegionServerAccounting实例的功能。用户可以利用它得到当前服务进程资源的占用状态,例如当前memstore的大小


void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException;


内部调用,在region服务器内部使用


RpcServerInterface getRpcServer();


提供访问共享RpcServerInterface实例的功能,包含当前服务端RPC统计信息

4.ObserverContext类

RegionObserver类提供的所有回调函数都需要一个特殊的上下文作为共同的参数: ObserverContext类,它不仅提供了访问当前系统环境的入口,同时也添加了一些关键功能用以通知协处理器框架在回调函数完成时需要做什么。

所有的协处理器在执行时共用一个上下文实例,并会随着环境一起变化。

ObserverContext类提供的方法


E getEnvironment()


返回当前协处理器环境的应用


void bypass()


当用户代码调用此方法时,框架将使用用户提供的值,而不使用框架通常使用的值


void complete()


通知框架后续的处理可以被跳过,剩下没有被执行的协处理器也会被跳过。这意味着当前协处理器的响应是最后的一个协处理器


boolean shouldBypass()


框架内部用来检查标志位


boolean shouldComplete()


框架内部用来检查标志位


void prepare(E env)


使用特定的环境准备上下文。这个方法只供内部使用。它被静态方法createAndPrepare使用


static <T extends CoprocessorEnvironment> ObserverContext<T> createAndPrepare(


初始化上下文的静态方法。当提供的context参数是null时,它会创建一个新实例

两个重要的方法是bypass和complete。它们会为用户的协处理器实现提供了选择,以控制框架后续行为。Complete的调用会影响后面执行的协处理器,同时bypass方法可以停止当前服务进程的处理过程。例如,可以使用e.bypass停止region的自动拆分。

public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
    e.bypass();
}

  

5.BaseRegionObserver类

这个类可以作为所有用户实现监听类型协处理器的基类。它实现了所有RegionObserver接口的空方法,所以在默认情况下继承这个类的协处理器没有任何功能。

例子:处理特殊行键

示例,协处理器代码去下

package com.hbase.coprocessor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

/**
* @author:FengZhen
* @create:2018年8月31日
*/
public class RegionObserverExample extends BaseRegionObserver{

	public static final byte[] FIXED_ROW = Bytes.toBytes("@@@[email protected]@@");

	@Override
	public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
			throws IOException {
		//检查请求的行键是否匹配
		if (Bytes.equals(get.getRow(), FIXED_ROW)) {
			//创建一个特殊的keyvalue,只包含当前的服务器时间。
			KeyValue keyValue = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis()));
			results.add(keyValue);
		}
	}
}

  

1.新建HBase表

create ‘test_coprocessor‘, ‘info‘

2.协处理器打包上传至HDFS

上传至HDFS

/data/fz/hbase/coprocessor/RegionObserverExample.jar

3.装载协处理器

hbase(main):004:0> disable ‘test_coprocessor‘

hbase(main):006:0> alter ‘test_coprocessor‘,METHOD=>‘table_att‘,‘COPROCESSOR‘=>‘hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverExample.jar|com.hbase.coprocessor.RegionObserverExample|1001‘

hbase(main):007:0> enable ‘test_coprocessor‘

  

4.查看挂载情况

hbase(main):009:0> desc ‘test_coprocessor‘

Table test_coprocessor is ENABLED                                                                                                                             

test_coprocessor, {TABLE_ATTRIBUTES => {METADATA => {‘COPROCESSOR$1‘ => ‘hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverExample.jar|com.hbase.coprocess

or.RegionObserverExample|1001‘}}                                                                                                                              

COLUMN FAMILIES DESCRIPTION                                                                                                                                   

{NAME => ‘info‘, BLOOMFILTER => ‘ROW‘, VERSIONS => ‘1‘, IN_MEMORY => ‘false‘, KEEP_DELETED_CELLS => ‘FALSE‘, DATA_BLOCK_ENCODING => ‘NONE‘, TTL => ‘FOREVER‘,

COMPRESSION => ‘NONE‘, MIN_VERSIONS => ‘0‘, BLOCKCACHE => ‘true‘, BLOCKSIZE => ‘65536‘, REPLICATION_SCOPE => ‘0‘}

5.协处理器测试

行键@@@[email protected]@@被observer的preGet捕获,然后添加当前服务器时间。

如下

hbase(main):054:0> get ‘test_coprocessor‘,‘@@@[email protected]@@‘

COLUMN                                   CELL                                                                                                                

 @@@[email protected]@@:@@@[email protected]@@             timestamp=9223372036854775807, value=\x00\x00\x01e\x8E\xC6ad                                                        

1 row(s) in 0.0450 seconds

插入一条行键为@@@[email protected]@@的数据

hbase(main):055:0> put ‘test_coprocessor‘,‘@@@[email protected]@@‘,‘info:name‘,‘nimei‘

0 row(s) in 0.0540 seconds

此时再次使用get

hbase(main):057:0> get ‘test_coprocessor‘,‘@@@[email protected]@@‘

COLUMN                                   CELL                                                                                                                

 @@@[email protected]@@:@@@[email protected]@@             timestamp=9223372036854775807, value=\x00\x00\x01e\x8E\xC7\xD3\x9B                                                  

 info:name                               timestamp=1535698764962, value=nimei                                                                                

2 row(s) in 0.0130 seconds

  

如果捕获的该行键恰巧在表中同时存在,就会出现上面的情况,为了避免这种情况,可以使用e.bypass。更新步骤如下

6.卸载已有协处理器

alter ‘test_coprocessor‘,METHOD => ‘table_att_unset‘,NAME =>‘COPROCESSOR$1‘

  

7.重新打包RegionObserverWithBypassExample上传至HDFS

代码如下

package com.hbase.coprocessor;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

/**
* @author:FengZhen
* @create:2018年8月31日
*/
public class RegionObserverWithBypassExample extends BaseRegionObserver{

	public static final byte[] FIXED_ROW = Bytes.toBytes("@@@[email protected]@@");

	@Override
	public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
			throws IOException {
		//检查请求的行键是否匹配
		if (Bytes.equals(get.getRow(), FIXED_ROW)) {
			//创建一个特殊的keyvalue,只包含当前的服务器时间。
			KeyValue keyValue = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis()));
			results.add(keyValue);
			//一旦特殊的keyvalue被添加,之后的操作都会被跳过
			e.bypass();
		}
	}
}

 

8.挂载新的协处理器

disable ‘test_coprocessor‘

hbase(main):034:0> alter ‘test_coprocessor‘,METHOD=>‘table_att‘,‘COPROCESSOR‘=>‘hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverWithBypassExample.jar|com.hbase.coprocessor.RegionObserverWithBypassExample|1001‘

enable ‘test_coprocessor‘

  

9.查看挂载情况

hbase(main):036:0> desc ‘test_coprocessor‘

Table test_coprocessor is ENABLED                                                                                                                            

test_coprocessor, {TABLE_ATTRIBUTES => {METADATA => {‘COPROCESSOR$1‘ => ‘hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverWithBypassExample.jar|com.hbase.c

oprocessor.RegionObserverWithBypassExample|1001‘}}                                                                                                           

COLUMN FAMILIES DESCRIPTION                                                                                                                                  

{NAME => ‘info‘, BLOOMFILTER => ‘ROW‘, VERSIONS => ‘1‘, IN_MEMORY => ‘false‘, KEEP_DELETED_CELLS => ‘FALSE‘, DATA_BLOCK_ENCODING => ‘NONE‘, TTL => ‘FOREVER‘,

COMPRESSION => ‘NONE‘, MIN_VERSIONS => ‘0‘, BLOCKCACHE => ‘true‘, BLOCKSIZE => ‘65536‘, REPLICATION_SCOPE => ‘0‘}                                            

1 row(s) in 0.0160 seconds

10.测试

hbase(main):037:0> get ‘test_coprocessor‘,‘@@@[email protected]@@‘

COLUMN                                   CELL                                                                                                                

 @@@[email protected]@@:@@@[email protected]@@             timestamp=9223372036854775807, value=\x00\x00\x01e\x8E\xDA\xDB\xC5                                                  

1 row(s) in 0.0210 seconds

成功。

由于默认的get操作被跳过,只有人工添加的一列被返回,并且是返回的唯一一列数据。可以注意到返回列的时间戳是9223372036854775807,这个只是Long.MAX_VALUE预计得到的值。因为示例代码创建keyvalue时并没有指定时间戳,所以被默认设为HConstants.LATEST_TIMESTAMP,即Long.MAX_VALUE..

MasterObserver类

为了处理master服务器的所有回调函数。与关系型数据库中DDL类似,它们可以被归类到数据处理操作中。

1.MasterCoprocessorEnvironment类

MasterCoprocessorEnvironment封装了一个MasterObserver实例,它同样实现了CoprocessorEnvironment接口,因此它能提供getTable之类的方法帮助用户在自己的实现中访问数据。

MasterCoprocessorEnvironment类提供的非继承方法

MasterServices getMasterServices();提供可访问的共享MasterServices实例

MasterServices类提供的方法


AssignmentManager getAssignmentManager();


使用户可以访问AssignmentManager实例,它负责为所有的region分操作,例如分配、卸载和负载均衡等


MasterFileSystem getMasterFileSystem();


提供一个与master操作相关的文件系统抽象层,例如,创建表或日志文件的目录


ServerManager getServerManager();


返回ServerManager实例。它可以访问所有的服务器进程,无论进程处于存活、死亡或其它状态


ExecutorService getExecutorService();


执行服务被master用来调度系统级事件


void checkTableModifiable(final TableName tableName)


检查表是否已经存在以及是否已经离线,如果是就可以修改它

2.BaseMasterObserver类

用户可以直接实现MasterObserver接口,或扩展BaseMasterObserver类来实现自己的功能。BaseMasterObserver为接口的每个方法完成一个空的实现。不做任何改变直接使用这个类不会有任何反馈。

例子:创建新表时创建一个单独的目录

1.打成jar包MasterObserverExample.jar

2.把该jar包添加到hbase-env.sh的HBASE_CLASSPATH(/usr/hdp/2.6.1.0-129/hadoop/lib)中,region服务器在JRE中可以加载到这个类。Hbase-site.xml中添加以下配置

<property>

    <name>hbase.coprocessor.master.classer</name>

    <value>com.hbase.coprocessor.MasterObserverExample</value>

</property>

3.重启HBase

4.测试:

新建一张表

hbase(main):011:0> create ‘test_master_observer‘,‘f1‘

0 row(s) in 2.6390 seconds

=> Hbase::Table - test_master_observer

  

查看HDFS路径下是否有其对应的文件夹

[[email protected] lib]# hadoop fs -ls /user/hdfs/hbase/processor/

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/usr/hdp/2.6.1.0-129/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/usr/hdp/2.6.1.0-129/hadoop-yarn/ProcessLog-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Found 1 items

drwxr-xr-x   - hbase hdfs          0 2018-09-03 09:52 /user/hdfs/hbase/processor/test_master_observer-blobs

成功。

原文地址:https://www.cnblogs.com/EnzoDin/p/9577561.html

时间: 2024-11-05 21:43:12

HBase-协处理器详解及实现的相关文章

HBase 协处理器详解

一.简述 在使用HBase时,如果你的数据量达到了数十亿行或数百万列,此时能否在查询中返回大量数据将受制于网络的带宽,即便网络状况允许,但是客户端的计算处理也未必能够满足要求.在这种情况下,协处理器(Coprocessors)应运而生.它允许你将业务计算代码放入在RegionServer的协处理器中,将处理好的数据再返回给客户端,这可以极大地降低需要传输的数据量,从而获得性能上的提升.同时协处理器也允许用户扩展实现HBase目前所不具备的功能,如权限校验.二级索引.完整性约束等. 二.协处理器类

HBase RegionServer详解(未完)

HBase RegionServer详解 RegionServer组件介绍 RegionServer是HBase集群运行在每个工作节点上的服务.它是整个HBase系统的关键所在,一方面它维护了Region的状态,提供了对于Region的管理和服务:另一方面,它与Master交互,参与Master的分布式协调管理. MemStoreFlusher MemStoreFlusher主要功能是将MemStore刷新到文件中,当满足一下条件时会出发MemStore执行flush操作,最小的flush单元是

Mac下安装HBase及详解

Mac下安装HBase及详解 1. 千篇一律的HBase简介 HBase是Hadoop的数据库, 而Hive数据库的管理工具, HBase具有分布式, 可扩展及面向列存储的特点(基于谷歌BigTable). HBase可以使用本地文件系统和HDFS文件存储系统, 存储的是松散的数据(key-value的映射关系). HBase位于HDFS的上层, 向下提供存储, 向上提供运算 2. HBase安装 HBase有单机, 伪分布式, 全分布式运行模式 依赖: 匹配HBase的Hadoop版本 Jav

Hbase存储详解

Hbase存储详解 started by chad walters and jim 2006.11 G release paper on BigTable 2007.2 inital Hbase prototype created as Hadoop contrib 2007.10 First useable Hbase 2008.1 Hadoop become Apache top-level project and Hbase becomes subproject 2008.10 Hbase

大数据学习系列之五 ----- Hive整合HBase图文详解

引言 在上一篇 大数据学习系列之四 ----- Hadoop+Hive环境搭建图文详解(单机) 和之前的大数据学习系列之二 ----- HBase环境搭建(单机) 中成功搭建了Hive和HBase的环境,并进行了相应的测试.本文主要讲的是如何将Hive和HBase进行整合. Hive和HBase的通信意图 Hive与HBase整合的实现是利用两者本身对外的API接口互相通信来完成的,其具体工作交由Hive的lib目录中的hive-hbase-handler-*.jar工具类来实现,通信原理如下图

Hbase安装详解

一.简介 HBase – Hadoop Database,是一个高可靠性.高性能.面向列.可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群.HBase利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协调工具. 1.HBase(NoSQL)的数据模型 1)        表(table),是存储管理数据的. 2)        行键(row key),类似于My

NoSQL选型及HBase案例详解(转)

从 NOSQL的类型到 常用的产品,我们已经做过很多关于NoSQL的文章,今天我们从国内著名的互联网公司及科研机构的实战谈一下NoSQL数据库. NoSQL一定程度上是基于一个很重要的原理—— CAP原理提出来的.传统的SQL数据库(关系型数据库)都具有ACID属性,对一致性要求很高,因此降低了A(availability)和P(partion tolerance).为了提高系统性能和可扩展性,必须牺牲C(consistency). 依据CAP理论,从应用的需求不同,数据库的选择可从三方面考虑:

HBase配置项详解

hbase.tmp.dir:本地文件系统的临时目录,默认是java.io.tmpdir/hbase−java.io.tmpdir/hbase−{user.name}: hbase.rootdir:hbase持久化的目录,被所有regionserver共享,默认${hbase.tmp.dir}/hbase,一般设置为hdfs://namenode.example.org:9000/hbase类似,带全限定名: hbase.cluster.distributed:hbase集群模式运作与否的标志,默

Sqoop import加载HBase案例详解

简单写一下如何将订单表sqoop到hbase表中的步骤. 下表: 1.通过hbase shell 打开hbase. 2.创建一个hbase表 create 'so','o' 3.将so表的数据导入到hbase中. opt文件: --connect:数据库 --username :数据库用户名 --password :数据库密码 --table :需要sqoop的表 --columns :表中的列 --hbase-table:hbase中的table --column-family:列族 --hb

暑期——第八周总结(Hbase命令详解)

所花时间:7天 代码行:800(Java) 博客量:1篇 了解到知识点 : 看了hbase的操作,因为老师开学要实现用hbase实现网页版增删改查,根据林子雨老师的教程,很显然是有点差强人意,后来我又在网上找了个例子,运行了时候还是不能用 ,目前正在找原因,下面来分享下看到的hbase命令总结, 很感谢这篇博文: https://www.cnblogs.com/fenghuoliancheng/p/10690022.html 名称 命令表达式 创建表 create '表名', '列族名1','列