在《HDFS源码分析之EditLogTailer》一文中,我们详细了解了编辑日志跟踪器EditLogTailer的实现,介绍了其内部编辑日志追踪线程EditLogTailerThread的实现,及其线程完成编辑日志跟踪所依赖的最重要的方法,执行日志追踪的doTailEdits()方法。在该方法的处理流程中,首先需要从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据。那么这个编辑日志输入流集合streams是如何获取的呢?本文我们将进行详细研究。
在doTailEdits()方法中,获取编辑日志输入流的代码如下:
// 从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据 streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
这个editLog便是一个文件系统编辑日志FSEditLog实例,我们看下它的selectInputStreams()方法,代码如下:
/** * Select a list of input streams. * * @param fromTxId first transaction in the selected streams * @param toAtLeastTxId the selected streams must contain this transaction * @param recovery recovery context * @param inProgressOk set to true if in-progress streams are OK */ public Collection<EditLogInputStream> selectInputStreams( long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk) throws IOException { // 创建编辑日志输入流EditLogInputStream列表streams List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); // 在Object对象journalSetLock上使用synchronized进行同步 synchronized(journalSetLock) { // 检测journalSet状态 Preconditions.checkState(journalSet.isOpen(), "Cannot call " + "selectInputStreams() on closed FSEditLog"); // 调用三个参数的selectInputStreams()方法,传入空的streams列表,从fromTxId事务ID开始, // 编辑日志同步时,标志位inProgressOk为false selectInputStreams(streams, fromTxId, inProgressOk); } try { // 数据监测 checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk); } catch (IOException e) { if (recovery != null) { // If recovery mode is enabled, continue loading even if we know we // can't load up to toAtLeastTxId. LOG.error(e); } else { closeAllStreams(streams); throw e; } } return streams; }
它首先会创建编辑日志输入流EditLogInputStream列表streams,并在Object对象journalSetLock上使用synchronized进行同步,检测journalSet状态,随后调用三个参数的selectInputStreams()方法,传入空的streams列表,从fromTxId事务ID开始,编辑日志是否可处于处理过程中的标志位inProgressOk,编辑日志同步时,标志位inProgressOk为false,最后,调用checkForGaps()方法进行相关数据监测。
我们继续看下三个参数的selectInputStreams()方法,代码如下:
@Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) throws IOException { // 调用JournalSet的同名方法 journalSet.selectInputStreams(streams, fromTxId, inProgressOk); }
它其实是调用JournalSet的同名方法。JournalSet是什么呢?它是Journal集合的管理者,而Journal就是日志的意思,它是Hadoop HA中EditLog在JournalNode上的组织形式。我们看下JournalSet的selectInputStreams()方法,代码如下:
/** * In this function, we get a bunch of streams from all of our JournalManager * objects. Then we add these to the collection one by one. * 在这个方法内,我们从所有JournalManager对象中得到一堆输入流。接着我们把它们一个接一个的添加到集合中。 * * @param streams The collection to add the streams to. It may or * may not be sorted-- this is up to the caller. * @param fromTxId The transaction ID to start looking for streams at * @param inProgressOk Should we consider unfinalized streams? */ @Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) throws IOException { final PriorityQueue<EditLogInputStream> allStreams = new PriorityQueue<EditLogInputStream>(64, EDIT_LOG_INPUT_STREAM_COMPARATOR); // 遍历journals,得到每个JournalAndStream实例jas for (JournalAndStream jas : journals) { // 如果jas不可用,记录日志,跳过 if (jas.isDisabled()) { LOG.info("Skipping jas " + jas + " since it's disabled"); continue; } // 利用jas得到JournalManager实例,然后调用其selectInputStreams()方法,获得输入流,并放入allStreams输入流集合 try { jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk); } catch (IOException ioe) { LOG.warn("Unable to determine input streams from " + jas.getManager() + ". Skipping.", ioe); } } // 过滤掉无用的编辑日志输入流 chainAndMakeRedundantStreams(streams, allStreams, fromTxId); }
这个方法的大体处理流程如下:
1、遍历JournalSet中的journals,得到每个JournalAndStream实例jas:
1.1、如果jas不可用,记录日志,跳过;
1.2、利用jas得到JournalManager实例,然后调用其selectInputStreams()方法,获得输入流,并放入allStreams输入流集合;
2、调用chainAndMakeRedundantStreams()方法,过滤掉无用的编辑日志输入流。
首先,我们来看下JournalManager实例的selectInputStreams()方法,我们以FileJournalManager为例,代码如下:
@Override synchronized public void selectInputStreams( Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) throws IOException { // 先调用StorageDirectory的getCurrentDir()方法获得其current目录, // 然后再调用matchEditLogs()方法,获得编辑日志文件EditLogFile列表elfs List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir()); LOG.debug(this + ": selecting input streams starting at " + fromTxId + (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + "from among " + elfs.size() + " candidate file(s)"); // 调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs获得输入流,并添加到输入流列表streams addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk); }
这个方法的处理流程如下:
1、先调用StorageDirectory的getCurrentDir()方法获得其current目录,然后再调用matchEditLogs()方法,获得编辑日志文件EditLogFile列表elfs;
2、调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs获得输入流,并添加到输入流列表streams。
我们再看下matchEditLogs()方法,代码如下:
/** * returns matching edit logs via the log directory. Simple helper function * that lists the files in the logDir and calls matchEditLogs(File[]) * * @param logDir * directory to match edit logs in * @return matched edit logs * @throws IOException * IOException thrown for invalid logDir */ public static List<EditLogFile> matchEditLogs(File logDir) throws IOException { return matchEditLogs(FileUtil.listFiles(logDir)); } static List<EditLogFile> matchEditLogs(File[] filesInStorage) { return matchEditLogs(filesInStorage, false); } private static List<EditLogFile> matchEditLogs(File[] filesInStorage, boolean forPurging) { // 创建编辑日志文件EditLogFile列表ret List<EditLogFile> ret = Lists.newArrayList(); // 遍历filesInStorage,对每个文件进行处理 for (File f : filesInStorage) { // 获得文件名 String name = f.getName(); // Check for edits // 根据文件名,利用正则表达式,检测其是否为编辑日志edits log // 正则表达式为edits_(\d+)-(\d+) // 原edits_(\\d+)-(\\d+)中d前面的两个\中第一个只是转义符 // 文件名类似如下:edits_0000000000001833048-0000000000001833081 // 第一串数字为起始事务ID,第二串数字为终止事务ID Matcher editsMatch = EDITS_REGEX.matcher(name); if (editsMatch.matches()) {// 文件名匹配正则表达式的话,说明其是我们需要寻找的编辑日志文件 try { // 获取起始事务ID:正则表达式中第一个匹配的是起始事务ID long startTxId = Long.parseLong(editsMatch.group(1)); // 获取终止事务ID:正则表达式中第二个匹配的是终止事务ID long endTxId = Long.parseLong(editsMatch.group(2)); // 利用文件f、起始事务艾迪startTxId、终止事务艾迪endTxId构造编辑日志文件EditLogFile实例, // 并添加到ret列表中 ret.add(new EditLogFile(f, startTxId, endTxId)); continue; } catch (NumberFormatException nfe) { LOG.error("Edits file " + f + " has improperly formatted " + "transaction ID"); // skip } } // Check for in-progress edits // 检测正在处理中的编辑日志 // 正则表达式为edits_inprogress_(\d+) // 原edits_inprogress_(\\d+)中d前面的两个\中第一个只是转义符 Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name); if (inProgressEditsMatch.matches()) { try { long startTxId = Long.parseLong(inProgressEditsMatch.group(1)); ret.add( new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true)); continue; } catch (NumberFormatException nfe) { LOG.error("In-progress edits file " + f + " has improperly " + "formatted transaction ID"); // skip } } if (forPurging) { // Check for in-progress stale edits Matcher staleInprogressEditsMatch = EDITS_INPROGRESS_STALE_REGEX .matcher(name); if (staleInprogressEditsMatch.matches()) { try { long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1)); ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true)); continue; } catch (NumberFormatException nfe) { LOG.error("In-progress stale edits file " + f + " has improperly " + "formatted transaction ID"); // skip } } } } return ret; }
我们看下最核心的两个参数的matchEditLogs()方法,它的处理流程为:
1、创建编辑日志文件EditLogFile列表ret;
2、遍历filesInStorage,对每个文件进行处理:
2.1、获得文件名name;
2.2、根据文件名name,利用正则表达式,检测其是否为编辑日志edits log:
利用的正则表达式为edits_(\d+)-(\d+),原edits_(\\d+)-(\\d+)中d前面的两个\中第一个只是转义符,文件名类似如下:edits_0000000000001833048-0000000000001833081,第一串数字为起始事务ID,第二串数字为终止事务ID;
2.3、文件名匹配正则表达式的话,说明其是我们需要寻找的编辑日志文件:
2.3.1、获取起始事务艾迪startTxId:正则表达式中第一个匹配的是起始事务ID;
2.3.2、获取终止事务艾迪endTxId:正则表达式中第二个匹配的是终止事务ID;
2.3.3、利用文件f、起始事务艾迪startTxId、终止事务艾迪endTxId构造编辑日志文件EditLogFile实例,并添加到ret列表中;
2.3.4、continue,继续循环下一个文件;
2.4、根据文件名name,利用正则表达式,检测其是否为正在处理中的编辑日志edits log:
利用的正则表达式为edits_inprogress_(\d+),原edits_inprogress_(\\d+)中d前面的两个\中第一个只是转义符,文件名类似如下:edits_inprogress_0000000000001853186,后面的字符串为起始事务ID;
2.5、文件名匹配正则表达式的话,说明其是我们需要寻找的正在处理中的编辑日志文件:
2.5.1、获取起始事务艾迪startTxId:正则表达式中第一个匹配的是起始事务ID;
2.5.2、利用文件f、起始事务艾迪startTxId、终止事务艾迪-12345构造编辑日志文件EditLogFile实例,并添加到ret列表中;
2.5.3、continue,继续循环下一个文件;
3、返回编辑日志文件EditLogFile列表ret。
再回到FileJournalManager的selectInputStreams()方法,我们看下它的第二步:调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs添加输入流列表streams,代码如下:
static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs, Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) { // 遍历EditLogFile集合elfs,针对每个EditLogFile实例elf进行如下处理: for (EditLogFile elf : elfs) { // 如果elf处于处理过程中: if (elf.isInProgress()) { // 如果不需要获取处于处理过程中的编辑日志,直接跳过 if (!inProgressOk) { LOG.debug("passing over " + elf + " because it is in progress " + "and we are ignoring in-progress logs."); continue; } // 校验编辑日志,校验不成功的话,直接跳过 try { elf.validateLog(); } catch (IOException e) { LOG.error("got IOException while trying to validate header of " + elf + ". Skipping.", e); continue; } } // 如果elf的最后事务艾迪lastTxId小于我们获取编辑日志的起始事务艾迪fromTxId,直接跳过 if (elf.lastTxId < fromTxId) { assert elf.lastTxId != HdfsConstants.INVALID_TXID; LOG.debug("passing over " + elf + " because it ends at " + elf.lastTxId + ", but we only care about transactions " + "as new as " + fromTxId); continue; } // 利用elf中的文件file、起始事务艾迪firstTxId、终止事务艾迪lastTxId、编辑日志文件是否处于处理过程中标志位isInProgress, // 构造编辑日志文件输入流EditLogFileInputStream实例elfis EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(), elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress()); LOG.debug("selecting edit log stream " + elf); // 将elfis加入到输入流列表streams streams.add(elfis); } }
它的处理流程如下:
遍历EditLogFile集合elfs,针对每个EditLogFile实例elf进行如下处理:
1、如果elf处于处理过程中,同时如果不需要获取处于处理过程中的编辑日志,直接跳过,否则校验编辑日志,校验不成功的话,直接跳过,成功则继续;
2、如果elf的最后事务艾迪lastTxId小于我们获取编辑日志的起始事务艾迪fromTxId,直接跳过,否则继续;
3、利用elf中的文件file、起始事务艾迪firstTxId、终止事务艾迪lastTxId、编辑日志文件是否处于处理过程中标志位isInProgress,构造编辑日志文件输入流EditLogFileInputStream实例elfis;
4、将elfis加入到输入流列表streams。