flume HDFSEventSink实现分析

HDFSEventSink用于把数据从channel中拿出来(主动pull的形式)然后放到hdfs中,HDFSEventSink在启动时会启动两个线程池callTimeoutPool 和timedRollerPool ,callTimeoutPool 用于运行append/flush等操作hdfs的task(通过callWithTimeout方法调用,并实现timeout功能),用于运行翻转文件的计划任务timedRollerPool:

    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
            new ThreadFactoryBuilder().setNameFormat(rollerName).build());

channel到sink的操作最终调用了sink的process方法(由SinkProcessor实现类调用),比如HDFSEventSink的process方法,每个process方法中都是一个事务,用来提供原子性操作,process方法调用Channel的take方法从Channel中取出Event,每个transaction中最多的Event数量由hdfs.batchSize设定,默认是100,对每一个Event有如下操作:
1.获取文件的完整路径和名称lookupPath
2.声明一个BucketWriter对象和HDFSWriter 对象,HDFSWriter由hdfs.fileType设定,负责实际数据的写入,BucketWriter可以理解成对hdfs文件和写入方法的封装,每个lookupPath对应一个BucketWriter对象,对应关系写入到sfWriters中(这里sfWriters是一个WriterLinkedHashMap对象,WriterLinkedHashMap是LinkedHashMap的子类(private static class WriterLinkedHashMap  extends LinkedHashMap<String, BucketWriter>),用来存放文件到BucketWriter的对应关系,在start方法中初始化:
this.sfWriters = new WriterLinkedHashMap( maxOpenFiles);
长度为hdfs.maxOpenFiles的设置,默认为5000,这个代表最多能打开的文件数量)
3.调用BucketWriter的append方法写入数据
4.当操作的Event数量达到hdfs.batchSize设定后,循环调用每个BucketWriter对象的flush方法,并提交transaction
5.如果出现异常则回滚事务
6.最后关闭transaction
process方法最后返回的是代表Sink状态的Status对象(BACKOFF或者READY),这个可以用于判断Sink的健康状态,比如failover的SinkProcessor就根据这个来判断Sink是否可以提供服务

主要方法分析:
1.构造函数声明一个HDFSWriterFactory对象
在后面会使用HDFSWriterFactory的getWriter方法会根据file类型返回对应的HDFSWriter实现类
2.configure
1)通过configure方法会根据Context设置各种参数项
比如:

inUseSuffix = context.getString( "hdfs.inUseSuffix", defaultInUseSuffix ); //正在写入的文件的后缀名,默认为".tmp"
rollInterval = context.getLong( "hdfs.rollInterval", defaultRollInterval ); //文件翻转时间,默认30
rollSize = context.getLong( "hdfs.rollSize", defaultRollSize ); //文件翻转大小,默认1024
rollCount = context.getLong( "hdfs.rollCount", defaultRollCount ); //默认为10
batchSize = context.getLong( "hdfs.batchSize", defaultBatchSize ); //默认为100
idleTimeout = context.getInteger( "hdfs.idleTimeout", 0); //默认为
String codecName = context.getString( "hdfs.codeC"); //压缩格式
fileType = context.getString( "hdfs.fileType", defaultFileType ); //默认为HDFSWriterFactory.SequenceFileType,即sequencefile
maxOpenFiles = context.getInteger( "hdfs.maxOpenFiles", defaultMaxOpenFiles ); //默认为5000
callTimeout = context.getLong( "hdfs.callTimeout", defaultCallTimeout ); //BucketWriter超时时间,默认为10000
threadsPoolSize = context.getInteger( "hdfs.threadsPoolSize",
        defaultThreadPoolSize); //操作append/open/close/flush任务的线程池大小,默认为10
rollTimerPoolSize = context.getInteger( "hdfs.rollTimerPoolSize",
        defaultRollTimerPoolSize); //文件翻转计时器线程池大小,默认为1
tryCount = context.getInteger( "hdfs.closeTries", defaultTryCount ); //尝试close文件的此数(大于0)
retryInterval = context.getLong( "hdfs.retryInterval", defaultRetryInterval); //间隔时间(大于0)

2)获取压缩格式

    if (codecName == null) { //如果hdfs.codeC没有设置
      codeC = null; //则没有压缩功能
      compType = CompressionType. NONE; 
    } else {
      codeC = getCodec(codecName);  //调用getCodec方法获取压缩格式
      // TODO : set proper compression type
      compType = CompressionType. BLOCK; //压缩类型为BLOCK类型
    }

