hdfs sink的具体写入流程分析

上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程:
线上hdfs sink的几个重要设置

hdfs.path = hdfs://xxxxx/%{logtypename}/%Y%m%d/%H:
hdfs.rollInterval = 60
hdfs.rollSize = 0 //想让文件只根据实际来roll
hdfs.rollCount = 0
hdfs.batchSize = 2000
hdfs.txnEventMax = 2000
hdfs.fileType = DataStream 
hdfs.writeFormat = Text

这里说下和类相关的hdfs.fileType和hdfs.writeFormat,一个定义了文件流式用的类,一个定义了具体的数据序列化的类.
1)hdfs.fileType 有3个可选项:SequenceFile/DataStream/CompressedStream,DataStream可以想象成hdfs的textfile,默认是SequenceFileType,CompressedStream是用于压缩时设置
2)hdfs.writeFormat 定义了3种序列化方法,TEXT只写Event的body部分,HEADER_AND_TEXT写Event的body和header,AVRO_EVENT是avro的序列化方式

上面的设置,其数据写入流程大概如下:

SinkRunner.process->SinkProcessor.process->HDFSEventSink.process->HDFSEventSink.append->BucketWriter.append->HDFSWriter.append->HDFSDataStream.append->BodyTextEventSerializer.write->java.io.OutputStream.write

简单说下:
在HDFSEventSink中会实例化BucketWriter和HDFSWriter:

        if (bucketWriter == null) {
          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType ); //获取HDFSWriter 对象
....
          bucketWriter = new BucketWriter(rollInterval , rollSize , rollCount ,
              batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,
              suffix, codeC, compType, hdfsWriter, timedRollerPool,
              proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath); //根据HDFSWriter 对象获取BucketWriter对象

这里获取HDFSWriter 对象时用到了org.apache.flume.sink.hdfs.HDFSWriterFactory的getWriter方法,根据hdfs.fileType的设置会返回具体的org.apache.flume.sink.hdfs.HDFSWriter实现类的对象
目前只支持3种

  static final String SequenceFileType = "SequenceFile" ;
  static final String DataStreamType = "DataStream" ;
  static final String CompStreamType = "CompressedStream" ;
....
  public HDFSWriter getWriter(String fileType) throws IOException {
    if (fileType.equalsIgnoreCase( SequenceFileType)) { //SequenceFile,sequencefile
      return new HDFSSequenceFile();
    } else if (fileType.equalsIgnoreCase(DataStreamType)) { //DataStream
      return new HDFSDataStream();
    } else if (fileType.equalsIgnoreCase(CompStreamType)) { //CompressedStream
      return new HDFSCompressedDataStream();
    } else {
      throw new IOException("File type " + fileType + " not supported");
    }

BucketWriter可以理解成是对下层数据操作的一个封装,比如数据写入时其实调用了其append方法,append主要有下面几个步骤:
1)首先判断文件是否打开:

    if (! isOpen) {
      if(idleClosed) {
        throw new IOException("This bucket writer was closed due to idling and this handle " +
            "is thus no longer valid");
      }
      open(); //如果没有打开,则调用open->doOpen->HDFSWriter.open方法打开bucketPath (bucketPath是临时写入目录,比如tmp结尾的目录,targetPath是最终目录)
    }

doOpen的主要步骤
a.设置两个文件名:

        bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
          + fullFileName + inUseSuffix;
        targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;

b.调用HDFSWriter.open方法打开bucketPath

         if (codeC == null) {
          // Need to get reference to FS using above config before underlying
          // writer does in order to avoid shutdown hook & IllegalStateExceptions
          fileSystem = new Path(bucketPath ).getFileSystem(config);
          LOG.info("Creating " + bucketPath );
          writer.open( bucketPath);
        } else {
          // need to get reference to FS before writer does to avoid shutdown hook
          fileSystem = new Path(bucketPath ).getFileSystem(config);
          LOG.info("Creating " + bucketPath );
          writer.open( bucketPath, codeC , compType );
        }

