Flume interceptor 使用注意事项

1. 在使用 Regex Filtering Interceptor的时候一个属性是excludeEvents

  当它的值为true 的时候,过滤掉匹配到当前正则表达式的一行

  当它的值为false的时候,就接受匹配到正则表达式的一行

  

Property Name Default Description
type The component type name has to be regex_filter
regex ”.*” Regular expression for matching against events
excludeEvents false If true, regex determines events to exclude, otherwise regex determines events to include.

2. flume interceptors的其它属性

Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors are classes that implement org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports chaining of interceptors. This is made possible through by specifying the list of interceptor builder class names in the configuration. Interceptors are specified as a whitespace separated list in the source configuration. The order in which the interceptors are specified is the order in which they are invoked. The list of events returned by one interceptor is passed to the next interceptor in the chain. Interceptors can modify or drop events. If an interceptor needs to drop events, it just does not return that event in the list that it returns. If it is to drop all events, then it simply returns an empty list. Interceptors are named components, here is an example of how they are created through configuration:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

Note that the interceptor builders are passed to the type config parameter. The interceptors are themselves configurable and can be passed configuration values just like they are passed to any other configurable component. In the above example, events are passed to the HostInterceptor first and the events returned by the HostInterceptor are then passed along to the TimestampInterceptor. You can specify either the fully qualified class name (FQCN) or the alias timestamp. If you have multiple collectors writing to the same HDFS path, then you could also use the HostInterceptor.

Timestamp Interceptor

This interceptor inserts into the event headers, the time in millis at which it processes the event. This interceptor inserts a header with key timestamp whose value is the relevant timestamp. This interceptor can preserve an existing timestamp if it is already present in the configuration.

Property Name Default Description
type The component type name, has to be timestamp or the FQCN
preserveExisting false If the timestamp already exists, should it be preserved - true or false

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host, based on configuration.

Property Name Default Description
type The component type name, has to be host
preserveExisting false If the host header already exists, should it be preserved - true or false
useIP true Use the IP Address if true, else use hostname.
hostHeader host The header key to be used.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.hostHeader = hostname

Static Interceptor

Static interceptor allows user to append a static header with static value to all events.

The current implementation does not allow specifying multiple headers at one time. Instead user might chain multiple static interceptors each defining one static header.

Property Name Default Description
type The component type name, has to be static
preserveExisting true If configured header already exists, should it be preserved - true or false
key key Name of header that should be created
value value Static value that should be created

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

UUID Interceptor

This interceptor sets a universally unique identifier on all events that are intercepted. An example UUID is b5755073-77a9-43c1-8fad-b7a586fc1b97, which represents a 128-bit value.

Consider using UUIDInterceptor to automatically assign a UUID to an event if no application level unique key for the event is available. It can be important to assign UUIDs to events as soon as they enter the Flume network; that is, in the first Flume Source of the flow. This enables subsequent deduplication of events in the face of replication and redelivery in a Flume network that is designed for high availability and high performance. If an application level key is available, this is preferable over an auto-generated UUID because it enables subsequent updates and deletes of event in data stores using said well known application level key.

Property Name Default Description
type The component type name has to be org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName id The name of the Flume header to modify
preserveExisting true If the UUID header already exists, should it be preserved - true or false
prefix “” The prefix string constant to prepend to each generated UUID

Morphline Interceptor

This interceptor filters the events through a morphline configuration file that defines a chain of transformation commands that pipe records from one command to another. For example the morphline can ignore certain events or alter or insert certain event headers via regular expression based pattern matching, or it can auto-detect and set a MIME type via Apache Tika on events that are intercepted. For example, this kind of packet sniffing can be used for content based dynamic routing in a Flume topology. MorphlineInterceptor can also help to implement dynamic routing to multiple Apache Solr collections (e.g. for multi-tenancy).

Currently, there is a restriction in that the morphline of an interceptor must not generate more than one output record for each input event. This interceptor is not intended for heavy duty ETL processing - if you need this consider moving ETL processing from the Flume Source to a Flume Sink, e.g. to a MorphlineSolrSink.

Required properties are in bold.

Property Name Default Description
type The component type name has to be org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
morphlineFile The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId null Optional name used to identify a morphline if there are multiple morphlines in a morphline config file

Sample flume.conf file:

a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

Search and Replace Interceptor

This interceptor provides simple string-based search-and-replace functionality based on Java regular expressions. Backtracking / group capture is also available. This interceptor uses the same rules as in the Java Matcher.replaceAll() method.

Property Name Default Description
type The component type name has to be search_replace
searchPattern The pattern to search for and replace.
replaceString The replacement string.
charset UTF-8 The charset of the event body. Assumed by default to be UTF-8.

