【Flume】flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用?

本人在测试hdfs的sink,发现sink端的文件滚动配置项起不到任何作用,配置如下:

a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=XXX
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0

这里配置的是60秒,文件滚动一次,也就每隔60秒,会新产生一个文件【前提,flume的source端有数据来】

但是当我启动flume的时候,运行十几秒,不断写入数据,发现hdfs端频繁的产生文件,每隔几秒就有新文件产生

而且在flume的日志输出可以频繁看到这句:

[WARN] Block Under-replication detected. Rotating file.

只要有这句,就会产生一个新的文件

意思就是检测到复制块正在滚动文件,结合源码看下:

private boolean shouldRotate() {
    boolean doRotate = false;

    if (writer.isUnderReplicated()) {
      this.isUnderReplicated = true;
      doRotate = true;
    } else {
      this.isUnderReplicated = false;
    }

    if ((rollCount > 0) && (rollCount <= eventCounter)) {
      LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
      doRotate = true;
    }

    if ((rollSize > 0) && (rollSize <= processSize)) {
      LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
      doRotate = true;
    }

    return doRotate;
  }

这是判断是否滚动文件,但是这里面的第一判断条件是判断是否当前的HDFSWriter正在复制块

public boolean isUnderReplicated() {
    try {
      int numBlocks = getNumCurrentReplicas();
      if (numBlocks == -1) {
        return false;
      }
      int desiredBlocks;
      if (configuredMinReplicas != null) {
        desiredBlocks = configuredMinReplicas;
      } else {
        desiredBlocks = getFsDesiredReplication();
      }
      return numBlocks < desiredBlocks;
    } catch (IllegalAccessException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (InvocationTargetException e) {
      logger.error("Unexpected error while checking replication factor", e);
    } catch (IllegalArgumentException e) {
      logger.error("Unexpected error while checking replication factor", e);
    }
    return false;
  }

通过读取的配置复制块数量和当前正在复制的块比较,判断是否正在被复制

if (shouldRotate()) {
      boolean doRotate = true;

      if (isUnderReplicated) {
        if (maxConsecUnderReplRotations > 0 &&
            consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
          doRotate = false;
          if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
            LOG.error("Hit max consecutive under-replication rotations ({}); " +
                "will not continue rolling files under this path due to " +
                "under-replication", maxConsecUnderReplRotations);
          }
        } else {
          LOG.warn("Block Under-replication detected. Rotating file.");
        }
        consecutiveUnderReplRotateCount++;
      } else {
        consecutiveUnderReplRotateCount = 0;
      }

以上方法,入口是shouldRotate()方法,也就是如果你配置了rollcount,rollsize大于0,会按照你的配置来滚动的,但是在入口进来后,发现,又去判断了是否有块在复制;

里面就读取了一个固定变量maxConsecUnderReplRotations=30,也就是正在复制的块,最多之能滚动出30个文件,如果超过了30次,该数据块如果还在复制中,那么数据也不会滚动了,doRotate=false,不会滚动了,所以有的人发现自己一旦运行一段时间,会出现30个文件

再结合上面的源码看一下:

如果你配置了10秒滚动一次,写了2秒,恰好这时候该文件内容所在的块在复制中,那么虽然没到10秒,依然会给你滚动文件的,文件大小,事件数量的配置同理了。

为了解决上述问题,我们只要让程序感知不到写的文件所在块正在复制就行了,怎么做呢??

只要让isUnderReplicated()方法始终返回false就行了

该方法是通过当前正在被复制的块和配置中读取的复制块数量比较的,我们能改的就只有配置项中复制块的数量,而官方给出的flume配置项中有该项

hdfs.minBlockReplicas

Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.

默认读的是hadoop中的dfs.replication属性,该属性默认值是3

这里我们也不去该hadoop中的配置,在flume中添加上述属性为1即可

配置如下:

a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=cmcc
a1.sinks.k1.hdfs.minBlockReplicas=1
#a1.sinks.k1.hdfs.fileType=DataStream
#a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0

这样程序就永远不会因为文件所在块的复制而滚动文件了,只会根据你的配置项来滚动文件了,试试吧!!

望各位网友不吝指教!!!

时间: 2024-10-31 10:27:47

【Flume】flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用?的相关文章

【Flume】【源码分析】flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用? ERROR hdfs.BucketWriter: Hit max consecutive under-replication rotations (30)

[转载] http://blog.csdn.net/simonchi/article/details/43231891 ERROR hdfs.BucketWriter: Hit max consecutive under-replication rotations (30) 本人在测试hdfs的sink,发现sink端的文件滚动配置项起不到任何作用,配置如下: [plain] view plain copy print? a1.sinks.k1.type=hdfs a1.sinks.k1.cha

django中使用FastDFS分布式文件系统接口代码实现文件上传、下载、更新、删除

运维使用docker部署好之后FastDFS分布式文件系统之后,提供给我接口如下: fastdfs tracker 192.168.1.216 192.168.1.217 storage 192.168.1.216 192.168.1.217 我们只需要在配置文件中进行配置即可,然后利用客户端提供的接口通过简单的代码就可以将文件上传到分布式文件系统中 至于内部实现机制,可以参考我的另外一篇博客:分布式文件系统Fastdfs原理及部署 再次提醒在安装客户端可能会遇到各种不可控的因素,导致你上传失败

flume简介与监听文件目录并sink至hdfs实战

场景 1. flume是什么 1.1 背景 flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 1

Flume监听文件目录sink至hdfs配置

一:flume介绍 Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力.,Flume架构分为三个部分 源-Source,接收器-Sink,通道-Channel. 二:配置文件 此配置文件source为一个目录,注意,该目录下的文件应为只读,不可写,且文件名不能相同,采用的channels为file,sink为hdfs,此处往hdfs写的策略是当时间达到3600s或者

Flume实时监控目录sink到hdfs

目标:Flume实时监控目录sink到hdfs,再用sparkStreaming监控hdfs的这个目录,对数据进行计算 1.flume的配置,配置spoolDirSource_hdfsSink.properties,监控本地的一个目录,上传到hdfs一个目录下. agent1.channels = ch1agent1.sources = spoolDir-source1agent1.sinks = hdfs-sink1 # 定义channelagent1.channels.ch1.type =

利用开源日志收集软件fluentd收集日志到HDFS文件系统中

说明:本来研究开源日志的系统是flume,后来发现配置比较麻烦,网上搜索到fluentd也是开源的日志收集系统,配置简单多了,性能不错,所以就改研究这个东东了!官方主页,大家可以看看:fluentd.org,支持300+的plugins,应该是不错的! fluentd是通过hadoop中的webHDFS与HDFS进行通信的,所以在配置fluentd时,一定要保证webHDFS能正常通信,和通过webHDFS写数据到hdfs中! 原理图如下: webHDFS的相关配置与测试,请看这篇文章:http

flume 自定义 hbase sink 类

参考(向原作者致敬) http://ydt619.blog.51cto.com/316163/1230586 https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase flume 1.5 的配置文件示例 #Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the

在Spark shell中基于HDFS文件系统进行wordcount交互式分析

Spark是一个分布式内存计算框架,可部署在YARN或者MESOS管理的分布式系统中(Fully Distributed),也可以以Pseudo Distributed方式部署在单个机器上面,还可以以Standalone方式部署在单个机器上面.运行Spark的方式有interactive和submit方式.本文中所有的操作都是以interactive方式操作以Standalone方式部署的Spark.具体的部署方式,请参考Hadoop Ecosystem. HDFS是一个分布式的文件管理系统,其

flume spoolDirectorySource中的 File has been modified since being read与File has changed size since being read错误

以下内容都为自己浅显的理解,用作备忘的流水账,所以写的比较混乱.如理解有错误,请帮忙指正 FLUME-NG中没有之前的对文件的实时流SOURCE,只提供了spoolDir的source,这个source的功能监控指定文件夹,放入文件夹内的文件不能再做任何修改(包括修改时间和文件大小),这2个错误正是对应这2个 在代码中体现为 org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile()方法内 1