RegexExtractorInterceptor实现分析

RegexExtractorInterceptor作为一个Interceptor实现类可以根据一个正则表达式匹配event body来提取字符串,并使用serializers把字符串作为header的值

实例:
以如下的命令使用execsource收集日志的时候,可以根据文件的名称设置不同的header,进行不同的操作

#!/bin/sh
filename=$1
hostname=`hostname -s`
tail -F $1 | awk -v filename=$filename -v hostname=$hostname ‘{print filename":"hostname":"$0}‘

source的配置:

xxxx.sources.kafka1.interceptors = i1
xxxx.sources.kafka1.interceptors.i1.type = regex_extractor
xxxx.sources.kafka1.interceptors.i1.regex = /apps/logs/(.*?)/
xxxx.sources.kafka1.interceptors.i1.serializers = s1
xxxx.sources.kafka1.interceptors.i1.serializers.s1.name = logtypename
xxxx.sources.kafka1.selector.type = multiplexing
xxxx.sources.kafka1.selector.header = logtypename
xxxx.sources.kafka1.selector.mapping.nginx = nginx-channel

几个参数项:
regex 正则表达式

serializers  定义匹配组(正则匹配之后的值作为header的值,比如如果
Event body为1:2:3.4foobar5,regex为(\\d):(\\d):(\\d),serializers 
设置为a b c,serializers.a.name 为one,serializers.b.name为two,serializers.c.name
为three,那么one->1,two->2,three->3.4foobar5,注意可以不必匹配所有的组)

serializers.x.name 作为event的header

首先看内部类Builder:
1)configureSerializers方法用来生成配置项,主要是操作List<NameAndSerializer>,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象,RegexExtractorInterceptorSerializer默认是org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer,即对参数不做任何处理直接返回:

private List<NameAndSerializer> serializerList;
private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
....
     private void configureSerializers(Context context) {
      String serializerListStr = context.getString( SERIALIZERS ); //解析serializers的配置
      Preconditions. checkArgument(!StringUtils. isEmpty(serializerListStr),
          "Must supply at least one name and serializer" );
      String[] serializerNames = serializerListStr.split( "\\s+" ); //按空格分隔
      Context serializerContexts =
          new Context(context.getSubProperties( SERIALIZERS + "."));
      serializerList = Lists. newArrayListWithCapacity(serializerNames.length);
      for(String serializerName : serializerNames) { //对每一个serializers里面的设置进行操作
        Context serializerContext = new Context(
            serializerContexts.getSubProperties(serializerName + "." ));
        String type = serializerContext.getString( "type" , "DEFAULT" ); //获取serializers.x.type的设置,默认值是DEFAULT,即org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer
        String name = serializerContext.getString( "name" ); ////获取serializers.x.name的设置
        Preconditions. checkArgument(!StringUtils. isEmpty(name),
            "Supplied name cannot be empty." );
        if ("DEFAULT" .equals(type)) {
          serializerList .add(new NameAndSerializer(name, defaultSerializer)); //生成NameAndSerializer对象,并加入到List<NameAndSerializer>中,静态内部类NameAndSerializer是一个包含了headerName和RegexExtractorInterceptorSerializer属性的容器,这里每一个serializers.x.name的配置对应一个RegexExtractorInterceptorSerializer对象
        } else {
          serializerList .add(new NameAndSerializer(name, getCustomSerializer(
              type, serializerContext))); //getCustomSerializer用于根据type的设置返回RegexExtractorInterceptorSerializer对象
        }
      }
    }

这里org.apache.flume.interceptor.RegexExtractorInterceptorSerializer 接口类,定义了一个抽象方法serialize,实现类包括:

org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer 
//直接返回,不做另外的操作(默认的类)
org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer 
//使用指定的formatting pattern把传入的值转换为milliseconds

2)build方法用于返回一个RegexExtractorInterceptor对象

return new RegexExtractorInterceptor( regex , serializerList );

RegexExtractorInterceptor的主要方法intercept:

 static final String REGEX = "regex" ;
  static final String SERIALIZERS = "serializers" ;
...
  public Event intercept(Event event) {
    Matcher matcher = regex.matcher(
        new String(event.getBody(), Charsets.UTF_8)); //对Event的body进行matcher操作
    Map<String, String> headers = event.getHeaders(); // 获取Event的header键值对
    if (matcher.find()) { //检测字符串中的子字符串是否可以匹配到正则
      for ( int group = 0, count = matcher.groupCount(); group < count; group++) {
        int groupIndex = group + 1; // 匹配的index从1开始
        if (groupIndex > serializers .size()) { //判断index是否大于serializers列表(configure产生的List<NameAndSerializer>)的长度
....
          break;
        }
        NameAndSerializer serializer = serializers.get(group); //从serializers中获取对应的NameAndSerializer 对象
....
        headers.put(serializer. headerName,
            serializer. serializer.serialize(matcher.group(groupIndex))); // 向Event中插入headerName和对应的value,这里headerName即为serializers.x.name的设置,value会通过RegexExtractorInterceptorSerializer进行处理
      }
    }
    return event;
  }
时间: 2024-10-22 15:15:40

