HDFSEventSink目录设置功能实现源码分析

这里以按自定义头部的配置为例(根据某些业务不同写入不同的主目录)
配置:
source:

interceptors = i1
interceptors.i1.type = regex_extractor 
interceptors.i1.regex = /apps/logs/(.*?)/
interceptors.i1.serializers = s1
interceptors.i1.serializers.s1.name = logtypename

sink:

hdfs.path = hdfs://xxxxxx/%{logtypename}/%Y%m%d/%H
hdfs.round = true
hdfs.roundValue = 30
hdfs.roundUnit = minute
hdfs.filePrefix = xxxxx1-

在source中定义了regex_extractor 类型的interceptor,使用org.apache.flume.interceptor.RegexExtractorInterceptor类构建interceptor对象,这个interceptor可以根据一个正则表达式提取字符串,并使用serializers把字符串作为header的值,这header可以在sink中获取对应的值做进一步的操作.

比如写hdfs的sink HDFSEventSink的process方法中

       // reconstruct the path name by substituting place holders
        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit , roundValue , useLocalTime );
        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit , roundValue , useLocalTime );

几个参数项:
useLocalTime 是hdfs.useLocalTimeStamp的设置,默认是false
filePath为hdfs.path的设置,不能为空
fileName为hdfs.filePrefix的设置,默认为FlumeData
rounding(取近似值)的设置相关:

    needRounding = context.getBoolean( "hdfs.round", false ); 
    //hdfs.round的设置,默认为false
    if(needRounding) {
      String unit = context.getString( "hdfs.roundUnit", "second" ); 
      //hdfs.roundUnit,默认为second
      if (unit.equalsIgnoreCase( "hour")) {
        this.roundUnit = Calendar.HOUR_OF_DAY;
      } else if (unit.equalsIgnoreCase("minute" )) {
        this.roundUnit = Calendar.MINUTE;
      } else if (unit.equalsIgnoreCase("second" )){
        this.roundUnit = Calendar.SECOND;
      } else {
        LOG.warn("Rounding unit is not valid, please set one of" +
            "minute, hour, or second. Rounding will be disabled" );
        needRounding = false ;
      }
      this.roundValue = context.getInteger("hdfs.roundValue" , 1); 
      //hdfs.roundValue值的设置,默认为1
      if(roundUnit == Calendar. SECOND || roundUnit == Calendar.MINUTE){ 
      //下面个为检测roundValue的值是否设置合理
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
            "Round value" +
            "must be > 0 and <= 60");
      } else if (roundUnit == Calendar.HOUR_OF_DAY){
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
            "Round value" +
            "must be > 0 and <= 24");
      }
    }

hdfs的具体路径主要由org.apache.flume.formatter.output.BucketPath类的escapeString方法实现
BucketPath类方法分析:
1.escapeString用于替换%{yyy}的设置和%x的设置,需要设置为%x或者%{yyy}的形式,yyy可以是单词字符,和.或者-其调用replaceShorthand

final public static String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}";
//正则表达式
  final public static Pattern tagPattern = Pattern.compile(TAG_REGEX);
  ....
  public static String escapeString(String in, Map<String, String> headers,
    TimeZone timeZone, boolean needRounding, int unit, int roundDown,
    boolean useLocalTimeStamp) {
    long ts = clock.currentTimeMillis(); //获取当前的时间戳
    Matcher matcher = tagPattern.matcher(in); 
    //对输入的字符串进行matcher操作,返回Matcher对象,比如这里in可以是hdfs.path的设置
    StringBuffer sb = new StringBuffer();
    while (matcher.find()) { 
    //用于查看字符串中是否有子字符串可以匹配正则表达式,有的话进入循环中
      String replacement = "";
      if (matcher.group(2) != null) { //匹配%{...}的设置
        replacement = headers.get(matcher.group(2)); //获取对应的header值
        if (replacement == null) {
          replacement = "";
        }
      } else { //匹配%x的设置
        Preconditions.checkState(matcher.group(1) != null
            && matcher.group(1).length() == 1,
            "Expected to match single character tag in string " + in);
        char c = matcher.group(1).charAt(0);
        replacement = replaceShorthand(c, headers, timeZone,
            needRounding, unit, roundDown, useLocalTimeStamp, ts); 
            //对字符调用replaceShorthand方法
      }
      replacement = replacement.replaceAll("\\\\", "\\\\\\\\");
      replacement = replacement.replaceAll("\\$", "\\\\\\$");
      matcher.appendReplacement(sb, replacement);
    }
    matcher.appendTail(sb);
    return sb.toString(); //返回字符串
  }

