Flume NG 学习笔记(八)Interceptors(拦截器)测试

版权声明:本文为博主原创文章,未经博主允许不得转载。

目录(?)[+]

拦截器主要是对事件的header信息信息操作,要么直接忽略他,要么修改他的数据

一、Event Serializers

file_roll sink 和hdfs sink 都支持EventSerializer接口

1.1、Body Text Serializer

Body TextSerializer,别名:text。这个拦截器将把事件的body部分写入到输出流中而不需要任何转换或者修改。事件的header将直接被忽略。

下面是官网配置:


Property Name


Default


Description


appendNewline


true


Whether a newline will be appended to each event at write time. The default of true assumes that events do not contain newlines, for legacy reasons.

下面是官网例子:appendNewline是选择是否加入到新行去。默认是true,而false 就是换行,一般我们都选择换行。

a1.sinks=k1

a1.sinks.k1.type=file_roll

a1.sinks.k1.channel=c1

a1.sinks.k1.sink.directory=/var/log/flume

a1.sinks.k1.sink.serializer=text

a1.sinks.k1.sink.serializer.appendNewline=false

下面是实际例子

因为是考虑Body TextSerializer的特性,他会忽略header的信息,因此我们这边要采用http source来接收定义的header 与body 的内容

[html] view plain copy

  1. #配置文件:body_case15.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = http
  8. a1.sources.r1.port = 50000
  9. a1.sources.r1.host = 192.168.233.128
  10. a1.sources.r1.channels = c1
  11. # Describe the sink
  12. a1.sinks.k1.type = file_roll
  13. a1.sinks.k1.channel = c1
  14. a1.sinks.k1.sink.directory = /tmp/logs
  15. a1.sinks.k1.sink.serializer = text
  16. a1.sinks.k1.sink.serializer.appendNewline =false
  17. # Use a channel which buffers events inmemory
  18. a1.channels.c1.type = memory
  19. a1.channels.c1.capacity = 1000
  20. a1.channels.c1.transactionCapacity = 100

#敲命令

flume-ng agent -c conf -fconf/body_case15.conf -n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

curl -X POST -d ‘[{"headers":{"looklook1" : "looklook1 isheader","looklook2": "looklook2 isheader"},"body" : "hellolooklook5"}]‘http://192.168.233.128:50000

#在启动源发送的代理终端查看console输出



数据已经输出,但只输出了hello looklook5,即BODY这块。

1.2、Avro Event Serializer

Avro Event Serializer别名:avro_event。这个拦截器将把事件序列化到一个Avro容器文件中。使用的模式和RPC Avro机制使用到的处理flume事件的机制一样。这个序列化器继承自AbstractAvroEventSerializer类。

官网例子


Property Name


Default


Description


syncIntervalBytes


2048000


Avro sync interval, in approximate bytes.


compressionCodec


null


Avro compression codec. For supported codecs, see Avro’s CodecFactory docs.

下面是官网例子

a1.sinks.k1.type=hdfs

a1.sinks.k1.channel=c1

a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.serializer=avro_event

a1.sinks.k1.serializer.compressionCodec=snappy

例子这边就不演示了,因为和BodyText Serializer 差距不大。

二、Timestamp Interceptor

官网说Flume 可以在事件传输过程中对它进行修改与删除,而这个都是通过Interceptor进行实现的,实际都是往事件的header里插数据。而Timestamp Interceptor拦截器就是可以往event的header中插入关键词为timestamp的时间戳。

下面是官网配置


Property Name


Default


Description


type



The component type name, has to be timestamp or the FQCN


preserveExisting


false


If the timestamp already exists, should it be preserved - true or false

以及官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.channels= c1

a1.sources.r1.type=seq

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=timestamp

下面是测试例子

[html] view plain copy

  1. #配置文件:timestamp_case16.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = syslogtcp
  8. a1.sources.r1.port = 50000
  9. a1.sources.r1.host = 192.168.233.128
  10. a1.sources.r1.channels = c1
  11. a1.sources.r1.interceptors = i1
  12. a1.sources.r1.interceptors.i1.preserveExisting= false
  13. a1.sources.r1.interceptors.i1.type = timestamp
  14. # Describe the sink
  15. a1.sinks.k1.type = hdfs
  16. a1.sinks.k1.channel = c1
  17. a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M
  18. a1.sinks.k1.hdfs.filePrefix = looklook5.
  19. a1.sinks.k1.hdfs.fileType=DataStream
  20. # Use a channel which buffers events inmemory
  21. a1.channels.c1.type = memory
  22. a1.channels.c1.capacity = 1000
  23. a1.channels.c1.transactionCapacity = 100