RegexExtractorInterceptor实现分析的相关文章

HDFSEventSink目录设置功能实现源码分析

这里以按自定义头部的配置为例(根据某些业务不同写入不同的主目录)配置:source: interceptors = i1 interceptors.i1.type = regex_extractor  interceptors.i1.regex = /apps/logs/(.*?)/ interceptors.i1.serializers = s1 interceptors.i1.serializers.s1.name = logtypename sink: hdfs.path = hdfs:/

爱奇艺、优酷、腾讯视频竞品分析报告2016(一)

1 背景 1.1 行业背景 1.1.1 移动端网民规模过半,使用时长份额超PC端 2016年1月22日,中国互联网络信息中心 (CNNIC)发布第37次<中国互联网络发展状况统计报告>,报告显示,网民的上网设备正在向手机端集中,手机成为拉动网民规模增长的主要因素.截至2015年12月,我国手机网民规模达6.20亿,有90.1%的网民通过手机上网. 图 1  2013Q1~2015Q3在线视频移动端和PC端有效使用时长份额对比 根据艾瑞网民行为监测系统iUserTracker及mUserTrac

Tomcat启动分析(我们为什么要配置CATALINA_HOME环境变量)

原文:http://www.cnblogs.com/heshan664754022/archive/2013/03/27/2984357.html Tomcat启动分析(我们为什么要配置CATALINA_HOME环境变量) 用文本编辑工具打开用于启动Tomcat的批处理文件startup.bat,仔细阅读.在这个文件中,首先判断CATALINA_HOME环境变量是否为空,如果为空,就将当前目录设为CATALINA_HOME的值.接着判断当前目录下是否存在bin\catalina.bat,如果文件

C# 最佳工具集合: IDE 、分析、自动化工具等

C#是企业中广泛使用的编程语言,特别是那些依赖微软的程序语言.如果您使用C#构建应用程序,则最有可能使用Visual Studio,并且已经寻找了一些扩展来对您的开发进行管理.但是,这个工具列表可能会改变您编写C#代码的方式. C#编程的最佳工具有以下几类: IDE VS扩展 编译器.编辑器和序列化 反编译和代码转换工具 构建自动化和合并工具 版本控制 测试工具和VS扩展 性能分析 APM 部署自动化 容器 使用上面的链接直接跳转到特定工具,或继续阅读以浏览完整列表.

秒杀系统架构分析与实战

0 系列目录 秒杀系统架构 秒杀系统架构分析与实战 1 秒杀业务分析 正常电子商务流程 (1)查询商品:(2)创建订单:(3)扣减库存:(4)更新订单:(5)付款:(6)卖家发货 秒杀业务的特性 (1)低廉价格:(2)大幅推广:(3)瞬时售空:(4)一般是定时上架:(5)时间短.瞬时并发量高: 2 秒杀技术挑战 假设某网站秒杀活动只推出一件商品,预计会吸引1万人参加活动,也就说最大并发请求数是10000,秒杀系统需要面对的技术挑战有: 对现有网站业务造成冲击 秒杀活动只是网站营销的一个附加活动,

Openfire分析之二:主干程序分析

引言 宇宙大爆炸,于是开始了万物生衍,从一个连人渣都还没有的时代,一步步进化到如今的花花世界. 然而沧海桑田,一百多亿年过去了-. 好复杂,但程序就简单多了,main()函数运行,敲个回车,一行Hello World就出来了,所以没事多敲敲回车,可以练手感-. 一.程序入口 Java的程序入口是main方法,Openfire也不例外.可以全局检索一下"void main",可以看到,Openfire的main函数有两个: (1)org.jivesoftware.openfire.lau

gecode FunctionBranch 源码分析

从名字上看,这个类的核心就在于function, 那么看代码: /// Function to call SharedData<std::function<void(Space& home)>> f; /// Call function just once bool done; 的确是定义了一个function,然后一个状态,猜测是调用了function之后会设置为true,往下看代码: ExecStatus FunctionBranch::commit(Space&

爬虫难点分析

难点分析 1.网站采取反爬策略 2.网站模板定期变动 3.网站url抓取失败 4.网站频繁抓取ip被封 1.网站采取反爬策略 >网站默认对方正常访问的方式是浏览器访问而不是代码访问,为了防止对方使用大规模服务器进行爬虫从而导致自身服务器承受过大的压力,通常网站会采取反爬策略 根据这一特性,我们用代码模拟实现浏览器访问 2.网站模板定期变动-解决方案 >标签变动,比如<div>变动,那么我们不能把代码给写死了 (1)不同配置文件配置不同网站的模板规则 (2)数据库存储不同网站的模板规

R语言学习-词频分析

概念 1.语料库-Corpus 语料库是我们要分析的所有文档的集合,就是需要为哪些文档来做词频 2.中文分词-Chinese Word Segmentation 指的是将一个汉字序列切分成一个一个单独的词语. 3.停用词-Stop Words 数据处理的时候,自动过滤掉某些字或词,包括泛滥的词如Web.网站等,又如语气助词如的.地.得等. 需要加载的包 1.tm包 安装方式:install.packages("tm") 语料库: Corpus(x,readerControl) x-语料