3)hdfs文件翻转相关设置,在实例化BucketWriter对象时会用到

   needRounding = context.getBoolean( "hdfs.round", false );
    if(needRounding) {
      String unit = context.getString( "hdfs.roundUnit", "second" );
      if (unit.equalsIgnoreCase( "hour")) {
        this.roundUnit = Calendar.HOUR_OF_DAY;
      } else if (unit.equalsIgnoreCase("minute" )) {
        this.roundUnit = Calendar.MINUTE;
      } else if (unit.equalsIgnoreCase("second" )){
        this.roundUnit = Calendar.SECOND;
      } else {
        LOG.warn("Rounding unit is not valid, please set one of" +
            "minute, hour, or second. Rounding will be disabled" );
        needRounding = false ;
      }
      this. roundValue = context.getInteger("hdfs.roundValue" , 1);
      if(roundUnit == Calendar. SECOND || roundUnit == Calendar.MINUTE){
        Preconditions. checkArgument(roundValue > 0 && roundValue <= 60,
            "Round value" +
            "must be > 0 and <= 60");
      } else if (roundUnit == Calendar.HOUR_OF_DAY){
        Preconditions. checkArgument(roundValue > 0 && roundValue <= 24,
            "Round value" +
            "must be > 0 and <= 24");
      }
    }

4)最后初始化一个SinkCounter对象用来记录sink的性能数据

    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }

3.start方法用来启动线程池等

  public void start() {
    String timeoutName = "hdfs-" + getName() + "-call-runner-%d" ;
    callTimeoutPool = Executors. newFixedThreadPool(threadsPoolSize,
            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
    String rollerName = "hdfs-" + getName() + "-roll-timer-%d" ;
    timedRollerPool = Executors. newScheduledThreadPool(rollTimerPoolSize,
            new ThreadFactoryBuilder().setNameFormat(rollerName).build());
    this. sfWriters = new WriterLinkedHashMap(maxOpenFiles); //初始化WriterLinkedHashMap对象
    sinkCounter.start();
    super.start();
  }

4.process方法,从channel中pull出数据并发送到hdfs中(每一个transaction中最多可以有batchSize条Event),获取对应的bucket,序列化数据并写入hdfs文件

  public Status process() throws EventDeliveryException {
    Channel channel = getChannel(); //获取对应的channel
    Transaction transaction = channel.getTransaction(); //获取Transaction 对象,提供事务功能
    List<BucketWriter> writers = Lists. newArrayList();
    transaction.begin(); //事务开始
    try {
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {//这里一个transaction存放的数据最多由hdfs.batchSize指定
        Event event = channel.take(); //循环调用Channel的take方法获取Event
        if (event == null) {
          break;
        }
        // reconstruct the path name by substituting place holders
        String realPath = BucketPath. escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit , roundValue ); //设置文件路径
        String realName = BucketPath. escapeString(fileName, event.getHeaders(),
          timeZone, needRounding, roundUnit , roundValue ); //设置文件名称
        String lookupPath = realPath + DIRECTORY_DELIMITER + realName; //完整的文件名称
        BucketWriter bucketWriter = sfWriters.get(lookupPath);  //根据文件获取对应的BucketWriter对象
        // we haven‘t seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType ); //根据文件类型获取HDFSWriter 对象
          WriterCallback idleCallback = null;
          if(idleTimeout != 0) {
            idleCallback = new WriterCallback() {
              @Override
              public void run(String bucketPath) {
                sfWriters.remove(bucketPath); //回调方法
              }
            };
          }
          bucketWriter = new BucketWriter(rollInterval , rollSize , rollCount ,
              batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,
              suffix, codeC, compType, hdfsWriter, timedRollerPool,
              proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath); //实例化BucketWriter
          sfWriters.put(lookupPath, bucketWriter); //这里sfWriters是一个WriterLinkedHashMap对象,WriterLinkedHashMap是LinkedHashMap的子类,用来存放文件到BucketWriter的对应关系,在start方法中初始化:this .sfWriters = new WriterLinkedHashMap(maxOpenFiles);大小为hdfs.maxOpenFiles的设置,默认为5000
        }
        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) { //List<BucketWriter> writers = Lists.newArrayList();
          writers.add(bucketWriter);
        }
        // Write the data to HDFS
        append(bucketWriter, event); //调用append方法写入Event数据
      }
      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize ) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }
      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        flush(bucketWriter); //调用flush方法
      }
      transaction.commit(); //事务提交
      if (txnEventCount < 1) {
        return Status.BACKOFF ;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY ;
      }
    } catch (IOException eIO) { //如果异常则回滚事务
      transaction.rollback();
      LOG.warn( "HDFS IO error", eIO);
      return Status. BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error( "process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close();
    }
  }

