【Flume】flume ng中HDFS sink设置按天滚动,0点滚动文件,修改源码实现

HDFS sink里有个属性hdfs.rollInterval=86400,这个属性你设置了24小时滚动一次,它的确就到了24小时才滚动,但是我们的需求往往是到了0点就滚动文件了,因为离线的job因为都会放在夜里执行。

如果flume是早上9点启动的,那么要到明天早上9点,hdfs的文件才会关闭,难道job要等到9点后才执行,这显然不合适,所以通过修改源码使其能够在0点滚动文件。

首先添加一个属性,可配置为day,hour,min

 private String timeRollerFlag;
  timeRollerFlag = context.getString("hdfs.timeroller.flag", Constants.defaultTimeRollerFlagDay);
public class Constants {

	public static final String defaultTimeRollerFlagDay = "day";

	public static final String timeRollerFlagHour = "hour";

	public static final String timeRollerFlagMin = "min";
}

HDFS sink中在new BucketWriter的时候,需要将参数传递过去

BucketWriter bucketWriter = new BucketWriter(rollInterval,
      rollSize, rollCount,
      batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
      suffix, codeC, compType, hdfsWriter, timedRollerPool,
      proxyTicket, sinkCounter, idleTimeout, closeCallback,
      lookupPath, callTimeout, callTimeoutPool, retryInterval,
      tryCount,timeRollerFlag);

以上是针对org.apache.flume.sink.hdfs.HDFSEventSink的修改。

下面看org.apache.flume.sink.hdfs.BucketWriter的修改:

private final String timeRollerFlag;

	private Calendar calendar = Calendar.getInstance();
	private int lastDayOfYear;
	private int lastYear;
	private int lastHour;
	private int lastMin;
	private int nowDayOfYear;
	private int nowYear;
	private int nowHour;
	private int nowMin;