c.如果设置了rollInterval ,则执行计划任务调用close方法

    // if time-based rolling is enabled, schedule the roll
    if (rollInterval > 0) {
      Callable<Void> action = new Callable<Void>() {
        public Void call() throws Exception {
          LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed." ,
              bucketPath, rollInterval );
          try {
            close();
          } catch(Throwable t) {
            LOG.error("Unexpected error" , t);
          }
          return null ;
        }
      };
      timedRollFuture = timedRollerPool.schedule(action, rollInterval ,
          TimeUnit. SECONDS);
    }

2)判断文件是否需要翻转(达到hdfs.rollSize或者hdfs.rollCount设置):

    // check if it‘s time to rotate the file
    if (shouldRotate()) {
      close(); //close调用flush+doClose,flush调用doFlush,doFlush调用HDFSWriter.sync方法把数据同步到hdfs中
      open();
    }

其中shouldRotate(基于数量和大小的roll方式):

  private boolean shouldRotate() {
    boolean doRotate = false;
    if (( rollCount > 0) && (rollCount <= eventCounter )) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
      LOG.debug( "rolling: rollCount: {}, events: {}" , rollCount , eventCounter );
      doRotate = true;
    }
    if (( rollSize > 0) && ( rollSize <= processSize)) { //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
      LOG.debug( "rolling: rollSize: {}, bytes: {}" , rollSize , processSize );
      doRotate = true;
    }
    return doRotate;
  }

其中doClose主要的步骤
a.调用HDFSWriter.close方法
b.调用renameBucket方法把tmp文件命名为最终文件:

    if (bucketPath != null && fileSystem != null) {
      renameBucket(); // could block or throw IOException
      fileSystem = null;
    }

其中renameBucket:

fileSystem.rename(srcPath, dstPath)

3)调用HDFSWriter.append方法写入Event

writer.append(event);

4) 更新计数器

    // update statistics
    processSize += event.getBody(). length;
    eventCounter++;
    batchCounter++;

5)判断是否需要flush(达到hdfs.batchSize的设置),batch写入数据到hdfs

    if (batchCounter == batchSize) {
      flush();
    }

Event写入时BucketWriter的append方法调用org.apache.flume.sink.hdfs.HDFSWriter实现类的append方法,比如这里的HDFSDataStream类,HDFSDataStream的主要方法:
configure用于设置serializer:

  public void configure(Context context) {
    serializerType = context.getString( "serializer", "TEXT" ); //默认序列化方式为TEXT
    useRawLocalFileSystem = context.getBoolean( "hdfs.useRawLocalFileSystem",
        false);
    serializerContext =
        new Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
    logger.info( "Serializer = " + serializerType + ", UseRawLocalFileSystem = "
        + useRawLocalFileSystem);
  }
append方法用于Event的写入,调用EventSerializer.write方法:
  public void append(Event e) throws IOException {
    // shun flumeformatter...
    serializer.write(e); //调用EventSerializer.write方法写入Event
  }

open方法主要步骤:
1)根据hdfs.append.support的设置(默认为false)打开或者新建文件

    boolean appending = false;
    if (conf.getBoolean( "hdfs.append.support", false ) == true && hdfs.isFile
            (dstPath)) { //默认hdfs.append.support为false
      outStream = hdfs.append(dstPath);
      appending = true;
    } else {
      outStream = hdfs.create(dstPath); //如果不支持append,则创建文件
    }

2)使用EventSerializerFactory.getInstance方法创建EventSerializer的对象

    serializer = EventSerializerFactory.getInstance(
        serializerType, serializerContext , outStream ); //实例化EventSerializer对象

3)如果EventSerializer对象支持reopen,并且hdfs.append.support设置为true时会抛出异常

    if (appending && ! serializer.supportsReopen()) {
      outStream.close();
      serializer = null;
      throw new IOException("serializer (" + serializerType +
          ") does not support append");
    }

4)调用文件打开或者reopen之后的操作

    if (appending) {
      serializer.afterReopen();
    } else {
      serializer.afterCreate();
    }
  }

这里hdfs.writeFormat的3种设置和对应的类:

  TEXT(BodyTextEventSerializer.Builder. class), //支持reopen
  HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder. class), //支持reopen
  AVRO_EVENT(FlumeEventAvroEventSerializer.Builder. class), // 不支持reopen

默认设置为TEXT,即BodyTextEventSerializer类:

  private BodyTextEventSerializer(OutputStream out, Context ctx) { //构造方法
    this. appendNewline = ctx.getBoolean(APPEND_NEWLINE , APPEND_NEWLINE_DFLT ); //默认为true
    this. out = out;
  }