这里拿header作为文件夹目录名称。

#敲命令

flume-ng agent -c conf -f conf/timestamp_case16.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "TimestampInterceptor" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

查看hdfs生成的文件,可以看到timestamp已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。

三、Host Interceptor

该拦截器可以往event的header中插入关键词默认为host主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)

下面是官网配置


Property Name


Default


Description


type



The component type name, has to be host


preserveExisting


false


If the host header already exists, should it be preserved - true or false


useIP


true


Use the IP Address if true, else use hostname.


hostHeader


host


The header key to be used.

以及官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=host

a1.sources.r1.interceptors.i1.hostHeader=hostname

下面是测试例子

[html] view plain copy

  1. #配置文件:time_host_case17.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = syslogtcp
  8. a1.sources.r1.port = 50000
  9. a1.sources.r1.host = 192.168.233.128
  10. a1.sources.r1.channels = c1
  11. a1.sources.r1.interceptors = i1 i2
  12. a1.sources.r1.interceptors.i1.preserveExisting= false
  13. a1.sources.r1.interceptors.i1.type =timestamp
  14. a1.sources.r1.interceptors.i2.type = host
  15. a1.sources.r1.interceptors.i2.hostHeader =hostname
  16. a1.sources.r1.interceptors.i2.useIP = false
  17. # Describe the sink
  18. a1.sinks.k1.type = hdfs
  19. a1.sinks.k1.channel = c1
  20. a1.sinks.k1.hdfs.path =hdfs://carl:9000/flume/%Y-%m-%d/%H%M
  21. a1.sinks.k1.hdfs.filePrefix = %{hostname}
  22. a1.sinks.k1.hdfs.fileType=DataStream
  23. # Use a channel which buffers events inmemory
  24. a1.channels.c1.type = memory
  25. a1.channels.c1.capacity = 1000
  26. a1.channels.c1.transactionCapacity = 100

增加一个拦截器,类型是host,h将hostname作为文件前缀。

#敲命令

flume-ng agent -c conf -f conf/time_host_case17.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "Time&hostInterceptor1" | nc 192.168.233.128 50000

echo "Time&hostInterceptor2" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

查看hdfs生成的文件,可以看到host已经生成在header里面,可以根据自定义的格式生成文件夹,数据也都传输过来了。

四、Static Interceptor

Static Interceptor拦截器允许用户增加一个static的header并为所有的事件赋值。范围是所有事件。

官网配置


Property Name


Default


Description


type



The component type name, has to be static


preserveExisting


true


If configured header already exists, should it be preserved - true or false


key


key


Name of header that should be created


value


value


Static value that should be created

其中参数key与value等于类似json格式里的"headers":{" key":" value"}

下面是官网例子

a1.sources=r1

a1.channels=c1

a1.sources.r1.channels= c1

a1.sources.r1.type=seq

a1.sources.r1.interceptors=i1

a1.sources.r1.interceptors.i1.type=static

a1.sources.r1.interceptors.i1.key=datacenter

a1.sources.r1.interceptors.i1.value=NEW_YORK

以及实际的列子

[html] view plain copy

  1. #配置文件:static_case18.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = syslogtcp
  8. a1.sources.r1.port = 50000
  9. a1.sources.r1.host = 192.168.233.128
  10. a1.sources.r1.channels = c1
  11. a1.sources.r1.interceptors = i1
  12. a1.sources.r1.interceptors.i1.type = static
  13. a1.sources.r1.interceptors.i1.key = looklook5
  14. a1.sources.r1.interceptors.i1.value =looklook10
  15. # Describe the sink
  16. a1.sinks.k1.type = logger
  17. # Use a channel which buffers events inmemory
  18. a1.channels.c1.type = memory
  19. a1.channels.c1.capacity = 1000
  20. a1.channels.c1.transactionCapacity = 100
  21. # Bind the source and sink to the channel
  22. a1.sources.r1.channels = c1
  23. a1.sinks.k1.channel = c1

#敲命令

flume-ng agent -c conf -f conf/static_case18.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "statInterceptor1" | nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

可以看到输出的header信息里自定义部分正确输出,body部分也输出正确。

五、Regex FilteringInterceptor

Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。

官网配置


Property Name


Default


Description


type



The component type name has to be regex_filter


regex


”.*”


Regular expression for matching against events


excludeEvents


false


If true, regex determines events to exclude, otherwise regex determines events to include.

excludeEvents 为true的时候为排除所有匹配正则表达式的数据。

下面是测试例子

