flume简介与监听文件目录并sink至hdfs实战

场景

1. flume是什么

1.1 背景

  flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728,对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下,cloudera Flume 改名为 Apache Flume。

  flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

  flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。

1.2 特点

  • flume的可靠性

     当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure(这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送),Besteffort(数据发送到接收方后,不会进行确认)。

  • 可恢复性

      还是靠Channel。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。

  • 核心概念

Agent :使用JVM 运行Flume。每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。

Client :生产数据,运行在一个独立的线程。

Source:从Client收集数据,传递给Channel。

Sink :从Channel收集数据,运行在一个独立线程。

Channel :连接 sources 和 sinks ,这个有点像一个队列。

Events :可以是日志记录、 avro 对象等。

Flume以agent为最小的独立运行单位。一个agent就是一个JVM。单agent由Source、Sink和Channel三大组件构成,如下图:

  值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型。不同类型的Source,Channel和Sink可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比如:Channel可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink可以把日志写入HDFS, HBase,甚至是另外一个Source等等。Flume支持用户建立多级流,也就是说,多个agent可以协同工作,并且支持Fan-in、Fan-out、Contextual Routing、Backup Routes,这也正是NB之处。如下图所示:

  

2.如何配置flume

这里以flume监听目录并将该目录下的文件内容sink至hdfs上指定目录的配置为例加以说明,详见实验部分。

实验

  • 配置 flume-env.sh文件

    在文件末尾追加以下内容:

export FLUME_HOME=/home/hadoop/apache-flume-1.6.0-bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=.:$PATH::$FLUME_HOME/bin
  • 配置flume-conf.properties文件
agent1.sources = spooldirSource
agent1.channels = fileChannel
agent1.sinks = hdfsSink

#配置sources,即被监听的源目录
agent1.sources.spooldirSource.type=spooldir
agent1.sources.spooldirSource.spoolDir=/home/hadoop/flume
agent1.sources.spooldirSource.channels=fileChannel

#配置sinks,即目的目录
agent1.sinks.hdfsSink.type=hdfs
agent1.sinks.hdfsSink.hdfs.path=hdfs://master:9000/input/flume/%y-%m-%d
agent1.sinks.hdfsSink.hdfs.filePrefix=flume
agent1.sinks.sink1.hdfs.round = true
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)
agent1.sinks.hdfsSink.hdfs.rollInterval = 3600
# File size to trigger roll, in bytes (0: never roll based on file size)
agent1.sinks.hdfsSink.hdfs.rollSize = 128000000
agent1.sinks.hdfsSink.hdfs.rollCount = 0
agent1.sinks.hdfsSink.hdfs.batchSize = 1000

#Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
agent1.sinks.hdfsSink.hdfs.roundValue = 1
agent1.sinks.hdfsSink.hdfs.roundUnit = minute
agent1.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfsSink.channel=fileChannel
agent1.sinks.hdfsSink.hdfs.fileType = DataStream

#channels,通道目录配置:把文件事件持久化到本地硬盘上
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir=/home/hadoop/apache-flume-1.6.0-bin/checkpoint
agent1.channels.fileChannel.dataDirs=/home/hadoop/apache-flume-1.6.0-bin/dataDir
  • 测试

    1、flume环境测试与启动

hadoop@master:~$ flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f
hadoop@master:~$ 
[email protected]:~$ ${FLUME_HOME}/bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent1 > log.log 2>&1 &
[2] 13370
[email protected]:~$ tailf ~/apache-flume-1.6.0-bin/log.log
2016-06-03 16:32:15,007 (Log-BackgroundWorker-fileChannel) [DEBUG - org.apache.flume.channel.file.FlumeEventQueue.checkpoint(FlumeEventQueue.java:139)] Checkpoint not required
2016-06-03 16:32:15,828 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-conf.properties for changes

2、向监听目录中添加文件

cp ~/wordcount.txt ~/flume/

3、执行结果

总结

 flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

 

参考

王家林DT大数据梦工厂-IMF-瓦力同学

flume官网

flume on hdfs 配置

时间: 2024-12-25 09:18:39

flume简介与监听文件目录并sink至hdfs实战的相关文章

Flume监听文件目录sink至hdfs配置

一:flume介绍 Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力.,Flume架构分为三个部分 源-Source,接收器-Sink,通道-Channel. 二:配置文件 此配置文件source为一个目录,注意,该目录下的文件应为只读,不可写,且文件名不能相同,采用的channels为file,sink为hdfs,此处往hdfs写的策略是当时间达到3600s或者

