利用QJM实现HDFS自动主从切换(HA Automatic Failover)源码详析

最近研究了下NameNode HA Automatic Failover方面的东西,当Active NN因为异常或其他原因不能正常提供服务时,处于Standby状态的NN就可以自动切换为Active状态,从而到达真正的高可用

NN HA Automatic Failover架构图

为了实现自动切换,需要依赖ZooKeeper和ZKFC组件,ZooKeeper主要用来记录NN的相关状态信息,zkfc组件以单独的JVM进程的形式运行在NN所在的节点上。下面首先分析下NN的启动流程,NN对象在实例化过程中,如果在hdfs-site.xml中配置的dfs.ha.namenodes.${dfs.nameservices}个数多于1个,则属性haEnabled为true,state的初始状态即为STANDBY

HAUtil.isHAEnabled(Configuration conf, String nsId) {
    //根据${dfs.nameservices}获取dfs.ha.namenodes.${dfs.nameservices}
    Map<String, Map<String, InetSocketAddress>> addresses =
      DFSUtil.getHaNnRpcAddresses(conf);
    if (addresses == null) return false;
    Map<String, InetSocketAddress> nnMap = addresses.get(nsId);
    //如果ndId对应的ha.namenodes个数大于1并且nnMap不为空,返回true
    return nnMap != null && nnMap.size() > 1;
  }
  
  //NN启动过程中首先判断haEnabled属性值,为true则初始状态为STANDBY
  NameNode.createHAState() {
    return !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
  }

假设集群中有两个NN,这两个NN在初始启动时,都为STANDBY状态,如果不配置Automatic Failover,则需要手动将其中的一个NN切换为ACTIVE模式,命令如下

hdfs haadmin-transitionToActive nn1

如果配置并启动zkfc组件,则会自动将本节点的一个NameNode切换为ACTIVE状态,这个取决于先在哪台NN上启动ZKFC,ZKFC和NN的启动顺序并没有强制的要求。下面来主要分析下,当配置的两个NN节点都启动之后,ZKFC组件的启动主要做了哪些事。ZKFC的启动类是DFSZKFailoverController,继承自ZKFailoverController,首先从main方法入手