[html] view plain copy

  1. #配置文件:regex_filter_case19.conf
  2. # Name the components on this agent
  3. a1.sources = r1
  4. a1.sinks = k1
  5. a1.channels = c1
  6. # Describe/configure the source
  7. a1.sources.r1.type = syslogtcp
  8. a1.sources.r1.port = 50000
  9. a1.sources.r1.host = 192.168.233.128
  10. a1.sources.r1.channels = c1
  11. a1.sources.r1.interceptors = i1
  12. a1.sources.r1.interceptors.i1.type =regex_filter
  13. a1.sources.r1.interceptors.i1.regex =^[0-9]*$
  14. a1.sources.r1.interceptors.i1.excludeEvents =true
  15. # Describe the sink
  16. a1.sinks.k1.type = logger
  17. # Use a channel which buffers events inmemory
  18. a1.channels.c1.type = memory
  19. a1.channels.c1.capacity = 1000
  20. a1.channels.c1.transactionCapacity = 100
  21. # Bind the source and sink to the channel
  22. a1.sources.r1.channels = c1
  23. a1.sinks.k1.channel = c1

我们对开头字母是数字的数据,全部过滤。

#敲命令

flume-ng agent -c conf -f conf/regex_filter_case19.conf-n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "a" | nc192.168.233.128 50000

echo "1222" |nc 192.168.233.128 50000

echo "a222" |nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

可以看出1222 被认为是无效的数据没有发出来。

Regex Filtering Interceptor测试成功。

时间: 2024-10-14 12:00:24

Flume NG 学习笔记(八)Interceptors(拦截器)测试的相关文章

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

【Struts2学习笔记-6--】Struts2之拦截器

简单拦截器的使用 拦截器最基本的使用: 拦截方法的拦截器 拦截器的执行顺序 拦截结果的监听器-相当于 后拦截器 执行顺序: 覆盖拦截器栈里特定拦截器的参数 使用拦截器完成-权限控制 主要完成两个功能: 先检查浏览者是否登录: 看登录的用户是否有权限访问: 来自为知笔记(Wiz) 附件列表

SpringMVC 学习笔记(九) 自定义拦截器

<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);">Spring MVC</span><span style="font-family: 宋体; background-color: rgb(255, 255, 255);">也可以使用拦截器对请求进行拦截处理,用户 可以自定义拦截器来实现特定的功

WebService学习笔记-CXF添加自定义拦截器

使用自定义拦截器实现用户名和密码的校验 客户端:出拦截器 服务器:入拦截器 客户端 AddUserInterceptor.java package com.demo.interceptors; import java.util.List; import javax.xml.namespace.QName; import org.apache.cxf.binding.soap.SoapMessage; import org.apache.cxf.headers.Header; import org

struts2框架学习笔记6:拦截器

拦截器是Struts2实现功能的核心部分 拦截器的创建: 第一种: package interceptor; import com.opensymphony.xwork2.ActionInvocation; import com.opensymphony.xwork2.interceptor.Interceptor; //拦截器的第一种创建方式 //拦截器的生命周期:随项目启动创建,随项目关闭而销毁 public class MyInterceptor implements Intercepto

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann

WebService学习笔记-CXF添加拦截器

Webservice拦截器:在webservice请求过程中,动态操作请求和响应的数据 分类 按照所处的位置分:服务器端拦截器  客户端拦截器 按照消息的方向分:入拦截器  出拦截器 按照定义者分:系统拦截器 自定义拦截器 在服务器端添加拦截器 package com.demo; //注意引入的类一定要正确 import javax.xml.ws.Endpoint; import org.apache.cxf.interceptor.LoggingInInterceptor; import or

Flume NG 学习笔记(四)Source配置

首先.这节水的东西就比较少了,大部分是例子. 一.Avro Source与Thrift Source Avro端口监听并接收来自外部的Avro客户流的事件.当内置Avro 去Sinks另一个配对Flume代理,它就可以创建分层采集的拓扑结构.官网说的比较绕,当然我的翻译也很弱,其实就是flume可以多级代理,然后代理与代理之间用Avro去连接 下面是官网给出的source的配置,加粗的参数是必选,描述就不解释了. Property Name Default Description channel

Flume NG 学习笔记(二)单机与集群Flume 配置

下面的内容基本来自官网:http://flume.apache.org/FlumeUserGuide.html 本文使用的是最新版本的apache flume 1.5,安装完Flume然后测试下Flume是否可以用,在Flume目录下用以下语句测试: bin/flume-ng agent -n$agent_name -c conf -f conf/flume-conf.properties.template 结果如图显示: Ok,我们接下去看下面常用架构.功能配置示例 一.最简单的单一代理Flu