private static Date fileOpenTime = null;
<pre name="code" class="java">// when open the file in hdfs with inUseSuffix,instantiate the
				// fileOpenTime
				fileOpenTime = new Date();
			} catch (Exception ex) {

if (!isOpen) {
			if (closed) {
				throw new BucketClosedException("This bucket writer was closed and "
						+ "this handle is thus no longer valid");
			}
			open();
		} else {
			LOG.debug("##############the file is opened");
			calendar.setTime(fileOpenTime);
			lastDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
			lastYear = calendar.get(Calendar.YEAR);
			lastHour = calendar.get(Calendar.HOUR_OF_DAY);
			lastMin = calendar.get(Calendar.MINUTE);
			Date now = new Date();
			calendar.setTime(now);
			nowDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
			nowYear = calendar.get(Calendar.YEAR);
			nowHour = calendar.get(Calendar.HOUR_OF_DAY);
			nowMin = calendar.get(Calendar.MINUTE);
			LOG.debug("fileOpenTime = {},nowTime = {}", JodaTimeUtil.parseToString(fileOpenTime,
					JodaTimeUtil.FORMAT_FULL_DATE_TIME_WITH_SYMBOL), JodaTimeUtil.parseToString(now,
					JodaTimeUtil.FORMAT_FULL_DATE_TIME_WITH_SYMBOL));

			// 年份相同,日期+1,年份+1,now日期=1
			boolean condition1 = (lastYear == nowYear && (nowDayOfYear == (lastDayOfYear + 1)))
					|| (nowYear == (lastYear + 1) && nowDayOfYear == 1);
			// day相同,小时+1,或者day不同,小时=0
			boolean condition2 = (lastDayOfYear == nowDayOfYear && nowHour == (lastHour + 1))
					|| (lastDayOfYear != nowDayOfYear && nowHour == 0);
			// hour相同,分钟+1,或者hour不同,分钟=0
			boolean condition3 = (lastHour == nowHour && nowMin == (lastMin + 1))
					|| (lastHour != nowHour && nowMin == 0);
			// 判断滚动标识
			if (timeRollerFlag.equals(Constants.defaultTimeRollerFlagDay)) {
				if (condition1) {
					LOG.debug("rollflag = {},rolling", Constants.defaultTimeRollerFlagDay);
					close();
					open();
				}
			} else if (timeRollerFlag.equals(Constants.timeRollerFlagHour)) {
				if (condition2) {
					LOG.debug("rollflag = {},rolling", Constants.timeRollerFlagHour);
					close();
					open();
				}
			} else if (timeRollerFlag.equals(Constants.timeRollerFlagMin)) {
				if (condition3) {
					LOG.debug("rollflag = {},rolling", Constants.timeRollerFlagMin);
					close();
					open();
				}
			}
		}

以上的else部分即是修改的内容。

望各位不吝指教!!

时间: 2025-01-04 09:08:23

【Flume】flume ng中HDFS sink设置按天滚动,0点滚动文件,修改源码实现的相关文章

flume修改源码实现source文件名前后缀的更改

业务场景: 需求:通过flume进行数据采集,将本地(windows服务器)不断产生的csv文件采集到hdfs上. 问题:本地文件在生成的过程中,会出现文件名重复的现象.也就是说,在前一秒生成文件名为aaa.csv,该文件经过flume进行处理之后会进行文件名的更改,默认情况下文件名会更改为aaa.csv.COMPLATED,但是在第二秒的时候,接着又生成了aaa.csv文件,此时flume将该文件处理完进行更名的过程中,就会报错,例如: 解决:为了避免文件名重复导致flume程序挂的问题,此时

Hadoop 修改源码以及将修改后的源码应用到部署好的Hadoop中

我的Hadoop版本是hadoop-2.7.3, 我们可以去hadoop官网下载源码hadoop-2.7.3-src,以及编译好的工程文件hadoop-2.7.3, 后者可以直接部署. 前者hadoop-2.7.3-src必须mvn之后才能部署. 我们修改代码必须是在hadoop-2.7.3-src源码中进行, 而源码mvn之后才能部署或使用. 所以我们要先了解Maven.     mvn hadoop-2.7.3-src的时候会出现各种问题. 其中hadoop-2.7.3-src源码文件中有个

Spring中@Transactional事务回滚(含实例详细讲解,附源码)

原文出处: 小宝鸽 一.使用场景举例 在了解@Transactional怎么用之前我们必须要先知道@Transactional有什么用.下面举个栗子:比如一个部门里面有很多成员,这两者分别保存在部门表和成员表里面,在删除某个部门的时候,假设我们默认删除对应的成员.但是在执行的时候可能会出现这种情况,我们先删除部门,再删除成员,但是部门删除成功了,删除成员的时候出异常了.这时候我们希望如果成员删除失败了,之前删除的部门也取消删除.这种场景就可以使用@Transactional事物回滚. 二.che

FFmpeg中HLS文件解析源码

不少人都在找FFmpeg中是否有hls(m3u8)解析的源码,其实是有的.就是ffmpeg/libavformat/hlsproto.c,它依赖的文件也在那个目录中. /* * Apple HTTP Live Streaming Protocol Handler * Copyright (c) 2010 Martin Storsjo * * This file is part of FFmpeg. * * FFmpeg is free software; you can redistribute

MVC中使用SignalR打造酷炫实用的即时通讯功能附源码

前言,现在这世道写篇帖子没个前言真不好意思发出来.本贴的主要内容来自于本人在之前项目中所开发的一个小功能,用于OA中的即时通讯.由于当时走的太急,忘记把代码拿出来.想想这已经是大半年前的事情了,时间过了这么久,在当时最新的SignalR2.0.1到现在已经变成了2.2.昨天晚上特地熬了个夜,重新又把它写出来做了一个小小的Demo.当然我只是大自然的搬运工,这个SignalR即时通讯功能里面有一些前端的类库不是我自己写的.我只是改吧改吧~~在此鸣谢 @贤心,是他的几条库才使得我的这个功能如此酷炫.

2018.11.20 Struts2中对结果处理方式分析&amp;struts2内置的方式底层源码剖析

介绍一下struts2内置帮我们封装好的处理结果方式也就是底层源码分析 这是我们的jar包里面找的位置目录 打开往下拉看到result-type节点 name那一列就是我们的type类型取值 上一篇博客在分析的时候发现就算不写也会自动转发原因在这里,default=true 选择了默认方式 接着我们如果想看看底层是如何工作的就选择class属性复制双引号的内容 接着按住ctrl+shift+T就能出现一个框 OPen Type 进去之后发现是这个,点击Attach Source---->选择第二

MapReduce(十五): 从HDFS读取文件的源码分析

以Map任务读取文本数据为例: 1)   LineRecordReader负责对文件分割的定位,以及对读取每一行内容的封装供用户Map任务使用.每次在定位在文件中不为0的位置时,多读取一行,因为前一个处理该位置之前的数据时,会完整把该一行已经读取并处理. 2)   LineReader负责对所要访问文件输入流的数据进行每一行读取,只实现对每一行读取的逻辑. 3)   DFSDataInputStream封装了DFSInputStream的实现,直接调用DFSInputStream接口完成. 4)

解析opencv中Box Filter的实现并提出进一步加速的方案(源码共享)。

说明:本文所有算法的涉及到的优化均指在PC上进行的,对于其他构架是否合适未知,请自行试验. Box Filter,最经典的一种领域操作,在无数的场合中都有着广泛的应用,作为一个很基础的函数,其性能的好坏也直接影响着其他相关函数的性能,最典型莫如现在很好的EPF滤波器:GuideFilter.因此其优化的档次和程度是非常重要的,网络上有很多相关的代码和博客对该算法进行讲解和优化,提出了不少O(1)算法,但所谓的0(1)算法也有优劣之分,0(1)只是表示执行时间和某个参数无关,但本身的耗时还是有区别

MySQL代理Atlas在CentOS7.0中的源码安装实践(设置开机自启)

提示:如要去掉SQL过滤(无WHERE子句的UPDATE和DELETE)功能,可以先修改源码: 修改文件 Atlas-2.2.1\plugins\proxy\proxy-plugin.c 修改方法 is_in_blacklist,直接返回FALSE 依赖包: glib-2.32.4.tar.xz glibc-devel  libevent-devel  lua-devel  openssl-devel  flex  mysql-devel xz  gettext-devel 分步命令: # 安装