【Apache Flume系列】Flume-ng案例分享及source编码格式问题

转载请注明源地址:http://blog.csdn.net/weijonathan/article/details/41749151

最近忙于在整一个客户的流式抽取的方案,结果遇到了很多问题;主要还是编码问题;先说下场景

场景:

用户生成每一个小时的开始生成一个日志文件,不停的往日志文件中写入。而我这块则是实时读取客户的日志文件然后解析入库;

这里我们选择的方案还是以前的由flume来读取;然后写入kafka,最后到storm中进行解析到最后入库;

这一个流程方案大家应该都比较熟悉了。也不用我在赘述;不清楚的可以看我的博客文章:http://blog.csdn.net/weijonathan/article/details/18301321

问题1:

如何做到实时抽取文件数据以及文件变更?

分析:

我们知道flume exec是通过tail命令监控一个文件的日志变化。那么现在我们有多个文件,怎么办?每个小时会有一个,而且你要去实时监控;

用Spooling Directory Source么?好像不是很现实,而且还会造成用户目录生成大量的文件;Spooling Directory Source只监控目录文件的变更,而不监控日志文件内容的变更。

最终还是选用exec方式;那么怎么做到让flume通过exec方式实现实时监控客户服务器上生成的文件呢?

解决方案:

通过脚本动态修改flume的配置文件的监控日志名称;

#!/usr/bin/env bash

# replace flume monitor file information

#defines param
curdate=`date +"%Y-%m-%d_%H"`
filePrefix="sqllog"
curFile=$filePrefix"_"$curdate
source ~/.profile

#Stop Agent Server
#kill -9 `ps -ef | grep flume_ztky_streaming.conf | head -1 | awk '{print $2}'`

#replace fileName
cd $FLUME_CONF_DIR
sed -i "[email protected]_[0-9]\{4\}-\(\(\(0[13578]\|\(10\|12\)\)-\(0[1-9]\|[1-2][0-9]\|3[0-1]\)\)\|\(02-\(0[1-9]\|[1-2][0-9]\)\)\|\(\(0[469]\|11\)-\(0[1-9]\|[1-2][0-9]\|30\)\)\)_[0-9]\{2\}\[email protected]$curFile\[email protected]" $FLUME_CONF_DIR/flume_ztky_streaming.conf

#Start Agent server
#$FLUME_HOME/bin/flume-ng agent --conf $FLUME_CONF_DIR --conf-file $FLUME_CONF_DIR/flume_ztky_streaming.conf --name producer -Dflume.root.logger=INFO,console >> /home/bigdata/1.txt &

这里大家看到我把kill flume进程的shell和启动flume进程的shell屏蔽了。只留下了修改配置文件的监控文件名称的脚本;为什么?

这里有一个知识点要跟大家分享下;

当我们通过人为手动修改或者脚本修改flume配置文件的时候,flume监控到配置文件变更会自动重新加载配置文件信息,重新使用新的配置信息进行日志搜集;

从日志可以看到这个监控代码是在PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run()方法中

这里通过监控配置文件的最后修改时间来做判断

这里使用到了Google Guava EventBus, 它主要用来简化我们处理生产/消费者编程模型.

上面这个类中,通过eventBus.post()触发实践处理。那么注册的信息和处理类在哪里呢?

看下下面这个类:Application

main方法中

这里eventBus把application处理类注册进去了;

注解@Subscribe是用来标识eventBus的处理方法的;至于一层层的方法调用就不具体解释,想深入了解的可以去看下源码:

https://github.com/apache/flume/blob/f17c7d5022d3e9d112a3843909ad523535fe7e4f/flume-ng-node/src/main/java/org/apache/flume/node/Application.java

解决方案:

那么我们现在只需要编写脚本修改flume的监控配置文件即可;通过crontab -e设置每个小时过一分钟调用脚本修改配置文件,修改写入之后flume会自动停止加载数据并重启;

问题2:

客户提供的日志编码是GBK编码(或者其他编码),整个流程数据跑下来之后,发现storm读取到的数据是乱码。这里我们配置了flume-kafka的日志为UTF-8,按道理是会把GBK编码的日志转换为UTF-8,但是结果出来的却是乱码。

先看看官网的exec文档:

官方文档并没有设置source字符集编码的相关配置;

没办法,看源码吧。

Flume ExecSourceConfigurationConstants--exec的相关配置

https://github.com/apache/flume/blob/8410ad307187b19ca3a4330859815223d1e6b1e2/flume-ng-core/src/main/java/org/apache/flume/source/ExecSourceConfigurationConstants.java

Flume ExecSource

https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java

Flume ExecSource的静态内部类ExecRunnable

这里Flume通过process 执行shell命令读取数据流的方式获取数据,且指定相应的charset。

