Testing the Flume Hive sink

To test Flume delivering data into a Hive table, first create the table.

create table order_flume(
order_id string,
user_id string,
eval_set string,
order_number string,
order_dow string,
order_hour_of_day string,
days_since_prior_order string)
clustered by (order_id) into 5 buckets
stored as orc; -- remember stored as orc, otherwise the sink fails later
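Note: depending on the Hive version, the streaming ingest used by the Hive sink may also require the table to be flagged as transactional. If the sink later complains that the table is not an ACID/transactional table, marking it like this may help (this step is an assumption, not part of the original test):

alter table order_flume set tblproperties('transactional'='true'); -- marks the table as ACID; verify against your Hive version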

  The Flume conf file is configured as follows:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /root/code/flume_exec_test.txt

# Describe the sink
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://master:9083
a1.sinks.k1.hive.database = hw
a1.sinks.k1.hive.table = order_flume
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = ","
a1.sinks.k1.serializer.serdeSeparator = ','
a1.sinks.k1.serializer.fieldnames = order_id,user_id,eval_set,order_number,order_dow,order_hour_of_day,days_since_prior_order

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Increase these; the defaults are 1000 and 100
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
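Since the exec source tails /root/code/flume_exec_test.txt, that file must contain comma-separated lines matching the order of the fieldnames above. A made-up test record (the values here are purely illustrative) can be appended to exercise the pipeline:

echo "2539329,1,prior,1,2,08," >> /root/code/flume_exec_test.txt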

  If Flume is started at this point it will throw errors, because the Hive jars need to be copied into Flume's lib directory:

cp /usr/local/src/apache-hive-1.2.2-bin/hcatalog/share/hcatalog/* $FLUME_HOME/lib

cp /usr/local/src/apache-hive-1.2.2-bin/lib/* $FLUME_HOME/lib

  Next, edit hive-site.xml and set the following properties:

	<property>
		<name>hive.support.concurrency</name>
		<value>true</value>
	</property>
	<property>
		<name>hive.exec.dynamic.partition.mode</name>
		<value>nonstrict</value>
	</property>
	<property>
		<name>hive.txn.manager</name>
		<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
	</property>
	<property>
		<name>hive.compactor.initiator.on</name>
		<value>true</value>
	</property>
	<property>
		<name>hive.compactor.worker.threads</name>
		<value>1</value>
	</property>
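After the metastore is restarted with these settings (see the steps below), a quick way to confirm they are actually being picked up is to echo them from the Hive CLI; this is just a sanity check, not part of the original walkthrough:

hive -e "set hive.txn.manager; set hive.support.concurrency;"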

  With the configuration above in place, start the Hive metastore first, then start Flume.

# Everything must be restarted, in this order:
# restart MySQL first
# then restart Hadoop
# then start the Hive metastore
# finally start Flume

service mysql restart

start-dfs.sh
start-yarn.sh

hive --service metastore
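Before launching the agent it is worth confirming that the metastore is actually listening on the Thrift port used in the sink configuration (9083). A quick check, assuming netstat is available on the host:

netstat -nltp | grep 9083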

flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console

  When first debugging this, the configuration was already correct, yet no matter how many times it ran, the sink could not connect and kept reporting the failure below.

org.apache.flume.sink.hive.HiveWriter$ConnectException: Failed connecting to EndPoint {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
	at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:99)
	at org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:343)
	at org.apache.flume.sink.hive.HiveSink.drainOneBatch(HiveSink.java:295)
	at org.apache.flume.sink.hive.HiveSink.process(HiveSink.java:253)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.sink.hive.HiveWriter$TxnBatchException: Failed acquiring Transaction Batch from EndPoint: {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
	at org.apache.flume.sink.hive.HiveWriter.nextTxnBatch(HiveWriter.java:400)
	at org.apache.flume.sink.hive.HiveWriter.<init>(HiveWriter.java:90)
	... 6 more
Caused by: org.apache.hive.hcatalog.streaming.TransactionBatchUnAvailable: Unable to acquire transaction batch on end point: {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:514)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:464)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatchImpl(HiveEndPoint.java:351)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$ConnectionImpl.fetchTransactionBatch(HiveEndPoint.java:331)
	at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:395)
	at org.apache.flume.sink.hive.HiveWriter$9.call(HiveWriter.java:392)
	at org.apache.flume.sink.hive.HiveWriter$11.call(HiveWriter.java:428)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns
	at org.apache.thrift.TApplicationException.read(TApplicationException.java:111)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:79)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_open_txns(ThriftHiveMetastore.java:4195)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.open_txns(ThriftHiveMetastore.java:4182)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.openTxns(HiveMetaStoreClient.java:1988)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.openTxnImpl(HiveEndPoint.java:523)
	at org.apache.hive.hcatalog.streaming.HiveEndPoint$TransactionBatchImpl.<init>(HiveEndPoint.java:507)
	... 10 more

  After several restarts the same error kept appearing. Trying to start the Hive CLI first, to have a look at the table, showed that Hive itself failed to start:

FAILED: LockException [Error 10280]: Error communicating with the metastore

  The metastore was restarted again and then Hive was started again. The first two attempts hit the same error, but a few minutes later, after one more restart, it worked on its own. Why was never fully clear; possibly the earlier restart had not actually taken effect. Re-editing hive-site.xml, saving it again, and restarting once more may be enough to fix it.
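One thing worth ruling out when the metastore keeps refusing to open transactions is whether its backing MySQL database actually contains the ACID transaction tables (TXNS and related tables); if that schema was never created, open_txns fails on every attempt. A quick probe (the database name hive and the credentials are assumptions, adjust them to the actual metastore setup):

mysql -u root -p -e "show tables like 'TXNS';" hive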

Eventually Flume connected to Hive successfully, with the log output below.

2019-07-20 12:26:28,968 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveSink.getOrCreateWriter(HiveSink.java:342)] k1: Creating Writer to Hive end point : {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
2019-07-20 12:26:30,747 (hive-k1-call-runner-0) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:377)] Trying to connect to metastore with URI thrift://master:9083
2019-07-20 12:26:30,865 (hive-k1-call-runner-0) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:473)] Connected to metastore.
2019-07-20 12:26:31,494 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:377)] Trying to connect to metastore with URI thrift://master:9083
2019-07-20 12:26:31,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:473)] Connected to metastore.
2019-07-20 12:26:35,677 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveWriter.nextTxnBatch(HiveWriter.java:335)] Acquired Txn Batch TxnIds=[1...100] on endPoint = {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }. Switching to first txn
2019-07-20 12:26:38,808 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hive.HiveWriter.commitTxn(HiveWriter.java:275)] Committing Txn 1 on EndPoint: {metaStoreUri='thrift://master:9083', database='hw', table='order_flume', partitionVals=[] }
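To confirm rows actually landed in the table, a quick count from the Hive CLI is enough (a sanity check; the total depends on how much of the test file has been tailed so far):

select count(*) from hw.order_flume;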

  

Original article: https://www.cnblogs.com/hanwen1014/p/11217385.html