DFSZKFailoverController.main(String args[]){
    if (DFSUtil.parseHelpArgument(args, 
        ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }
    //加载hdfs的配置信息
    GenericOptionsParser parser = new GenericOptionsParser(
        new HdfsConfiguration(), args);
    //根据配置信息构造DFSZKFailoverController
    //根据nsId,nnId实例化NNHAServiceTarget对象,并设置到zkfc对象中
    //HAServiceTarget对象用来建立到指定NN的各种网络参数
    DFSZKFailoverController zkfc = DFSZKFailoverController.create(
        parser.getConfiguration());
    //建立与zk的连接并格式化zk的目录
    //实例化ActiveStandbyElector和HealthMonitor
    //启动RPCServer
    System.exit(zkfc.run(parser.getRemainingArgs()));
  }

在ZKFC启动的过程中,启动了两个非常重要的进程内组件:HealthMonitor和ActiveStandbyElector。ZKFC主要从HealthMonitor和ActiveStandbyElector中订阅事件并管理NN的状态并负责fencing。HealthMonitor定期检查NN的健康状况,如果出现问题,以捕获异常的方式通过回调方法将变化通知给ZKFailoverController。ActiveStandbyElector主要用于管理NN在zk上的状态,包括创建节点,节点监控等。HealthMonitor在初始启动时,如果本地节点处于健康状态,则会触发一系列的事件使当前NN节点参加选举并切换为ACTIVE状态,具体代码如下

 HealthMonitor.doHealthChecks() {
    while (shouldRun) {
      HAServiceStatus status = null;
      boolean healthy = false;
      try {
    	//通过proxy获取NN的状态
        status = proxy.getServiceStatus();
        proxy.monitorHealth();
        healthy = true;
      } catch (HealthCheckFailedException e) {
        LOG.warn("Service health check failed for " + targetToMonitor
            + ": " + e.getMessage());
        //调用callbacks集合中的回调方法,以事件的形式通知ZKFC
        enterState(State.SERVICE_UNHEALTHY);
      } catch (Throwable t) {
        RPC.stopProxy(proxy);
        proxy = null;
        enterState(State.SERVICE_NOT_RESPONDING);
        Thread.sleep(sleepAfterDisconnectMillis);
        return;
      }
      
      if (status != null) {
        setLastServiceStatus(status);
      }
      if (healthy) {
	//初始启动时State=INITIALIZING,当前状态与初始状态不一致时,触发状态转移
        enterState(State.SERVICE_HEALTHY);
      }

      Thread.sleep(checkIntervalMillis);
    }
  }
  
  
  HealthMonitor.enterState(State newState) {
    //本次状态和上一次状态不一致的时候,触发回调
    if (newState != state) {
      LOG.info("Entering state " + newState);
      state = newState;
      synchronized (callbacks) {
        for (Callback cb : callbacks) {
          cb.enteredState(newState);
        }
      }
    }
  }
  
  
  HealthCallbacks.enteredState(HealthMonitor.State newState) {
      setLastHealthState(newState);
      //根据当前状态判断是否可以参加选举
      recheckElectability();
   }
   
  HealthMonitor.recheckElectability() {
    synchronized (elector) {
      synchronized (this) {
        boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
    
        long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); 
        if (remainingDelay > 0) {
          if (healthy) {
            LOG.info("Would have joined master election, but this node is " +
                "prohibited from doing so for " +
                TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
          }
          scheduleRecheck(remainingDelay);
          return;
        }
    
        switch (lastHealthState) {
	//状态为SERVICE_HEALTHY,可参加选举
        case SERVICE_HEALTHY:
          elector.joinElection(targetToData(localTarget));
          break;
        //状态为INITIALIZING,失去选举资格
        case INITIALIZING:
          elector.quitElection(false);
          break;
    
	//状态为SERVICE_UNHEALTHY或者SERVICE_NOT_RESPONDING,失去选举资格
        case SERVICE_UNHEALTHY:
        case SERVICE_NOT_RESPONDING:
          elector.quitElection(true);
          break;
          
        case HEALTH_MONITOR_FAILED:
          fatalError("Health monitor failed!");
          break;
          
        default:
          throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
        }
      }
    }
  }
  
  
  ActiveStandbyElector.joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
      if (!reEstablishSession()) {
        fatalError("Failed to reEstablish connection with ZooKeeper");
        return;
      }
    }

    createRetryCount = 0;
    wantToBeInElection = true;
    //向zk写入ephemeral类型的znode,当NN挂掉后,会被自动删除
    createLockNodeAsync();
  }
  
  
  ActiveStandbyElector.createLockNodeAsync() {
    //异步调用,当方法返回时,触发回调方法
    //processResult(int rc, String path, Object ctx,String name)
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
        this, zkClient);
  }
  
  
  ActiveStandbyElector.processResult(int rc, String path, Object ctx,
      String name) {
    Code code = Code.get(rc);
    if (isSuccess(code)) {
      //创建成功,试图使节点变为ACTIVE状态
      //becomeActive()通过与本地NN进行RPC通信,将NN的state设置为ACTIVE
      //从而使得当前节点的NN更新为主节点
      if (becomeActive()) {
	//监控节点
        monitorActiveStatus();
      } else {
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }
    //节点已经存在,说明已经有NN成为了主节点,此时本节点只能作为热备节点存在
    //故状态设置为STANDBY
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        becomeStandby();
      }
      //监控ACTIVE NN写入的znode,当节点状态改变时,通过watch机制触发回调事件
      monitorActiveStatus();
      return;
    }
    
    
    ActiveStandbyElector.becomeActive() {
      if (state == State.ACTIVE) {
        // already active
        return true;
      }
      try {
    	//获取上一个ACTIVE NN的面包屑节点数据,并对上一个ACTIVE NN执行fence操作
        Stat oldBreadcrumbStat = fenceOldActive();
        //更新面包屑节点的数据
        writeBreadCrumbNode(oldBreadcrumbStat);
      
        //rpc方式与当前NN交互,使得当前节点的NN变为ACTIVE状态
        appClient.becomeActive();
        state = State.ACTIVE;
        return true;
      } catch (Exception e) {
        return false;
      }
    }