2.replaceShorthand方法用于根据timestamp header的值和round的设置以及路径设置返回对应的日期字符串(比如%Y生成yyyyy(20150310)形式的日期),会调用roundDown方法

  protected static String replaceShorthand(char c, Map<String, String> headers,
      TimeZone timeZone, boolean needRounding, int unit, int roundDown,
      boolean useLocalTimestamp, long ts) {
    String timestampHeader = null;
    try {
      if(!useLocalTimestamp) { //hdfs.useLocalTimeStamp设置为false时(默认)
        timestampHeader = headers.get("timestamp"); //获取timestamp的值
        Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " +
          "the Flume event headers, but it was null"); 
          //检测timestamp header的值是否为空
        ts = Long.valueOf(timestampHeader);
      } else {
        timestampHeader = String.valueOf(ts);
      }
    }
     ...
    if(needRounding){ //如果hdfs.round设置为true(默认为false)
      ts = roundDown(roundDown, unit, ts); //调用roundDown向下取整,生成新的ts
    }
    // It‘s a date
    String formatString = "";
    switch (c) { //对字符串进行匹配,生成日期格式,比如%Y%m%d 最后生成的日期格式为yyyyMMdd
    case ‘%‘:
      return "%";
    case ‘a‘:
      formatString = "EEE";
      break;
  .....
    case ‘z‘:
      formatString = "ZZZ";
      break;
    default:
//      LOG.warn("Unrecognized escape in event format string: %" + c);
      return "";
    }
    SimpleDateFormat format = new SimpleDateFormat(formatString); 
    //根据格式生成SimpleDateFormat对象
    if (timeZone != null) {
      format.setTimeZone(timeZone);
    }
    Date date = new Date(ts); //由ts生成Date对象
    return format.format(date); //根据Date对象生成时间字符串
  }

3.roundDown用于向下取整

  private static long roundDown(int roundDown, int unit, long ts){
    long timestamp = ts;
    if(roundDown <= 0){
      roundDown = 1;
    }
    switch (unit) {
      case Calendar. SECOND:
        timestamp = TimestampRoundDownUtil. roundDownTimeStampSeconds(
            ts, roundDown); //如果hdfs.roundUnit是second调用TimestampRoundDownUtil.roundDownTimeStampSeconds方法
        break; 
....
      default:
        timestamp = ts;
        break;
    }
    return timestamp;
  }
时间: 2024-10-29 04:26:47

HDFSEventSink目录设置功能实现源码分析的相关文章

Qt5.10实现QQ截图工具功能代码源码分析