5.同时定义了几个操作BucketWriter的方法append,flush,close

 1) private void append(final BucketWriter bucketWriter, final Event event)
          throws IOException, InterruptedException {
    // Write the data to HDFS
    callWithTimeout(new Callable<Void>() { //注意这里使用callWithTimeout提供了调用的超时功能
      public Void call() throws Exception {
        bucketWriter.append(event); //调用BucketWriter.append方法写入Event数据
        return null;
      }
    });
  }
2)flush-->BucketWriter.flush()
3) close-->BucketWriter.close()
时间: 2024-10-18 12:22:00

flume HDFSEventSink实现分析的相关文章

Flume NG源代码分析(二)支持执行时动态改动配置的配置模块

在上一篇中讲了Flume NG配置模块主要的接口的类,PropertiesConfigurationProvider提供了基于properties配置文件的静态配置的能力,这篇细说一下PollingPropertiesFileConfigurationProvider提供的执行时动态改动配置并生效的能力. 要实现动态改动配置文件并生效,主要有两个待实现的功能 1. 观察配置文件是否改动 2. 假设改动,将改动的内容通知给观察者 对于第一点,监控配置文件是否改动,Flume NG定义了一个File

Flume结构简要分析

一.Flume介绍 Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力. 设计目标: (1) 可靠性 当节点出现故障时,日志能够被传送到其他节点上而不会丢失.Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end(收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除:如果数据发送失败,可以重新发送.),Store on fa

Flume+Kafka+SparkStreaming+Hbase+可视化(一)

一.前置准备: Linux命令基础 Scala.Python其中一门 Hadoop.Spark.Flume.Kafka.Hbase基础知识 二.分布式日志收集框架Flume 业务现状分析:服务器.web服务产生的大量日志,怎么使用,怎么将大量日志导入到集群 1.shell脚本批量,再传到Hdfs:实效性不高,容错率低,网络/磁盘IO,监控 2.Flume: Flume:关键在于写配置文件 1)配置 agent 2)配置 Source 3)配置 Channel 4)配置 Sink 1-netcat

从金融行业转型大数据,一路学习点滴的分享!

大数据学习之路,很漫长,但是请放心,Java 转大数据很轻松,零基础学大数据也很轻松,我会陪着你们一起搞起来,干就完事了. 本篇文章有点长,都是我的真实感受.分为:开始.转折.成长.New Flag.关于此号.推荐.总结七个部分. 开始 我大学学的是软件工程专业,2018 年毕业.据统计,近几年毕业生中平均薪资最高的专业就是软件工程!还好我没有拉低平均水平,凭借自己努力和运气毕业去了一家金融大厂(杭州)实习. 我所在的部门是最挣钱的部门--资管,我们的系统覆盖了全部的金融业务,除了保险业务.每天

python 日志处理练习

1 日志采集概述 1 日志采集流程 生产过程中会产生大量的系统日志,应用程序日志,安全日志等等日志,通过对日志的分析可以了解服务器的负载,健康状况,可以分析客户的分布情况,客户的行为,甚至于这些分析可以做出预测一般采集流程日志产出---采集 (logstash,flume,scribe) --- 存储---分析---存储(数据库.NoSQL)---可视化 2 半结构化数据 日志是半结构化数据,是有组织的,有格式的数据,可以分割成行和列,就可以当做表理解和处理了,当然也可以分析里面的数据 文本分析

HDFSEventSink目录设置功能实现源码分析

这里以按自定义头部的配置为例(根据某些业务不同写入不同的主目录)配置:source: interceptors = i1 interceptors.i1.type = regex_extractor  interceptors.i1.regex = /apps/logs/(.*?)/ interceptors.i1.serializers = s1 interceptors.i1.serializers.s1.name = logtypename sink: hdfs.path = hdfs:/

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

【Java】【Flume】Flume-NG启动过程源码分析(一)

从bin/flume 这个shell脚本可以看到Flume的起始于org.apache.flume.node.Application类,这是flume的main函数所在. main方法首先会先解析shell命令,如果指定的配置文件不存在就甩出异常. 根据命令中含有"no-reload-conf"参数,决定采用那种加载配置文件方式:一.没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件:二.有此参数,则只在启动时加载一次配置文件.实现动态加载功能采用了

【Java】【Flume】Flume-NG启动过程源码分析(二)

本节分析配置文件的解析,即PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run中的eventBus.post(getConfiguration()).分析getConfiguration()方法.此方法在AbstractConfigurationProvider类中实现了,并且这个类也初始化了三大组件的工厂类:this.sourceFactory = new DefaultSourceFactory();this.s