Flume-NG源码阅读之HBaseSink

  关于HBase的sink的所有内容均在org.apache.flume.sink.hbase包下。

  每个sink包括自己定制的,都extends AbstractSink implements Configurable。

  一、首先是configure(Context context)方法。该方法是对HBaseSink的参数初始化。主要包括以下几个:

  tableName:要写入的HBase数据表名,不能为空;

  columnFamily:数据表对应的列簇名,这个sink目前只支持一个列簇,不能为空;

  batchSize:每次事务可以处理的最大Event数量,默认是100;

  eventSerializerType:用来将event写入HBase,即将event转化为put。默认是org.apache.flume.sink.hbase.SimpleHbaseEventSerializer,还有一个是RegexHbaseEventSerializer,即适合HBaseSink的Serializer只有这俩,否则自己定制;

  serializerContext:是eventSerializerType的配置信息,就是配置文件中包含“serializer.”的项;

  kerberosKeytab和kerberosPrincipal是用来做访问控制的,默认都为空,即不设置。

  并生成eventSerializerType对应的实例并加以配置,两个Serializer各有不同的用途主要是一个只能写一列,一个可以写多列: 

1 Class<? extends HbaseEventSerializer> clazz =
2 (Class<? extends HbaseEventSerializer>)
3 Class.forName(eventSerializerType);
4 serializer = clazz.newInstance();
5 serializer.configure(serializerContext); //配置序列化组件,先配置。默认是SimpleHbaseEventSerializer

  1、SimpleHbaseEventSerializer.configure(Context
context):此Serializer只能将数据写入一列


 1   public void configure(Context context) {
2 rowPrefix = context.getString("rowPrefix", "default");  //获取RowKey的前缀,固定的部分,默认前缀是default
3 incrementRow =
4 context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);//获取计数器对应的行键
5 String suffix = context.getString("suffix", "uuid");  //rowkey的类型(可以指定的有四种uuid/random/timestamp/nano),默认是uuid
6
7 String payloadColumn = context.getString("payloadColumn");  //要写入HBase的列名
8 String incColumn = context.getString("incrementColumn");  //计数器对应的列
9 if(payloadColumn != null && !payloadColumn.isEmpty()) {  //根据suffix决定rowkey类型
10 if(suffix.equals("timestamp")){
11 keyType = KeyType.TS;
12 } else if (suffix.equals("random")) {
13 keyType = KeyType.RANDOM;
14 } else if(suffix.equals("nano")){
15 keyType = KeyType.TSNANO;
16 } else {
17 keyType = KeyType.UUID;
18 }
19 plCol = payloadColumn.getBytes(Charsets.UTF_8);  //列名
20 }
21 if(incColumn != null && !incColumn.isEmpty()) {  //存在计数器列
22 incCol = incColumn.getBytes(Charsets.UTF_8);
23 }
24 }

  2、RegexHbaseEventSerializer.configure(Context
context):此Serializer根据正则可以写入多列


  public void configure(Context context) {
String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);  //获取配置文件中的正则表达式,默认是“(.*)”
regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG,
INGORE_CASE_DEFAULT);  //是否忽略大小写
inputPattern = Pattern.compile(regex, Pattern.DOTALL
+ (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));  //将给定的正则表达式编译到具有给定标志的模式中

String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);  //获取配置文件中的列名s
String[] columnNames = colNameStr.split(",");  //分割列名获得列名数组
for (String s: columnNames) {
colNames.add(s.getBytes(Charsets.UTF_8));
}
}

  二、start()方法。该方法首先会构造一个HTable对象,并table.setAutoFlush(false)来激活缓冲区(默认大小时2MB),随后的是一些检查。

  三、然后是process()方法用来从channel中take数据,serializer之后写入HBase。


 1 public Status process() throws EventDeliveryException {
2 Status status = Status.READY;
3 Channel channel = getChannel();
4 Transaction txn = channel.getTransaction();
5 List<Row> actions = new LinkedList<Row>();
6 List<Increment> incs = new LinkedList<Increment>();
7 txn.begin();
8 for(long i = 0; i < batchSize; i++) {
9 Event event = channel.take();
10 if(event == null){
11 status = Status.BACKOFF;
12 counterGroup.incrementAndGet("channel.underflow");
13 break;
14 } else {
15 serializer.initialize(event, columnFamily);
16 actions.addAll(serializer.getActions());
17 incs.addAll(serializer.getIncrements());
18 }
19 }
20 putEventsAndCommit(actions, incs, txn);
21 return status;
22 }

  1、actions和incs是要写入HBase的数据,actions对应的是数据;incs对应的是计数器。

  2、serializer.initialize(event,
columnFamily),两个Serializer的initialize目的一样:

