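The HDFS sink has a property, hdfs.rollInterval (in seconds). With hdfs.rollInterval=86400 the file is rolled every 24 hours, and it really is rolled only after a full 24 hours have passed. What we usually need, however, is for the file to roll at midnight (00:00), because offline jobs are normally scheduled to run overnight.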
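If Flume is started at 9 a.m., the HDFS file will not be closed until 9 a.m. the next day. Making the nightly jobs wait until after 9 a.m. is clearly not acceptable, so I modified the source code so that files roll at midnight.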
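To make the difference concrete, the small sketch below compares the two roll times for a file opened at 09:00 on 2025-01-04 (an illustrative date). It assumes, as in Flume's BucketWriter, that the hdfs.rollInterval timer starts when the file is opened. The class and method names are hypothetical and exist only for this illustration.

```java
import java.time.LocalDateTime;

// Compares when a file opened at a given time is rolled by a fixed
// hdfs.rollInterval versus when it would be rolled at the next midnight.
class RollTimeDemo {

    // Fixed-interval roll: the timer counts from the moment the file was opened
    static LocalDateTime intervalRoll(LocalDateTime openedAt, long rollIntervalSeconds) {
        return openedAt.plusSeconds(rollIntervalSeconds);
    }

    // Calendar-aligned roll: the first 00:00 strictly after the file was opened
    static LocalDateTime nextMidnight(LocalDateTime openedAt) {
        return openedAt.toLocalDate().plusDays(1).atStartOfDay();
    }

    public static void main(String[] args) {
        LocalDateTime openedAt = LocalDateTime.of(2025, 1, 4, 9, 0);
        System.out.println(intervalRoll(openedAt, 86400)); // 2025-01-05T09:00
        System.out.println(nextMidnight(openedAt));        // 2025-01-05T00:00
    }
}
```

The nine-hour gap between the two outputs is exactly the delay the nightly jobs would have to absorb.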
First, add a property to HDFSEventSink whose value can be day, hour or min:
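```java
// HDFSEventSink: new field holding the time-roll granularity
private String timeRollerFlag;

// HDFSEventSink.configure(Context context): read hdfs.timeroller.flag, defaulting to "day"
timeRollerFlag = context.getString("hdfs.timeroller.flag", Constants.defaultTimeRollerFlagDay);
```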
```java
// Allowed values of hdfs.timeroller.flag
public class Constants {
    public static final String defaultTimeRollerFlagDay = "day";
    public static final String timeRollerFlagHour = "hour";
    public static final String timeRollerFlagMin = "min";
}
```
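Because the three values are plain strings, a typo such as "hours" would match none of the day/hour/min branches in BucketWriter, and the midnight roll would silently never happen. One possible alternative, not part of the original change, is to parse the setting once into an enum and fail fast on unknown values. TimeRollerFlag and fromConfig are hypothetical names used only for this sketch.

```java
import java.util.Locale;

// Hypothetical replacement for the String constants: parses the value of
// hdfs.timeroller.flag once and rejects anything other than day/hour/min.
enum TimeRollerFlag {
    DAY, HOUR, MIN;

    // A missing or blank value falls back to DAY, like the default used in configure()
    static TimeRollerFlag fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return DAY;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "hdfs.timeroller.flag must be one of day, hour, min but was: " + value, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromConfig(null));   // DAY
        System.out.println(fromConfig("hour")); // HOUR
    }
}
```

Calling fromConfig in configure() would surface a misconfiguration when the agent starts rather than as files that never roll.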
Where HDFSEventSink creates a new BucketWriter, the setting has to be passed in as an extra argument:
```java
BucketWriter bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
    batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
    suffix, codeC, compType, hdfsWriter, timedRollerPool,
    proxyTicket, sinkCounter, idleTimeout, closeCallback,
    lookupPath, callTimeout, callTimeoutPool, retryInterval,
    tryCount, timeRollerFlag);
```
That covers the changes to org.apache.flume.sink.hdfs.HDFSEventSink.
Next, the changes to org.apache.flume.sink.hdfs.BucketWriter:
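```java
// At the top of BucketWriter.java
import java.util.Calendar;
import java.util.Date;

// New fields in BucketWriter. The constructor also takes a trailing
// String timeRollerFlag parameter and assigns it to this.timeRollerFlag.
private final String timeRollerFlag;
private Calendar calendar = Calendar.getInstance();
// Date fields of the time the current file was opened ...
private int lastDayOfYear;
private int lastYear;
private int lastHour;
private int lastMin;
// ... and of the current time
private int nowDayOfYear;
private int nowYear;
private int nowHour;
private int nowMin;

// When the current file was opened. This must be an instance field, not
// static: every BucketWriter (one per output path) tracks its own file.
private Date fileOpenTime = null;
```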
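In open(), record the open time at the end of the try block that opens the file in HDFS (at that point the file still carries inUseSuffix), right before the existing catch:

```java
    // ... existing code that opens the file in HDFS ...
    // The file (still named with inUseSuffix) is open: record when it was opened
    fileOpenTime = new Date();
} catch (Exception ex) {
    // ... existing error handling ...
```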
```java
// In BucketWriter.append(): the if branch is Flume's existing code, the else branch is new
if (!isOpen) {
    if (closed) {
        throw new BucketClosedException("This bucket writer was closed and "
            + "this handle is thus no longer valid");
    }
    open();
} else {
    LOG.debug("##############the file is opened");
    // Break the time the current file was opened into fields
    calendar.setTime(fileOpenTime);
    lastDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
    lastYear = calendar.get(Calendar.YEAR);
    lastHour = calendar.get(Calendar.HOUR_OF_DAY);
    lastMin = calendar.get(Calendar.MINUTE);
    // Same for the current time
    Date now = new Date();
    calendar.setTime(now);
    nowDayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
    nowYear = calendar.get(Calendar.YEAR);
    nowHour = calendar.get(Calendar.HOUR_OF_DAY);
    nowMin = calendar.get(Calendar.MINUTE);
    LOG.debug("fileOpenTime = {}, nowTime = {}", fileOpenTime, now);
    // A new day has started: different year or different day of the year.
    // This also covers a gap of several days without any events.
    boolean condition1 = lastYear != nowYear || lastDayOfYear != nowDayOfYear;
    // A new hour: a new day, or a different hour on the same day
    boolean condition2 = condition1 || lastHour != nowHour;
    // A new minute: a new hour, or a different minute within the same hour
    boolean condition3 = condition2 || lastMin != nowMin;
    // Roll according to the configured flag
    if (timeRollerFlag.equals(Constants.defaultTimeRollerFlagDay)) {
        if (condition1) {
            LOG.debug("rollflag = {},rolling", Constants.defaultTimeRollerFlagDay);
            close();
            open();
        }
    } else if (timeRollerFlag.equals(Constants.timeRollerFlagHour)) {
        if (condition2) {
            LOG.debug("rollflag = {},rolling", Constants.timeRollerFlagHour);
            close();
            open();
        }
    } else if (timeRollerFlag.equals(Constants.timeRollerFlagMin)) {
        if (condition3) {
            LOG.debug("rollflag = {},rolling", Constants.timeRollerFlagMin);
            close();
            open();
        }
    }
}
```
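The else branch above is the modification. Because the check runs inside append(), the file is rolled when the first event after the day (or hour, or minute) boundary arrives, not exactly at 00:00 on the clock.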
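The roll decision itself can be pulled out and tested without a running Flume agent. The sketch below is a minimal version of it, assuming java.time in place of Calendar and a hypothetical helper class TimeRollCheck. It rolls when the period (day, hour or minute) containing the current time is later than the period containing the file's open time, which also handles year boundaries and gaps longer than one period.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical, standalone version of the "should this file roll now?" check.
final class TimeRollCheck {
    private TimeRollCheck() {}

    // Maps the flag values "day", "hour" and "min" to a truncation unit
    static ChronoUnit unitFor(String flag) {
        switch (flag) {
            case "day":  return ChronoUnit.DAYS;
            case "hour": return ChronoUnit.HOURS;
            case "min":  return ChronoUnit.MINUTES;
            default: throw new IllegalArgumentException("unknown flag: " + flag);
        }
    }

    // Roll when "now" falls in a later period than the file's open time.
    // Truncating both timestamps to the unit and comparing them covers
    // year changes and multi-period gaps without special cases.
    static boolean shouldRoll(LocalDateTime fileOpenTime, LocalDateTime now, String flag) {
        ChronoUnit unit = unitFor(flag);
        return now.truncatedTo(unit).isAfter(fileOpenTime.truncatedTo(unit));
    }

    public static void main(String[] args) {
        LocalDateTime opened = LocalDateTime.of(2024, 12, 31, 9, 0);
        System.out.println(shouldRoll(opened, LocalDateTime.of(2024, 12, 31, 23, 59), "day")); // false
        System.out.println(shouldRoll(opened, LocalDateTime.of(2025, 1, 1, 0, 0), "day"));     // true
        System.out.println(shouldRoll(opened, LocalDateTime.of(2025, 1, 3, 10, 0), "day"));    // true
    }
}
```

Using isAfter rather than a plain inequality also means a clock that steps backwards does not trigger a roll.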
Any feedback or corrections are very welcome!
Time: 2025-01-04 09:08:23