处于STANDBY状态的NN会监控ACTIVE NN写入zk的znode节点,当节点状态改变时,触发zk的watch回调,使得STANDBY NN重新参与到选举中,从而完成状态的自动切换,代码如下

 ActiveStandbyElector.processWatchEvent(ZooKeeper zk, WatchedEvent event) {
   String path = event.getPath();
   if (path != null) {
     switch (eventType) {
     case NodeDeleted:
       if (state == State.ACTIVE) {
         enterNeutralMode();
       }
       //重新参加选举
       joinElectionInternal();
       break;
     case NodeDataChanged:
       monitorActiveStatus();
       break;
     default:
       monitorActiveStatus();
     }
 }

上述主要是从代码的角度去理解和分析了NN自动切换的大致流程,相比手动切换的方式,可用性大大提升,同时减轻了运维的负担。

利用QJM实现HDFS自动主从切换(HA Automatic Failover)源码详析

时间: 2024-11-05 22:50:56

利用QJM实现HDFS自动主从切换(HA Automatic Failover)源码详析的相关文章

redis配置读写分离以及利用哨兵sentinel进行自动主从切换

redis利用哨兵(sentinel)进行主从切换,断断续续,自己终于通过配置验证了一下该功能,其中遇到过一些的问题,也是耗费了大量的时间才解决,接下来分享下配置的过程以及遇到的问题和解决方法.希望对各位有所帮助. 首先说一下实验环境: redis软件:redis-3.2.1(安装在虚拟机的linux系统中) 宿主主机:window8.1 x64 secureCRT:宿主主机安装此软件来操作linux,这只是个人喜欢,大家可以不装. 对于redis在linux如何安装这里不进行说明,不懂的朋友可

HDFS HA架构以及源码引导

HA体系架构 相关知识介绍 HDFS master/slave架构,HDFS节点分为NameNode节点和DataNode节点.NameNode存有HDFS的元数据:主要由FSImage和EditLog组成.FSImage保存有文件的目录.分块ID.文件权限等,EditLog保存有对HDFS的操作记录.DataNode存放分块的数据,并采用CRC循环校验方式对本地的数据进行校验,DataNode周期性向NameNode汇报本机的信息. NameNode单点故障:HDFS只有一个NameNode节

Hadoop之HDFS原理及文件上传下载源码分析(上)

HDFS原理 首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来. 楼主的环境: 操作系统:Ubuntu 15.10 hadoop版本:2.7.3 HA:否(随便搭了个伪分布式) 文件上传 下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程: 首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯. NameNode与各DataNode使用心跳机制来获取DataNode信息.NameNode收到Client请求后,

Hadoop之HDFS原理及文件上传下载源码分析(下)

上篇Hadoop之HDFS原理及文件上传下载源码分析(上)楼主主要介绍了hdfs原理及FileSystem的初始化源码解析, Client如何与NameNode建立RPC通信.本篇将继续介绍hdfs文件上传.下载源解析. 文件上传 先上文件上传的方法调用过程时序图: 其主要执行过程: FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(楼主上篇已经介绍过了) 调用FileSystem的create()方法,由于实现类为Dis

Cocos2d-X3.0 刨根问底(九)----- 场景切换(TransitionScene)源码分析

上一章我们分析了Scene与Layer相关类的源码,对Cocos2d-x的场景有了初步了解,这章我们来分析一下场景变换TransitionScene源码. 直接看TransitionScene的定义 class CC_DLL TransitionScene : public Scene { public: /** Orientation Type used by some transitions */ enum class Orientation { /// An horizontal orie

SpringMVC关于json、xml自动转换的原理研究[附带源码分析 --转

SpringMVC关于json.xml自动转换的原理研究[附带源码分析] 原文地址:http://www.cnblogs.com/fangjian0423/p/springMVC-xml-json-convert.html 目录 前言 现象 源码分析 实例讲解 关于配置 总结 参考资料 前言 SpringMVC是目前主流的Web MVC框架之一. 如果有同学对它不熟悉,那么请参考它的入门blog:http://www.cnblogs.com/fangjian0423/p/springMVC-in

利用QJM实现HDFS的HA策略部署与验证工作记录分享

1.概述  Hadoop2.X中的HDFS(Vsersion2.0)相比于Hadoop1.X增加了两个重要功能,HA和Federation.HA解决了Hadoop1.X  Namenode中一直存在的单点故障问题,HA策略通过热备的方式为主NameNode提供一个备用者,并且这个备用者的状态一直和主Namenode的元数据保持一致,一旦主NameNode挂了,备用NameNode可以立马转换变换为主NameNode,从而提供不间断的服务.另外,Federation特性,主要是允许一个 HDFS

推荐10款左右切换的焦点图源码下载

1.jQuery左右循环焦点图,带箭头按钮 这次要分享的jQuery插件也是一款焦点图插件,这在我们之前分享的焦点图插件中算是功能比较完善的了,图片的滑动效果也非常不错.图片两侧带有左右箭头,点击它们即可左右切换图片.另外,图片上面可以浮动文字描述,是一款功能强大的jQuery焦点图插件. 在线预览   源码下载 2.一款基于jQuery的图片左右滑动焦点图 今天给大家分享一款基于jQuery的焦点图插件,这款jQuery焦点图插件的特点是可以多张图片左右滑动切换,可以点击切换按钮进行图片滑动,

第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密

一:Spark集群开发环境准备 启动HDFS,如下图所示: 通过web端查看节点正常启动,如下图所示: 2.启动Spark集群,如下图所示: 通过web端查看集群启动正常,如下图所示: 3.启动start-history-server.sh,如下图所示: 二:HDFS的SparkStreaming案例实战(代码部分) package com.dt.spark.SparkApps.sparkstreaming; import org.apache.spark.SparkConf; import o