最近学习了下flume的使用,以配合公司将日志系统独立出来的开发,官网用户手册:http://flume.apache.org/FlumeUserGuide.html
flume架构
a. 组件
先搬上官网上的架构图
从图上可以看到flume的事件定义成了一个数据流,一个数据流组成了Agent,其实就是JVM实例,每个Agent包含三个组件:Source、Channel、Sink
Source:接受Agent之外的日志消息,其实就是Agent内部的Event的生产者,同时他又是外部日志的消费者(如:Web Server)
Channel:类似于消息队列,连接了Source和Sink
Sink:消费Channel中的日志,将日志发送到存储库或者下一个Agent,取决于自己的配置(或者说自己的flume架构拓扑)
数据源:flume的数据源比较丰富,下面详细说
存储:Sink最后会把数据存储起来,可以是file、hdfs、hbase等,或者是另一个Agent
b:Sources
flume包含了丰富的Source,见官网用户手册:http://flume.apache.org/FlumeUserGuide.html#flume-sources
Avro Source
提供一个Avro的接口,需要往设置的地址和端口发送Avro消息,Source就能接收到,如:Log4jAppender通过Avro Source将消息发送到Agent
Thrift Source
提供一个Thrift接口,类似Avro
Exec Source
Source启动的时候会运行一个设置的UNIX命令(比如 cat file),该命令会不断地往标准输出(stdout)输出数据,这些数据就会被打包成Event,进行处理
JMS Source
从一个JMS的目标中读取消息,比如activeMQ
Spooling Directory Source
监听某个目录,该目录有新文件出现时,把文件的内容打包成Event,进行处理
NetCat Source
监听某一个端口,把消息打包成Event
Syslog Source
读取Syslog的数据,并转换成Event
Multiport Syslog TCP Source
Syslog UDP Source
HTTP Source
接收HTTP POST或者GET的消息,并转换成Event
自定义Source
使用者通过实现flume提供的接口来定制满足需求的Source
c. Channels
http://flume.apache.org/FlumeUserGuide.html#flume-channels
Memory Channel
消息保存在内存,可以设置一个最大容量,超过的消息将会丢失。适用于高吞吐量且允许在Agent退出时部分消息丢失的场景
JDBC Channel
消息保存通过JDBC保存到数据库中。目前仅支持内置的Derby。Agent退出之后再重启,Channel中的消息不会丢失
File Channel
消息保存在本地文件中。Agent退出之后再重启,Channel中的消息不回丢失。需要注意的时,File Channel默认会在文件系统的某个目录中保存数据,同时加锁,如果一个系统中同时启动多个File Channel时,第一个启动成功,其他的就会因为该默认目录被加锁而导致启动失败,这是就需要在配置Channel时指定自己的目录
Spillable Memory Channel
消息保存在内存队列和本地文件中,内存队列优先级高。当内存队列满了之后,后来接收的消息都会被保存到本地文件中,实际上是一个内置的File Channel。根据官方文档说明,暂时还不建议使用在正式上线的系统中。
Pseudo Transaction Channel
只用作单元测试
自定义Channel
d. Sinks
http://flume.apache.org/FlumeUserGuide.html#flume-sinks
flume提供了多种Sink的实现,把收集的日志保存到某一存储或者转发到其他系统
HDFS Sink
日志保存到HDFS中
Logger Sink
日志通过系统定义log4j形式输出到控制台
Avro Sink
日志发送给某一个Avro接口,比如另一个包含Avro Source的Agent
Thrift Sink
日志发送给某一个Thrift接口,比如另一个包含Thrift Source的Agent
IRC Sink
日志发送给IRC接口
File Roll Sink
日志输出到本地文件中,文件会以滚动的方式,定期生成新文件
Null Sink
日志全部丢弃
HBase Sink
日志保存到HBase中
AsyncHbase Sink
以异步的方式把日志写入到HBase中
MorphlineSolr Sink
日志保存到Solr全文索引的服务器中
ElasticSearch Sink
日志保存到ES全文索引的服务器中
自定义Sink
e. Channel Selectors
http://flume.apache.org/FlumeUserGuide.html#flume-channel-selectors
假设出现如下的架构拓扑
该图中Agent外部有一个数据源Source,分发到3个不同的Channel,然后有3个不同的Sink去消费对应的一个Channel,Sink1最后存储到HDFS,Sink2转发到JMS,Sink3数据流向Agent bar
那么我们该如何实现上述这样的拓扑呢?我们之前介绍的相关知识还无法解决这样的问题,下面介绍一个组件解决这个问题
flume提供了两种Selector
Replicating Channel Selector
Source 会把消息发给所有连接的Channel
Multiplexing Channel Selector
Source根据消息header中的某一字段(配置时指定)的值来决定把消息发送给哪几个Channel
f. Sink Processor
多个Sink从逻辑上组成一个整体Sink来使用,可以提供Failover和load balancee等功能,也可以由使用者自己定义,加入其它功能
Default Sink Processor
默认的,不需要任何配置,就是单个Sink工作模式
Failover Sink Processor
提供一个容错机制,维护一个有优先级的Sink列表,由当前最高优先级的Sink来处理消息,高优先级的Sink出错后,在它恢复运行的这段时间里,由当前列表中优先级最高的Sink来处理消息,直到出错的这个优先级更高的Sink恢复为止。
Load balancing Sink Processor
一组Sink通过load balancing的方式一起处理消息,有两种方式round_robin和random。
g. Event Serializers
提供了一种重新封装FlumeEvent的机制,在Sink中使用。目前file_roll Sink 和 HDFS Sink 都支持。
Body Text Serializer
把FlumeEvent中body输出,headers的信息忽略
Avro Event Serializer
把FlumeEvent序列化封装到Avro container file,可以在Avro RPC中使用
h. Interceptor
拦截器,相当于我们熟悉的SpringMVC的interceptor,功能类似,在Source这一层可以修改或者丢弃接收的消息,达到我们想要的消息进入Channel
Timestamp Interceptor
在Event的header中插入一个当前Agent的系统时间
Host Interceptor
在Event的header中插入当前Agent的主机名或者IP
Static Interceptor
在Event的header中插入一个固定的key-value值对
UUID Interceptor
Morphline Interceptor
Regex Filtering Interceptor
Regex Extractor Interceptor
这篇博客就介绍到这里,如果需要了解更详细的知识以及配置,可以去官方User Guide学习,接下去我将更新几篇关于如何使用flume的例子