Flume笔记--source端监听目录,sink端上传到HDFS

官方文档参数解释:http://flume.apache.org/FlumeUserGuide.html#hdfs-sink 需要注意:文件格式,fileType=DataStream 默认为SequenceFile,是hadoop的文件格式,改为DataStream就可直接读了(SqeuenceFile怎么用还不知道..)配置文件: hdfs.conf a1.sources = r1a1.sinks = k1a1.channels = c1 # Describe/configure the s

android 监听网络状态的变化及实战

android 监听网络状态的变化及实际应用 转载请注明博客地址:http://blog.csdn.net/gdutxiaoxu/article/details/53008266 平时我们在请求错误的情况下,通常会进行处理一下,一般来说,主要分为两方面的错误 - 没有网络的错误 - 在有网络的情况下,我们客户端的错误或者服务器端的错误 今天这篇博客主要阐述以下问题 怎样监听网络状态的变化,包括是否打开WiFi,否打开数据网络,当前连接的网络是否可用 网络没有打开情况下的处理,如弹出对话框,跳转到

Oracle监听网络服务全面剖析_超越OCP精通Oracle视频教程培训06

课程目标 Oracle视频教程,风哥oracle教程培训学习内容包括,Oracle监听概念与常用配置文件,监听工具与服务测试,oracle监听静态注册与动态注册,配置oracle第二监听,oracle监听与客户端配置,Oracle监听日志配置与日常维护规范,Oracle监听安全与密码管理及防火墙,db_link与进程,如何跟踪问题深入分析等超越oracle认证的数据库教程 适用人群 IT相关从业人员.Oracle数据库技术人员.想加工资的.想升职的都可以. 课程简介 Oracle监听网络服务全面

消费滚动滴log日志文件(flume监听,kafka消费,zookeeper协同)

第一步:数据源 手写程序实现自动生成如下格式的日志文件: 15837312345,13737312345,2017-01-09 08:09:10,0360 打包放到服务器,使用如下命令执行,模拟持续不断的日志文件: java -cp ct_producter-1.0-SNAPSHOT.jar producter.ProductLog ./awen.tsv 第二步:监听log.tsv日志 使用Flume监控滚动的awen.tsv日志,编写flume # Name the components on

Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装

前面已经介绍了如何利用Thrift Source生产数据,今天介绍如何用Kafka Sink消费数据. 其实之前已经在Flume配置文件里设置了用Kafka Sink消费数据 agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent1.sinks.kafkaSink.topic = TRAFFIC_LOG agent1.sinks.kafkaSink.brokerList = 10.208.129.3:90

js事件监听简介

1.什么是事件监听? 就是让计算机监视一个事件是否发生. 2.事件和事件处理程序 事件是用户或浏览器自身执行的某种动作,如click,load和mouseover都是事件的名字.响应某个事件的函数就叫事件处理程序(也叫事件处理函数.事件句柄).事件处理程序的名字以"on"开头,因此click事件的事件处理程序就是onclick,load事件的事件处理程序就是onload. 总之,事件就是一个动作瞬间,如鼠标点击,事件处理程序是一个过程,处理事件发生时的函数的函数. 3.事件监听器 监听

使用path监听指定文件系统的变化

在以前的JAVA版本中,如果程序需要检测文件的变化,那么需要开辟一个线程每隔一段时间去遍历一次指定的目录,如果发现此次遍历结果和上次不同,那么就认为文件变动了 ,这样的方式非常繁琐,JAVA 7之后的NIO.2 Path类提供了一个方法来监听指定文件目录内文件的变化状态. 1.获取文件系统的WatchService对 2.使用Path类的方法去注册一个监听,指定监听文件的哪些状态,如新增.修改.删除. package com.nio2; import java.io.IOException;im

Windows API 教程(七) hook 钩子监听

Windows API 教程(七) hook 钩子监听 Posted on 2013-08-15 茵蒂克丝 如何创建一个窗口 手动创建窗口的流程 实际代码 安装钩子 (Install hook) 钩子简介 SetWindowsHookEx 函数 设置监听[键盘]消息 设置监听[鼠标]消息 如何创建一个窗口 另外一个再录的 Windows SDK教程 里面有讲到快捷创建窗口的方式,不过这样的话要分好几个文件,感觉有点混所以这里就用原始的方式创建一个窗口. 那么,为什么讲到 hook(钩子)的时候要