【Flume】Implementing a Flume sink to HBase

flume 1.5.2

hbase 0.98.9

hadoop 2.6

ZooKeeper 3.4.6

These are the base components and their versions; make sure each of them is installed and working before you start.

1. Add jar dependencies

Copy the required HBase client jars from HBase's lib directory into Flume's lib directory.
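A minimal sketch of that copy step, assuming default install locations under /usr/local and typical HBase 0.98.x jar names; adjust both paths and the jar list to your environment:

```shell
# Copy the HBase client jars the Flume HBase sink needs into Flume's lib.
# HBASE_HOME/FLUME_HOME defaults and the jar name list are assumptions.
HBASE_LIB="${HBASE_HOME:-/usr/local/hbase}/lib"
FLUME_LIB="${FLUME_HOME:-/usr/local/flume}/lib"
for jar in hbase-client hbase-common hbase-protocol hbase-server \
           htrace-core protobuf-java zookeeper; do
  for f in "$HBASE_LIB"/${jar}-*.jar; do
    [ -e "$f" ] && cp "$f" "$FLUME_LIB/"
  done
done
echo "HBase jars copied to $FLUME_LIB"
```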

2. Configure Flume

Note the serializer setting in the HBase sink configuration: this setup uses the stock RegexHbaseEventSerializer. Flume also ships a SimpleHbaseEventSerializer.
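A sketch of the agent configuration used for this test. The agent name, source type, and watched file path are assumptions; only the sink settings (table, column family, serializer) reflect what this post describes:

```properties
# flume.conf (sketch): tail a file into HBase via the HBase sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/flume_test.log
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = hbase
a1.sinks.k1.table = flume
a1.sinks.k1.columnFamily = chiwei
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
```

With the serializer's default regex and column settings, the whole event body lands in a single column named payload under the configured column family.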

If you use SimpleHbaseEventSerializer instead, you will hit the following error:

[2015-03-04 09:35:41,244] [SinkRunner-PollingRunner-DefaultSinkProcessor:5672] [ERROR] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
        at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
[2015-03-04 09:35:41,249] [SinkRunner-PollingRunner-DefaultSinkProcessor:5677] [ERROR] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
        at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
        at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
        at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)

So do not use that serializer with this stack. The NoSuchMethodError above is an API mismatch: Flume 1.5.2's HBase sink calls Increment.setWriteToWAL(), which was removed from the HBase client API in 0.96 (replaced by setDurability()), so the call fails at runtime against HBase 0.98.

In any case, you will usually want to write this serializer yourself by implementing the org.apache.flume.sink.hbase.HbaseEventSerializer interface, because the column family, column qualifiers, and row key layout are application-specific and should be defined by you.

3. Test

Note that you must create the target table (with its column family) in HBase beforehand:

hbase(main):004:0> create 'flume','chiwei'
0 row(s) in 1.0770 seconds

=> Hbase::Table - flume

Start Flume.

The monitored file contains:

Hello Flume
Hello HBase
Hello Hadoop
Hello Zk
Hello chiwei
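To reproduce, append those lines to the file the Flume source is watching; the path here is an assumption, so use whatever path your source is configured to read:

```shell
# Append the sample lines to the watched file (path is an assumption).
LOGFILE=/tmp/flume_test.log
cat >> "$LOGFILE" <<'EOF'
Hello Flume
Hello HBase
Hello Hadoop
Hello Zk
Hello chiwei
EOF
echo "wrote $(wc -l < "$LOGFILE") lines to $LOGFILE"
```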

Now check the table data in HBase:

hbase(main):006:0> scan 'flume'
ROW                                              COLUMN+CELL
 1425437884264-rd3B0VE5Uz-0                      column=chiwei:payload, timestamp=1425437884520, value=Hello Flume
 1425437884275-rd3B0VE5Uz-1                      column=chiwei:payload, timestamp=1425437884520, value=Hello HBase
 1425437884276-rd3B0VE5Uz-2                      column=chiwei:payload, timestamp=1425437884520, value=Hello Hadoop
 1425437884277-rd3B0VE5Uz-3                      column=chiwei:payload, timestamp=1425437884520, value=Hello Zk
 1425437884278-rd3B0VE5Uz-4                      column=chiwei:payload, timestamp=1425437884520, value=Hello chiwei
 1425437884624-rd3B0VE5Uz-5                      column=chiwei:payload, timestamp=1425437884639, value=Hello Flume
 1425437884626-rd3B0VE5Uz-6                      column=chiwei:payload, timestamp=1425437884639, value=Hello HBase
 1425437884628-rd3B0VE5Uz-7                      column=chiwei:payload, timestamp=1425437884639, value=Hello Hadoop
 1425437884629-rd3B0VE5Uz-8                      column=chiwei:payload, timestamp=1425437884639, value=Hello Zk
 1425437884630-rd3B0VE5Uz-9                      column=chiwei:payload, timestamp=1425437884639, value=Hello chiwei
10 row(s) in 0.4020 seconds

hbase(main):007:0> 
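The row keys in the scan appear to follow RegexHbaseEventSerializer's pattern of <event time in ms>-<random suffix>-<incrementing counter>, which keeps keys unique across events in a batch. Splitting one key from the output above shows the three parts:

```shell
# Split a row key from the scan output into its three components.
key='1425437884264-rd3B0VE5Uz-0'
ts=${key%%-*}          # event timestamp in milliseconds
rest=${key#*-}
suffix=${rest%%-*}     # random per-batch suffix
counter=${rest##*-}    # position of the event within the batch
echo "ts=$ts suffix=$suffix counter=$counter"
```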

The test succeeded!

Posted: 2024-08-27 18:32:43