查看源码之后可以了解到。其实我们是可以通过设置source的charset来设置source的编码格式的;但是官方并没有在文档中做配置说明;这样的话。如果你读取的日志不是UTF-8的编码格式的;那么你读取到的数据就只能是乱码的;

flumejg-kafka-plugin插件

KafkaFlumeConstans源码

https://github.com/beyondj2ee/flumeng-kafka-plugin/blob/master/flumeng-kafka-plugin/src/main/java/org/apache/flume/plugins/KafkaFlumeConstans.java

KafkaSink源码

https://github.com/beyondj2ee/flumeng-kafka-plugin/blob/master/flumeng-kafka-plugin/src/main/java/org/apache/flume/plugins/KafkaSink.java

这里插件是读取flume配置文件中的kafkaSink的custom.encoding配置,通过new String(byte[],encoding)的方式做编码转换;

解决方案:

配置Flume exec的charset编码格式以及Flume-kafka的编码格式,最终由storm中获取,统一转码;

这样整个方案就解决了每小时更换日志的问题以及编码问题;

时间: 2024-07-31 09:04:54

【Apache Flume系列】Flume-ng案例分享及source编码格式问题的相关文章

第86课:SparkStreaming数据源Flume实际案例分享

一.什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,c

第86讲:SparkStreaming数据源Flume实际案例分享

一.什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,c

【Apache Flume系列】Flume-ng failover 以及Load balance测试及注意事项

好久没写博客了.最近在研究storm.flume和kafka.今天给大伙写下我测试flume failover以及load balance的场景以及一些结论: 测试环境包含5个配置文件,也就是5个agent. 一个主的配置文件,也就是我们配置failover以及load balance关系的配置文件(flume-sink.properties),这个文件在下面的场景 会变动,所以这里就不列举出来了,会在具体的场景中写明: 其他4个配置文件类似: #Name the compents on thi

找不到org.apache.spark.streaming.flume.sink.SparkFlumeProtocol$Callback

java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84) at org.apache.spark.streaming.flume

【小程序源码案例】微信小程序项目开发案例分享

作者:web小二本文标签: 微信小程序 小程序源码案例 小程序项目小程序的开发,并不是适合所有公司,我今天跟大家分享小程序方面的教程,主要是供大家学习使用.学习这种东西,有时候则是单纯的喜欢,没有任何目的,很单纯的为了好玩,记得很早之前学flash,没有想法,就是觉得好玩,纯娱乐爱好而已.到后来玩视频剪辑也是出于同样的原因,不图钱财名利,只是图自己个人爱好娱乐. 但是,学习,有时候则是需要有明确目的,特别是关系到自己吃饭问题的时候,你就需要非常有目的去学习,并且还需要制定好学习的计划与目标,希望

【Flume】flume输出sink到hbase的实现

flume 1.5.2 hbase 0.98.9 hadoop 2.6 zk 3.4.6 以上是基础的软件及对应版本,请先确认以上软件安装成功! 1.添加jar包支持 将hbase的lib下的这些jar包拷贝到flume的lib下 2.配置flume 注意看以上的serializer配置,采用的是官方的RegexHbaseEventSerializer, 当然还有一个SimpleHbaseEventSerializer 如果你使用了SimpleHbaseEventSerializer 就会出现如

自动化运维工具——ansible详解案例分享

自动化运维工具--ansible详解案例分享(一)目录ansible 简介ansible 是什么?ansible 特点ansible 架构图ansible 任务执行ansible 任务执行模式ansible 执行流程ansible 命令执行过程ansible 配置详解ansible 安装方式使用 pip(python的包管理模块)安装使用 yum 安装ansible 程序结构ansible配置文件查找顺序ansible配置文件ansuble主机清单ansible 常用命令ansible 命令集a

Flume -- 初识flume、source和sink

Flume – 初识flume.source和sink 目录基本概念常用源 Source常用sink 基本概念 ? 什么叫flume? 分布式,可靠的大量日志收集.聚合和移动工具. ? events 事件,是一行数据的字节数据,是flume发送文件的基本单位. ? flume配置文件 重命名flume-env.sh.template为flume-env.sh,并添加[export JAVA_HOME=/soft/jdk] ? flume的Agent source //从哪儿读数据. 负责监控并收

案例分享:数据库镜像故障转移失败

案例分享:数据库镜像故障转移失败 对于关键性数据库,我们配置了带有见证服务器的同步数据库镜像,来允许自动故障转移.一切运行正常,直到有一次数据中心的突然断电.数据库镜像执行了故障转移,但是运维反馈说应用程序挂起了.当我们手动切换回来,应用程序又正常工作.为什么应用程序没有也故障转移呢? 这是使用数据库镜像的合理的常见问题,像这样的生产应用失败,是因为在镜像部署后没有做故障转移测试.在失败的故障转移之后我们感到棘手. 为了避免生产应用停机,我们在测试环境复制了线上的镜像环境.在确认应用和数据库镜像