1 public void initialize(Event event, byte[] columnFamily) {
2 this.payload = event.getBody();  //获取要处理的数据
3 this.cf = columnFamily;    //获取要写入的列簇
4 }

  3、serializer.getActions()

  SimpleHbaseEventSerializer.getActions()方法会根据configure(Context
context)中设置的RowKey类型先获取rowkey,可以是毫秒时间戳、随机数、纳秒时间戳以及UUID128位数四种类型。然后构造一个Put对象,将(列簇,列名,数据)添加进这个Put,返回List<Row>
actions。

  RegexHbaseEventSerializer.getActions()方法,首先会做一些判断匹配成功否?匹配出的个数和指定的列数相同否?,然后是获取rowkey,这里的rowkey是[time
in millis]-[random key]-[nonce]三部分组成的字符串。剩下的是依次匹配列组成Put,返回List<Row>
actions。

  4、serializer.getIncrements()

  SimpleHbaseEventSerializer.getIncrements()如果配置文件中配置了incrementColumn,就添加相应的计数器,否则返回一个没有数据的List<Increment>。

  RegexHbaseEventSerializer.getIncrements()直接返回一个没有数据的List<Increment>,即不设置计数器。

  5、putEventsAndCommit(actions, incs,
txn)方法。首先会table.batch(actions)提交List<Put>;然后是计数器table.increment(i);txn.commit()提交事务;如有异常txn.rollback()回滚;txn.close()事务关闭。

  四、stop()方法。table.close();table = null;

  有两个问题撒:

  1、我们在开发HBase程序的时候总是要指定“hbase.zookeeper.quorum”对应的zookeeper地址的,但是看完HBaseSink也没发现设置的地方,是不是在HBase集群中的任意节点都不需要设置,除非在集群外节点才设置?

  2、还有在使用时发现放在安装有zookeeper的节点上运行flume报错,删除zookeeper后运行正常,没安装zookeeper的节点上运行正常,这是为什么??

  希望知道的可以解答哈。。。HBaseSink也比较简单。。。后续还有更多源码解读!敬请期待!!

Flume-NG源码阅读之HBaseSink,布布扣,bubuko.com

时间: 2024-10-16 01:49:21

Flume-NG源码阅读之HBaseSink的相关文章

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

Flume NG源码分析(三)使用Event接口表示数据流

Flume NG有4个主要的组件: Event表示在Flume各个Agent之间传递的数据流 Source表示从外部源接收Event数据流,然后传递给Channel Channel表示对从Source传递的Event数据流的临时存储 Sink表示从Channel中接收存储的Event数据流,并传递给下游的Source或者终点仓库 这篇看一下Event接口表示的数据流.Source, Channel, Sink操作的数据流都是基于Event接口的封装. public interface Event

【Java】【Flume】Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

Flume 源码阅读

Flume架构 主要由3个组件,分别是Source,Channel和Sink,3个组件组成Event在Flume中得数据流向或者说流水线,功能可以由Flume的介绍看出:When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sin

Flume-NG源码阅读之FileChannel

FileChannel是flume一个非常重要的channel组件,非常常用.这个channel非常复杂,涉及的文件更多涉及三个包:org.apache.flume.channel.file.org.apache.flume.channel.file.encryption(加密).org.apache.flume.channel.file.proto共计40个源码文件. 一.configure(Context context)方法: 1.首先获取配置文件中的checkpointDir和dataD

Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

【Java】【Fulme】Flume-NG源码阅读之SpoolDirectorySource

org.apache.flume.source.SpoolDirectorySource是flume的一个常用的source,这个源支持从磁盘中某文件夹获取文件数据.不同于其他异步源,这个源能够避免重启或者发送失败后数据丢失.flume可以监控文件夹,当出现新文件时会读取该文件并获取数据.当一个给定的文件被全部读入到通道中时,该文件会被重命名以标志已经完成.同时,该源需要一个清理进程来定期移除完成的文件. 通道可选地将一个完成路径的原始文件插入到每个事件的hearder域中.在读取文件时,sou

Spark2.1内部原理剖析与源码阅读、程序设计与企业级应用案例视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index