Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Flume有两个版本,Flume 0.9X或CDH3及更早版本的统称Flume-og,Flume-og由agent、collection、master等组件组成;Flume1.X或CDH4及以后的版本统称Flume-ng,Flume-ng由agent、client等组件组成,截止到目前为止,Flume最新版本为1.6.0版本,Flume1.6.0有几个新特性:
- Flume Sink and Source for Apache Kafka(source、sink新增对Kafka的支持)
- A new channel that uses Kafka(channel使用Kafka的消息通道)
- Hive Sink based on the new Hive Streamingsupport
- End to End authentication in Flume
- Simple regex search-and-replace interceptor(拦截器支持简单的正则表达式)
Agent
Flume运行的核心是agent,agent用于采集数据,将数据源的数据发送给collector。它是一个完整的数据收集工具,含有三个核心组件,分别是source、channel、sink。Event从Source,流向Channel,再到Sink。Event代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。Source:完成对日志数据的收集,分成transtion和
event 打入到channel之中。Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。通过这些组件,event可以从一个地方流向另一个地方,如下图所示。
Source消费从外部流进的Events,如AvroSource接收外部客户端传来的或是从别的agent流出来的Avro
Event。Source可以把event送往一个或多个channel。channel是一个队列,持有event等待sink来消费,一种Channel的实现:FileChannel使用本地文件系统来作为它的存储。Sink的作用是把Event从channel里移除,送往外部数据仓库或给下一站agent的Source,如HDFSEventSink送往HDFS。同个agent下的source和sink是异步的。
flume-ng是由一个个agent组成的。一个agent就像一个细胞一样。当然可以自由组合,如下图:
下图为多对一Collection场景:
Source
完成对日志数据的收集,分成transtion和 event打入到channel之中
- Source用于获取数据,可从文本文件,syslog,HTTP等获取数据
- Sink将Source获得的数据进一步传输给后面的Collector。
- syslogTcp(5140) |agentSink("localhost",12345)
- tail("/etc/services") |agentSink("localhost",12345)
可以让应用程序同已有的Source直接打交道,如AvroSource,SyslogTcpSource。也可以写一个Source,以IPC或RPC的方式接入自己的应用。
Flume自带了直接可用的数据源(source),如:
ltext("filename")
ltail("filename")
lfsyslogTcp(5140)
lconsole("format")
lexec
lexecPeriodic
lexecStream
lirc
llog4jfile
lmultitail
lnonlsynth
lnull
lreport
lrpcSource
lscribe
lseqfile
lsyslogTcp
lsyslogTcp1
lsyslogUdp
l……
对于直接读取文件Source,有两种方式:
lExecSource:以运行Linux命令的方式,持续的输出最新的数据,如tail
-F
文件名指令,在这种方式下,取的文件名必须是指定的。 ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法保证日志数据的完整性。
lSpoolSource:监测配置的目录下新增的文件,并将文件中的数据读取出来。
需要注意:拷贝到spool目录下的文件不可以再打开编辑;spool目录下不可包含相应的子目录。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
Channel
Channel有多种方式:
有MemoryChannel,JDBCChannel,MemoryRecoverChannel,FileChannel。
MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。
MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。
FileChannel保证数据的完整性与一致性。在具体配置不限的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
Sink
Sink在设置存储数据时,可以向文件系统、数据库、hadoop存数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
Flume提供了很多Sink,如:
lconsole[("format")]
ltext(“txtfile”)
ldfs(“dfsfile”)
lsyslogTcp(“host”,port)
lagentSink[("machine"[,port])]
lagentDFOSink[("machine"[,port])]
lagentBESink[("machine"[,port])]
lattr2hbase
lavroSink
lcollectorSink
lcounter
lformatDfs
lhbase
lirc
llogicalSink
lmultigrep
lregexhisto
lregexhistospec
lrpcSink
lseqfile
lthriftSink
l……
扫描下面的二维码可以关注作者的微信公众号。