....
  public void write(Event e) throws IOException { //write方法
    out.write(e.getBody()); //java.io.OutputStream.write,只写Event的body
    if (appendNewline) { //每一行之后增加一个回车
      out.write(‘\n‘);
    }
时间: 2024-10-14 01:30:09

hdfs sink的具体写入流程分析的相关文章

修改Flume-NG的hdfs sink解析时间戳源码大幅提高写入性能

转自:http://www.cnblogs.com/lxf20061900/p/4014281.html Flume-NG中的hdfs sink的路径名(对应参数"hdfs.path",不允许为空)以及文件前缀(对应参数"hdfs.filePrefix")支持正则解析时间戳自动按时间创建目录及文件前缀. 在实际使用中发现Flume内置的基于正则的解析方式非常耗时,有非常大的提升空间.如果你不需要配置按时间戳解析时间,那这篇文章对你用处不大,hdfs sink对应的解

修改Flume-NG的hdfs sink解析时间戳源码部分大幅提高写入性能

Flume-NG中的hdfs sink的路径名(对应参数"hdfs.path",不允许为空)以及文件前缀(对应参数"hdfs.filePrefix")支持正则解析时间戳自动按时间创建目录及文件前缀. 在实际使用中发现Flume内置的基于正则的解析方式非常耗时,有非常大的提升空间.如果你不需要配置按时间戳解析时间,那这篇文章对你用处不大,hdfs sink对应的解析时间戳的代码位于org.apache.flume.sink.hdfs.HDFSEventSink的pro

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

spark 启动job的流程分析

从WordCount开始分析 编写一个例子程序 编写一个从HDFS中读取并计算wordcount的例子程序: packageorg.apache.spark.examples importorg.apache.spark.SparkContext importorg.apache.spark.SparkContext._ objectWordCount{ defmain(args : Array[String]) { valsc = newSparkContext(args(0),"wordco

compact处理流程分析

compact处理流程分析 compact的处理与split同样.由client端与flush时检查发起. 针对compact另一个在rs生成时生成的CompactionChecker线程定期去检查是否须要做compact操作 线程运行的间隔时间通过hbase.server.thread.wakefrequency配置,默觉得10*1000ms CompactionChecker线程主要作用: 生成通过hbase.server.thread.wakefrequency(10*1000ms)配置的

hbase数据写入流程深度解析

2019/3/28 星期四hbase数据写入流程深度解析在看此链接之前,可以写查看 hbase读写请求详细解释 中的写请求流程 https://blog.51cto.com/12445535/2356085 简介:hbase设置之初就是为了应对大量的写多读少的应用,他出色的写性能,在一个100台RS的集群可以轻松地支撑每天10T的写入量.hbase的写数据流程大体分为3部分1.客户端的写入流程2.服务端的写入流程3.wal的工作原理 我们先回顾一下hbase写数据流程写请求处理过程小结1 cli

Solr4.8.0源码分析(5)之查询流程分析总述

Solr4.8.0源码分析(5)之查询流程分析总述 前面已经写到,solr查询是通过http发送命令,solr servlet接受并进行处理.所以solr的查询流程从SolrDispatchsFilter的dofilter开始.dofilter包含了对http的各个请求的操作.Solr的查询方式有很多,比如q,fq等,本章只关注select和q.页面下发的查询请求如下:http://localhost:8080/solr/test/select?q=code%3A%E8%BE%BD*+AND+l

nova挂载cinder卷流程分析

Nova挂载cinder卷流程分析 1. nova通过命令nova volume-attach server volume device-name或者http请求 Req:POST /v2/{tenant-id}/servers/{server-id}/os-volume_attachments' Body:{'volumeAttachment': {'device': '/dev/vdb', 'volumeId': '951be889-b794-4723-9ac9-efde61cacf3a'}

region split流程分析

region split流程分析 splitregion的发起主要通过client端调用regionserver.splitRegion或memstore.flsuh时检查并发起. Client通过rpc调用regionserver的splitRegion方法 client端通过HBaseAdmin.split传入regionname与splitpoint(切分的rowkey,能够不传入), 通过meta得到此region所在的server,发起rpc请求,调用HRegionServer.spl