Example configuration:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =

Another example:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

Regex Filtering Interceptor

This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.

Property Name Default Description
type The component type name has to be regex_filter
regex ”.*” Regular expression for matching against events
excludeEvents false If true, regex determines events to exclude, otherwise regex determines events to include.

Regex Extractor Interceptor

This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.

Property Name Default Description
type The component type name has to be regex_extractor
regex Regular expression for matching against events
serializers Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
serializers.<s1>.type default Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
serializers.<s1>.name  
serializers.* Serializer-specific properties

The serializers are used to map the matches to a header name and a formatted header value; by default, you only need to specify the header name and the default org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer will be used. This serializer simply maps the matches to the specified header name and passes the value through as it was extracted by the regex. You can plug custom serializer implementations into the extractor using the fully qualified class name (FQCN) to format the matches in anyway you like.

Example 1:

If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3

Example 2:

If the Flume event body contained 2012-10-18 18:47:57,614 some log line and the following configuration was used

a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

the extracted event will contain the same body but the following headers will have been added timestamp=>1350611220000

时间: 2024-10-05 07:01:10

Flume interceptor 使用注意事项的相关文章

Flume 拦截器(interceptor)详解

flume 拦截器(interceptor)1.flume拦截器介绍拦截器是简单的插件式组件,设置在source和channel之间.source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件.每个拦截器只处理同一个source接收到的事件.可以自定义拦截器.2.flume内置的拦截器 2.1 时间戳拦截器flume中一个最经常使用的拦截器 ,该拦截器的作用是将时间戳插入到flume的事件报头中.如果不使用任何拦截器,flume接受到的只有message.

flume拦截器

拦截器作用:拦截器是简单的插件式组件,设置在source和channel之间.source接收到的事件,在写入channel之前,拦截器都可以进行转换或者删除这些事件.每个拦截器只处理同一个source接收到的事件.可以自定义拦截器. flume修改时间戳的插件见 https://github.com/haebin/flume-timestamp-interceptor 有一个缺陷是,DateUtils.parseDate(timestamp, dateFormat)里面的dateFormat不

Flume使用说明

关于Flume,官方定义如下: Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store. The use of Apache Flume is not only re

flume 配置

[[email protected] data]#tar -zxvf apache-flume-1.7.0-bin.tar.gz[[email protected] conf]# cp flume-env.sh.template flume-env.sh 修改java_home[[email protected] conf]# cp flume-env.shexport JAVA_HOME=/data/jdkexport JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.s

flume 日志导入elasticsearch

Flume配置 . flume生成的数据结构 <span style="font-size:18px;">"_index" : "logstash-2013.01.07", "_type" : "tms_jboss_syslog", "_id" : "a_M9X_0YSpmE7A_bEzIFiw", "_score" : 1.0, &q

flume 收集日志,写入hdfs

首先安装flume: 建议和Hadoop保持统一用户来安装Hadoop,flume 本次我采用Hadoop用户安装flume http://douya.blog.51cto.com/6173221/1860390 开始配置: 1,配置文件编写: vim  flume_hdfs.conf # Define a memory channel called ch1 on agent1 agent1.channels.ch1.type = memory agent1.channels.ch1.capac

flume单channel多sink的测试

说明: 该结果是亲自测试,只提供简单的数据分析,很简陋,结果可能不准确. 先说一下结果,多sink可以直接按常规配置,这样的话每个sink会启动一个sinkrunner,相当于每个线程一个sink,互不干扰,负载均衡是通过channel实现的,效率会提高为n倍,如果在此基础上加入 sinkgroup,则sinkgroup会启动一个sinkrunner,就是单线程,sinkgroup从channel中读取数据,然后分发到下面挂载的sink中,效率和单sink一样,没有提高,但是可以实现两个sink

Data Collection with Apache Flume(二)

今天继续讨论几个agent的配置. 第一个agent是从终端捕获特定命令执行的输出结果,并将文件输出到特定目录.先看一下配置的代码: agent2.sources = execsource //指定为从命令获取输出的source agent2.sinks = filesink //输出到文件的sink agent2.channels = filechannel //输出到文件的channel agent2.sources.execsource.type = exec //类型 agent2.so

Data Collection with Apache Flume(三)

最后提及两个agent.首先第一个是使用一个avro souce和一个avro sink向另一个agent传递event,然后再写入特定目录. 先看看配置代码. agent6.sources = avrosource //定义avrosource,可以使用avro client在网络上向其传送数据 agent6.sinks = avrosink agent6.channels = memorychannel agent6.sources.avrosource.type = avro agent6