YARN Source Code Analysis (4) ----- JournalNode

Preface

While troubleshooting performance problems on our company Hadoop cluster recently, I found that overall processing had become very slow: jobs that normally finish in a few tens of minutes suddenly stretched to an hour or more. At first I suspected the network, and that did turn out to be part of the cause, but a few days later the problem came back and was much harder to pin down. Analyzing the HDFS request logs and the various Ganglia metrics showed that the NameNode's backlog of pending requests stayed consistently high, meaning the NameNode was processing requests abnormally slowly. Digging further, the slowdown traced back to slow editlog writes to the JournalNodes. The root cause: on one node the JournalNode's editlog directory had never been created, so every editlog write on that node threw a FileNotFoundException. Let this be a reminder to take the small supporting roles, such as the JournalNode, seriously. While chasing the problem I also studied the JournalNode-related source code; my notes follow below, and I apologize in advance for any mistakes in the analysis.
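One footnote to that incident: the local directory a JournalNode writes its edit logs to is set by dfs.journalnode.edits.dir in hdfs-site.xml. A minimal sketch (the path shown is only an example value; make sure it exists and is writable on every JournalNode host):

<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data/hadoop/journal</value>
</property>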

JournalNode

Some readers may never have heard of the JournalNode, only of Hadoop's DataNode and NameNode, because it is new in Hadoop 2 (the MR2/YARN generation), where it is part of HDFS NameNode HA. The JournalNode's job is to store the edit log. In Hadoop 1 (MR1) the editlog sat alongside the fsimage and the SecondaryNameNode merged the two periodically; in the HA architecture that checkpointing duty moves to the Standby NameNode, so a SecondaryNameNode is no longer needed. The architecture diagram below shows the current layout; pay particular attention to the JournalNode's role.

In the diagram, the green area between the Active NameNode and the Standby NameNode is the JournalNode layer; of course there is usually more than one of them. It serves the same purpose as an NFS shared-edits directory: the Active NameNode writes editlog data into it, and the Standby reads that data back to synchronize its state.

QJM

Now let's analyze the JournalNode machinery from the source code. The configuration may define any number of JournalNodes, so there must be a manager-like role coordinating them, and that manager is the QJM, whose full name is QuorumJournalManager. Here are QJM's field definitions:

/**
 * A JournalManager that writes to a set of remote JournalNodes,
 * requiring a quorum of nodes to ack each write.
 * (the JournalManager fans edit-log writes out to multiple remote JournalNodes)
 */
@InterfaceAudience.Private
public class QuorumJournalManager implements JournalManager {
  static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);

  // Timeouts for which the QJM will wait for each of the following actions.
  private final int startSegmentTimeoutMs;
  private final int prepareRecoveryTimeoutMs;
  private final int acceptRecoveryTimeoutMs;
  private final int finalizeSegmentTimeoutMs;
  private final int selectInputStreamsTimeoutMs;
  private final int getJournalStateTimeoutMs;
  private final int newEpochTimeoutMs;
  private final int writeTxnsTimeoutMs;

  // Since these don't occur during normal operation, we can
  // use rather lengthy timeouts, and don't need to make them
  // configurable.
  private static final int FORMAT_TIMEOUT_MS            = 60000;
  private static final int HASDATA_TIMEOUT_MS           = 60000;
  private static final int CAN_ROLL_BACK_TIMEOUT_MS     = 60000;
  private static final int FINALIZE_TIMEOUT_MS          = 60000;
  private static final int PRE_UPGRADE_TIMEOUT_MS       = 60000;
  private static final int ROLL_BACK_TIMEOUT_MS         = 60000;
  private static final int UPGRADE_TIMEOUT_MS           = 60000;
  private static final int GET_JOURNAL_CTIME_TIMEOUT_MS = 60000;
  private static final int DISCARD_SEGMENTS_TIMEOUT_MS  = 60000;

  private final Configuration conf;
  private final URI uri;
  private final NamespaceInfo nsInfo;
  private boolean isActiveWriter;

  //the remote-node proxies live in the AsyncLoggerSet collection
  private final AsyncLoggerSet loggers;

  private int outputBufferCapacity = 512 * 1024;
  private final URLConnectionFactory connectionFactory;

The fields above define timeouts for the various operations, all of which go over RPC. Every JournalNode client proxy is held inside the AsyncLoggerSet object, which wraps a list of AsyncLogger objects; each logger manages one individual JournalNode. The following QJM code dynamically creates the logger objects from the configuration:

static List<AsyncLogger> createLoggers(Configuration conf,
      URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
          throws IOException {
    List<AsyncLogger> ret = Lists.newArrayList();
    List<InetSocketAddress> addrs = getLoggerAddresses(uri);
    String jid = parseJournalId(uri);
    for (InetSocketAddress addr : addrs) {
      ret.add(factory.createLogger(conf, nsInfo, jid, addr));
    }
    return ret;
  }

The loggers are then handed to the AsyncLoggerSet wrapper:

 QuorumJournalManager(Configuration conf,
      URI uri, NamespaceInfo nsInfo,
      AsyncLogger.Factory loggerFactory) throws IOException {
    Preconditions.checkArgument(conf != null, "must be configured");

    this.conf = conf;
    this.uri = uri;
    this.nsInfo = nsInfo;
    this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
    ...

The AsyncLoggerSet class itself is defined quite simply; it is just a wrapper around the logger objects.

/**
 * Wrapper around a set of Loggers, taking care of fanning out
 * calls to the underlying loggers and constructing corresponding
 * {@link QuorumCall} instances.
 */
class AsyncLoggerSet {
  static final Log LOG = LogFactory.getLog(AsyncLoggerSet.class);

  private final List<AsyncLogger> loggers;

  private static final long INVALID_EPOCH = -1;
  private long myEpoch = INVALID_EPOCH;

  public AsyncLoggerSet(List<AsyncLogger> loggers) {
    this.loggers = ImmutableList.copyOf(loggers);
  }
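What the wrapper adds is fan-out: it issues one operation to every logger and bundles the returned futures into a QuorumCall. A minimal sketch of that pattern, not the exact Hadoop code, using Guava's Maps and Hadoop's QuorumCall (the real methods in AsyncLoggerSet have this shape):

QuorumCall<AsyncLogger, Void> finalizeLogSegment(final long firstTxId,
    final long endTxId) {
  Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
  for (AsyncLogger logger : loggers) {
    // Each call returns a future immediately; nothing blocks here.
    calls.put(logger, logger.finalizeLogSegment(firstTxId, endTxId));
  }
  // QuorumCall tracks successes and failures across the futures.
  return QuorumCall.create(calls);
}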

Returning to the logger side: AsyncLogger itself is only an abstraction (an interface); the class that actually does the work is the channel class below.

/**
 * Channel to a remote JournalNode using Hadoop IPC.
 * All of the calls are run on a separate thread, and return
 * {@link ListenableFuture} instances to wait for their result.
 * This allows calls to be bound together using the {@link QuorumCall}
 * class.
 */
@InterfaceAudience.Private
public class IPCLoggerChannel implements AsyncLogger {

  private final Configuration conf;
  //RPC address of the remote JournalNode
  protected final InetSocketAddress addr;
  private QJournalProtocol proxy;

  /**
   * Executes tasks submitted to it serially, on a single thread, in FIFO order
   * (generally used for write tasks that should not be reordered).
   * (a single-threaded, serial executor)
   */
  private final ListeningExecutorService singleThreadExecutor;
  /**
   * Executes tasks submitted to it in parallel with each other and with those
   * submitted to singleThreadExecutor (generally used for read tasks that can
   * be safely reordered and interleaved with writes).
   * (a parallel executor)
   */
  private final ListeningExecutorService parallelExecutor;
  private long ipcSerial = 0;
  private long epoch = -1;
  private long committedTxId = HdfsConstants.INVALID_TXID;

  private final String journalId;
  private final NamespaceInfo nsInfo;

  private URL httpServerURL;
  //metrics for this channel; Ganglia's journal metrics come from here
  private final IPCLoggerChannelMetrics metrics;
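Both executors are Guava ListeningExecutorService instances, so every submitted task yields a ListenableFuture that QuorumCall can aggregate. A sketch of how such an executor can be built (an assumption about the construction; the real code also installs a daemon ThreadFactory, omitted here):

import java.util.concurrent.Executors;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

// Wrap a plain single-threaded executor so submissions return ListenableFutures.
ListeningExecutorService singleThreadExecutor =
    MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());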

As its name suggests, this class is the channel connecting the writer side (the QJM inside the NameNode) to the executing side (the JournalNode); note that it does not execute anything itself. The channel class also defines many useful monitoring variables; the journal metrics shown in Ganglia are taken from here:

...
/**
   * The number of bytes of edits data still in the queue.
   * (the write backlog, in bytes)
   */
  private int queuedEditsSizeBytes = 0;

  /**
   * The highest txid that has been successfully logged on the remote JN.
   * (the highest transaction id acked by the remote JN)
   */
  private long highestAckedTxId = 0;

  /**
   * Nanotime of the last time we successfully journaled some edits
   * to the remote node.
   */
  private long lastAckNanos = 0;

  /**
   * Nanotime of the last time that committedTxId was update. Used
   * to calculate the lag in terms of time, rather than just a number
   * of txns.
   */
  private long lastCommitNanos = 0;

  /**
   * The maximum number of bytes that can be pending in the queue.
   * This keeps the writer from hitting OOME if one of the loggers
   * starts responding really slowly. Eventually, the queue
   * overflows and it starts to treat the logger as having errored.
   */
  private final int queueSizeLimitBytes;

  /**
   * If this logger misses some edits, or restarts in the middle of
   * a segment, the writer won't be able to write any more edits until
   * the beginning of the next segment. Upon detecting this situation,
   * the writer sets this flag to true to avoid sending useless RPCs.
   * (out-of-sync flag, marking a JournalNode that has fallen behind)
   */
  private boolean outOfSync = false;
...
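These fields feed the channel's metrics. For example, a time-based lag can be derived from the two nanotime stamps; a hedged sketch of that computation (this mirrors the kind of metric IPCLoggerChannel exposes, though the exact code may differ):

import java.util.concurrent.TimeUnit;

// Sketch: how far this JN's acks trail behind commits, in milliseconds.
public synchronized long getLagTimeMillis() {
  return TimeUnit.MILLISECONDS.convert(
      Math.max(lastCommitNanos - lastAckNanos, 0), TimeUnit.NANOSECONDS);
}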

Because the channel class implements the same protocol as the real executing class on the JournalNode side, the method signatures are identical. A few common methods are listed below.

Starting a log-segment write:

@Override
  public ListenableFuture<Void> startLogSegment(final long txid,
      final int layoutVersion) {
    return singleThreadExecutor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        getProxy().startLogSegment(createReqInfo(), txid, layoutVersion);
        synchronized (IPCLoggerChannel.this) {
          if (outOfSync) {
            outOfSync = false;
            QuorumJournalManager.LOG.info(
                "Restarting previously-stopped writes to " +
                IPCLoggerChannel.this + " in segment starting at txid " +
                txid);
          }
        }
        return null;
      }
    });
  }

After the segment is written, a finalize operation confirms it:

@Override
  public ListenableFuture<Void> finalizeLogSegment(
      final long startTxId, final long endTxId) {
    return singleThreadExecutor.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {
        throwIfOutOfSync();

        getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
        return null;
      }
    });
  }

The singleThreadExecutor generally carries the write-related operations, while the parallel executor carries reads, and all of these operations run asynchronously, which keeps the writer efficient. On the QJM (writer) side, invoking an operation immediately returns a QuorumCall, which is then waited on for a quorum of replies:

@Override
  public void finalizeLogSegment(long firstTxId, long lastTxId)
      throws IOException {
    QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
        firstTxId, lastTxId);
    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
        String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
  }
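The finalizeSegmentTimeoutMs used above, like the other per-operation timeouts, is read from configuration. A hedged hdfs-site.xml sketch (the dfs.qjournal.* keys below exist in this Hadoop line; 20000 ms is the usual default, and the values are only illustrative):

<property>
<name>dfs.qjournal.finalize-segment.timeout.ms</name>
<value>20000</value>
</property>
<property>
<name>dfs.qjournal.write-txns.timeout.ms</name>
<value>20000</value>
</property>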

JournalNode and Journal

On the receiving end of the RPC, the daemon class that executes these operations on each JournalNode host is JournalNode:

/**
 * The JournalNode is a daemon which allows namenodes using
 * the QuorumJournalManager to log and retrieve edits stored
 * remotely. It is a thin wrapper around a local edit log
 * directory with the addition of facilities to participate
 * in the quorum protocol.
 */
@InterfaceAudience.Private
public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
  public static final Log LOG = LogFactory.getLog(JournalNode.class);
  private Configuration conf;
  private JournalNodeRpcServer rpcServer;
  private JournalNodeHttpServer httpServer;
  private final Map<String, Journal> journalsById = Maps.newHashMap();
  private ObjectName journalNodeInfoBeanName;
  private String httpServerURI;
  private File localDir;

  static {
    HdfsConfiguration.init();
  }

  /**
   * When stopped, the daemon will exit with this code.
   */
  private int resultCode = 0;

It defines the log-operation methods that mirror the protocol methods we saw on the channel side:

...
public void discardSegments(String journalId, long startTxId)
      throws IOException {
    getOrCreateJournal(journalId).discardSegments(startTxId);
  }

  public void doPreUpgrade(String journalId) throws IOException {
    getOrCreateJournal(journalId).doPreUpgrade();
  }

  public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
    getOrCreateJournal(journalId).doUpgrade(sInfo);
  }

  public void doFinalize(String journalId) throws IOException {
    getOrCreateJournal(journalId).doFinalize();
  }
...

These methods all delegate to the Journal class, and every one of them is passed a journalId. Does journalId identify the JournalNode host it lives on? That was my first guess too, and it turned out to be wrong. A hint lies in this snippet, which treats every subdirectory of the JournalNode's local dir as one journal:

File[] journalDirs = localDir.listFiles(new FileFilter() {
      @Override
      public boolean accept(File file) {
        return file.isDirectory();
      }
    });
    for (File journalDir : journalDirs) {
      String jid = journalDir.getName();
      if (!status.containsKey(jid)) {
        Map<String, String> jMap = new HashMap<String, String>();
        jMap.put("Formatted", "true");
        status.put(jid, jMap);
      }
    }

The answer is that journalId names the target write directory. The test code in hadoop-hdfs-project shows the same thing:

/**
   * Set up the given Configuration object to point to the set of JournalNodes
   * in this cluster.
   */
  public URI getQuorumJournalURI(String jid) {
    List<String> addrs = Lists.newArrayList();
    for (JNInfo info : nodes) {
      addrs.add("127.0.0.1:" + info.ipcAddr.getPort());
    }
    String addrsVal = Joiner.on(";").join(addrs);
    LOG.debug("Setting logger addresses to: " + addrsVal);
    try {
      return new URI("qjournal://" + addrsVal + "/" + jid);
    } catch (URISyntaxException e) {
      throw new AssertionError(e);
    }
  }

So a journal URI has the form qjournal://host1:port;host2:port/jid, for example:

<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://had1:8485;had2:8485;had3:8485/mycluster</value>
</property>
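The jid, then, is simply the path component of the journal URI (mycluster above). A minimal sketch of extracting it, an assumption about what parseJournalId boils down to rather than the exact Hadoop code:

import java.net.URI;

// The journal id is the URI path minus the leading '/'.
URI uri = URI.create("qjournal://had1:8485;had2:8485;had3:8485/mycluster");
String jid = uri.getPath().substring(1);  // "mycluster"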

JournalNode keeps a map from journalId to Journal objects, which lets different namespaces write to different editlog directories on the same node. The Journal object is the final executor of these operations, and it owns the EditLogOutputStream that writes the editlog output files directly. Here is one of its methods:

/**
   * Start a new segment at the given txid. The previous segment
   * must have already been finalized.
   */
  public synchronized void startLogSegment(RequestInfo reqInfo, long txid,
      int layoutVersion) throws IOException {
    assert fjm != null;
    checkFormatted();
    checkRequest(reqInfo);

    if (curSegment != null) {
      LOG.warn("Client is requesting a new log segment " + txid +
          " though we are already writing " + curSegment + ". " +
          "Aborting the current segment in order to begin the new one.");
      // The writer may have lost a connection to us and is now
      // re-connecting after the connection came back.
      // We should abort our own old segment.
      abortCurSegment();
    }

    // Paranoid sanity check: we should never overwrite a finalized log file.
    // Additionally, if it's in-progress, it should have at most 1 transaction.
    // This can happen if the writer crashes exactly at the start of a segment.
    EditLogFile existing = fjm.getLogFile(txid);
    if (existing != null) {
      if (!existing.isInProgress()) {
        throw new IllegalStateException("Already have a finalized segment " +
            existing + " beginning at " + txid);
      }
...
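For completeness, the journalId-to-Journal lookup inside JournalNode follows roughly this shape (a sketch; the real Journal constructor takes additional arguments such as startup options, so the signature here is abbreviated, not the actual one):

// Simplified sketch of JournalNode.getOrCreateJournal.
private synchronized Journal getOrCreateJournal(String jid) throws IOException {
  Journal journal = journalsById.get(jid);
  if (journal == null) {
    // Each journal id gets its own subdirectory under the local storage dir.
    File logDir = new File(localDir, jid);
    journal = new Journal(conf, logDir, jid, new ErrorReporter());
    journalsById.put(jid, journal);
  }
  return journal;
}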

Readers who want the detailed write logic can study the code themselves; this article only sketches the overall JournalNode write flow, and the simple architecture diagram below should help tie it together.

For the full annotated code analysis, see https://github.com/linyiqun/hadoop-yarn; analyses of other parts of YARN will follow.

Reference source code

Apache Hadoop 2.7.1 (hadoop-hdfs-project)