为了做出一个QQ截图功能的模块,参考了网上某位网友的代码.这里我们分析一下主要代码.源码已经过测试运行,不懂的留言即可.源码地址:Qtjietu20191216.zip1.首先我们一定会想到要对鼠标事件进行改写,包括点击,移动,释放,双击取消选区等功能. ```//重写基类方法 void keyPressEvent(QKeyEvent *event); void paintEvent(QPaintEvent *event); void mousePressEvent(QMouseEvent *e

开源中国 OsChina Android 客户端源码分析(9)下载APK功能

源码中用以下载客户端的类为DownloadService,是一个服务.如果你对android服务不够理解的话,建议先查阅下有关服务的知识点.源码分析如下: 1首先我们先来看下该服务中几个重写的方法: 1.1onCreate()中 首先声明了自定义的绑定器对象,并在自定义的绑定器中添加了几个界面可以访问服务的方法,我们发现在这几个方法中,目前实际用到的start()方法用以开始下载APK,其他的没有用到.获取通知管理器.设置服务为 非前台服务.代码注释中,火蚁表明了不确定性. 其实如果将服务设置为

ABP源码分析二十六:核心框架中的一些其他功能

本文是ABP核心项目源码分析的最后一篇,介绍一些前面遗漏的功能 AbpSession AbpSession: 目前这个和CLR的Session没有什么直接的联系.当然可以自定义的去实现IAbpSession使之与CLR的Session关联 IAbpSession:定义如下图中的四个属性. NullAbpSession:IAbpSession的一个缺省实现,给每个属性都给予null值,无实际作用 ClaimsAbpSession:实现了从ClaimsPrincipal/ClaimsIdentity

【Zookeeper】源码分析目录

Zookeeper源码分析目录如下 1. [Zookeeper]源码分析之序列化 2. [Zookeeper]源码分析之持久化(一)之FileTxnLog 3. [Zookeeper]源码分析之持久化(二)之FileSnap 4. [Zookeeper]源码分析之持久化(三)之FileTxnSnapLog 5. [Zookeeper]源码分析之Watcher机制(一) 6. [Zookeeper]源码分析之Watcher机制(二)之WatchManager 7. [Zookeeper]源码分析之

cocos2d-x 源码分析 总目录

这篇博客用来整理与cocos2d-x相关的工作,只要有新的分析.扩展或者修改,都会更改此文章. 祝大家愉快~ 1.源码分析 1.CCScrollView源码分析 http://blog.csdn.net/u011225840/article/details/30033501 2.CCTableView源码分析 http://blog.csdn.net/u011225840/article/details/30032379 2.cocos2d-x的扩展 1.实现可以循环的CCCycleScroll

jquery2源码分析系列目录

学习jquery的源码对于提高前端的能力很有帮助,下面的系列是我在网上看到的对jquery2的源码的分析.等有时间了好好研究下.我们知道jquery2开始就不支持IE6-8了,从jquery2的源码中可以学到很多w3c新的标准( 如html5,css3,ECMAScript).原文地址是:http://www.cnblogs.com/aaronjs/p/3279314.html 关于1.x.x版的jquery源码分析系列,本博客也转载了一个地址http://www.cnblogs.com/jav

jQuery-1.9.1源码分析系列完毕目录整理

jQuery 1.9.1源码分析已经完毕.目录如下 jQuery-1.9.1源码分析系列(一)整体架构 jQuery-1.9.1源码分析系列(一)整体架构续 jQuery-1.9.1源码分析系列(二)jQuery选择器 jQuery-1.9.1源码分析系列(二)jQuery选择器续1 jQuery-1.9.1源码分析系列(二)jQuery选择器续2——筛选 jQuery-1.9.1源码分析系列(三) Sizzle选择器引擎——词法解析 jQuery-1.9.1源码分析系列(三) Sizzle选择

Spring源码分析专题——目录

Spring源码分析专题 -- 阅读指引 IOC容器 Spring源码分析专题 -- IOC容器启动过程(上篇) Spring源码分析专题 -- IOC容器启动过程(中篇) Spring源码分析专题 -- IOC容器启动过程(下篇) Spring源码分析专题 -- IOC容器依赖注入 SpringMVC Spring源码分析专题 -- SpringMVC IOC容器依赖注入 Spring源码分析专题 -- SpringMVC原理分析 Spring源码分析专题 -- SpringAOP源码分析 S

ABP源码分析三十三:ABP.Web

ABP.Web模块并不复杂,主要完成ABP系统的初始化和一些基础功能的实现. AbpWebApplication : 继承自ASP.Net的HttpApplication类,主要完成下面三件事一,在Application_Start完成AbpBootstrapper的初始化.整个ABP系统的初始化就是通过AbpBootstrapper完成初始化的.二,在Application_BeginRequest设置根据request或cookie中的Culture信息,完成当前工作线程的CurrentCu