hadoop2.5.2学习及实践笔记(四)—— namenode启动过程源码概览

对namenode启动时的相关操作及相关类有一个大体了解,后续深入研究时,再对本文进行补充

>实现类

HDFS启动脚本为$HADOOP_HOME/sbin/start-dfs.sh,查看start-dfs.sh可以看出,namenode是通过bin/hdfs命令来启动

$ vi  start-dfs.sh

# namenodes
NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -namenodes)
echo "Starting namenodes on [$NAMENODES]"
"$HADOOP_PREFIX/sbin/hadoop-daemons.sh"   --config "$HADOOP_CONF_DIR"   --hostnames "$NAMENODES"   --script "$bin/hdfs" start namenode $nameStartOpt
#---------------------------------------------------------

查看$HADOOP_HOME/bin/hdfs,可以找到namenode启动所调用的java类。

$ vi bin/hdfs:

if [ "$COMMAND" = "namenode" ] ; then
  CLASS=‘org.apache.hadoop.hdfs.server.namenode.NameNode‘
  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"

>源码查看

按照前文hadoop2.5.2学习及实践笔记(二)——编译源代码及导入源码至eclipse步骤,源码已经导入到eclipse中,快捷键ctrl+shift+R搜索并打开NameNode.java查看源码

NameNode类中有一个静态代码块,表示在加载器加载NameNode类过程中的准备阶段,就会执行代码块中的代码。HdfsConfiguration的init()方法的方法体是空的,这里的作用是触发对HdfsConfiguration的主动调用,从而保证在执行NameNode类相关调用时,如果HdfsConfiguration类没有被加载和初始化,先触发HdfsConfiguration的初始化过程。

//org.apache.hadoop.hdfs.server.namenode.NameNode.java

static{
     //HdfsConfiguration类init()方法:public static void init() {}
    HdfsConfiguration.init();
}

查看其main方法,可以看出namenode相关操作的主要入口方法是createNameNode(String argv[], Configuration conf)方法。

//org.apache.hadoop.hdfs.server.namenode.NameNode.java

public static void main(String argv[]) throws Exception {
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      //打印namenode启动或关闭日志信息
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
     //namenode相关主要操作
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
          //向客户端和datanode提供RPC服务,直到RPC服务器结束运行
        namenode.join();
      }
    } catch (Throwable e) {
      LOG.fatal("Exception in namenode join", e);
      terminate(1, e);
    }
  }

createNameNode方法中通过一个switch语句对不同的命令执行不同的操作。比如搭建环境时格式化文件系统时的操作,可以查看FORMAT分支。

//org.apache.hadoop.hdfs.server.namenode.NameNode.java

public static NameNode createNameNode(String argv[], Configuration conf)
      throws IOException {
    LOG.info("createNameNode " + Arrays.asList(argv));
    if (conf == null)
      conf = new HdfsConfiguration();
    //参数为空时默认: -regular
    StartupOption startOpt = parseArguments(argv);
    if (startOpt == null) {
      printUsage(System.err);
      return null;
    }
    setStartupOption(conf, startOpt);

    switch (startOpt) {
      case FORMAT: {//格式化文件系统,伪分布式环境搭建时调用过namenode -format命令
        boolean aborted = format(conf, startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid javac warning
      }
      case GENCLUSTERID: {
        System.err.println("Generating new cluster id:");
        System.out.println(NNStorage.newClusterID());
        terminate(0);
        return null;
      }
      case FINALIZE: {
        System.err.println("Use of the argument ‘" + StartupOption.FINALIZE +
            "‘ is no longer supported. To finalize an upgrade, start the NN " +
            " and then run `hdfs dfsadmin -finalizeUpgrade‘");
        terminate(1);
        return null; // avoid javac warning
      }
      case ROLLBACK: {
        boolean aborted = doRollback(conf, true);
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BOOTSTRAPSTANDBY: {
        String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length);
        int rc = BootstrapStandby.run(toolArgs, conf);
        terminate(rc);
        return null; // avoid warning
      }
      case INITIALIZESHAREDEDITS: {
        boolean aborted = initializeSharedEdits(conf,
            startOpt.getForceFormat(),
            startOpt.getInteractiveFormat());
        terminate(aborted ? 1 : 0);
        return null; // avoid warning
      }
      case BACKUP:
      case CHECKPOINT: {//backupnode和checkpointnode启动
        NamenodeRole role = startOpt.toNodeRole();
        DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
          //backupnode继承NameNode类,代码最终执行的还是NameNode的构造方法
        return new BackupNode(conf, role);
      }
      case RECOVER: {
        NameNode.doRecovery(startOpt, conf);
        return null;
      }
      case METADATAVERSION: {
        printMetadataVersion(conf);
        terminate(0);
        return null; // avoid javac warning
      }
      default: {
        DefaultMetricsSystem.initialize("NameNode");
          //启动时startOpt=“-regular”,代码执行default分支,通过构造函数返回一个namenode实例
        return new NameNode(conf);
      }
    }
  }

namenode的构造方法

//org.apache.hadoop.hdfs.server.namenode.NameNode.java

 public NameNode(Configuration conf) throws IOException {
    this(conf, NamenodeRole.NAMENODE);
  }
 protected NameNode(Configuration conf, NamenodeRole role)
      throws IOException {
    this.conf = conf;
    this.role = role;
    //获取fs.defaultFS,设置namenode地址
    setClientNamenodeAddress(conf);
    String nsId = getNameServiceId(conf);
    String namenodeId = HAUtil.getNameNodeId(conf, nsId);
    //是否启用HA
    this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
    //HA状态:启用/备用
    state = createHAState(getStartupOption(conf));
    //读取dfs.ha.allow.stale.reads,设置namenode在备用状态时是否允许读操作,默认为false
    this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
    this.haContext = createHAContext();
    try {
     //联邦环境下,使用该方法配置一系列使用一个逻辑上的nsId组合在一起的namenode
      initializeGenericKeys(conf, nsId, namenodeId);
    //namenode初始化
    initialize(conf);
      try {
        haContext.writeLock();
        state.prepareToEnterState(haContext);
        //namenode进入相应状态:active state/backup state/standby state
       state.enterState(haContext);
      } finally {
        haContext.writeUnlock();
      }
    } catch (IOException e) {
      this.stop();
      throw e;
    } catch (HadoopIllegalArgumentException e) {
      this.stop();
      throw e;
    }
  }

namenode初始化方法代码

//org.apache.hadoop.hdfs.server.namenode.NameNode.java

protected void initialize(Configuration conf) throws IOException {
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) {
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      }
    }
    //设置权限,根据hadoop.security.authentication获取认证方式及规则
    UserGroupInformation.setConfiguration(conf);
    //登录:如果认证方式为simple则退出该方法
    //否则调用UserGroupInformation.loginUserFromKeytab进行登陆,登陆使用dfs.namenode.kerberos.principal作为用户名
    loginAsNameNodeUser(conf);

    //初始化度量系统,用于度量namenode服务状态
    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    if (NamenodeRole.NAMENODE == role) {
      //启动http服务器
      startHttpServer(conf);
    }
    //根据命令对命名空间进行操作,如:前文所述启动时加载本地命名空间镜像和应用编辑日志,在内存中建立命名空间的映像
    loadNamesystem(conf);

    //创建RPC服务器
    rpcServer = createRpcServer(conf);
    if (clientNamenodeAddress == null) {
      // This is expected for MiniDFSCluster. Set it now using
      // the RPC server‘s bind address.
      clientNamenodeAddress =
          NetUtils.getHostPortString(rpcServer.getRpcAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    }
    if (NamenodeRole.NAMENODE == role) {
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
    }

    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    //启动活动状态和备用状态的公共服务:RPC服务和namenode的插件程序启动
    startCommonServices(conf);
  }

loadNamesystem(Configuration conf)方法调用FSNamesystem类的loadFromDisk(Configuration conf)。前文提到的,namenode启动时从本地文件系统加载镜像并重做编辑日志,都在此方法中实现。

//org.apache.hadoop.hdfs.server.namenode.FSNamesystem.java

static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
    //必须的编辑日志目录检查
    checkConfiguration(conf);
    //设在NNStorage,并初始化编辑日志目录。NNStorage主要功能是管理namenode使用的存储目录
    FSImage fsImage = new FSImage(conf,
        FSNamesystem.getNamespaceDirs(conf),
        FSNamesystem.getNamespaceEditsDirs(conf));
    //根据指定的镜像创建FSNamesystem对象
    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
    StartupOption startOpt = NameNode.getStartupOption(conf);    if (startOpt == StartupOption.RECOVER) {
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }

    long loadStart = now();
    try {
      //加载镜像、重做编辑日志,并打开一个新编辑文件都在此方法中
      namesystem.loadFSImage(startOpt);
    } catch (IOException ioe) {
      LOG.warn("Encountered exception loading fsimage", ioe);
      fsImage.close();
      throw ioe;
    }
    long timeTakenToLoadFSImage = now() - loadStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
    if (nnMetrics != null) {
      nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
    }
    return namesystem;
  }

private void loadFSImage(StartupOption startOpt) throws IOException {
    final FSImage fsImage = getFSImage();

    // format before starting up if requested
    if (startOpt == StartupOption.FORMAT) {

      fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id

      startOpt = StartupOption.REGULAR;
    }
    boolean success = false;
    writeLock();
    try {
      // We shouldn‘t be calling saveNamespace if we‘ve come up in standby state.
      MetaRecoveryContext recovery = startOpt.createRecoveryContext();
      final boolean staleImage
          = fsImage.recoverTransitionRead(startOpt, this, recovery);
      if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt)) {
        rollingUpgradeInfo = null;
      }
      final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
      LOG.info("Need to save fs image? " + needToSave
          + " (staleImage=" + staleImage + ", haEnabled=" + haEnabled
          + ", isRollingUpgrade=" + isRollingUpgrade() + ")");
      if (needToSave) {
        fsImage.saveNamespace(this);
      } else {
        // No need to save, so mark the phase done.
        StartupProgress prog = NameNode.getStartupProgress();
        prog.beginPhase(Phase.SAVING_CHECKPOINT);
        prog.endPhase(Phase.SAVING_CHECKPOINT);
      }
      // This will start a new log segment and write to the seen_txid file, so
      // we shouldn‘t do it when coming up in standby state
      if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)) {
        fsImage.openEditLogForWrite();
      }
      success = true;
    } finally {
      if (!success) {
        fsImage.close();
      }
      writeUnlock();
    }
    imageLoadComplete();
  }
时间: 2024-08-04 10:11:49

hadoop2.5.2学习及实践笔记(四)—— namenode启动过程源码概览的相关文章

hadoop2.5.2学习及实践笔记(三)—— HDFS概念及体系结构

注:文中涉及的文件路径或配置文件中属性名称是针对hadoop2.X系列,相对于之前版本,可能有改动. 附: HDFS用户指南官方介绍: http://hadoop.apache.org/docs/r2.5.2/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html HDFS体系结构官方介绍: http://hadoop.apache.org/docs/r2.5.2/hadoop-project-dist/hadoop-hdfs/HdfsDesign.

hadoop2.5.2学习及实践笔记(五)—— HDFS shell命令行常见操作

附:HDFS shell guide文档地址 http://hadoop.apache.org/docs/r2.5.2/hadoop-project-dist/hadoop-common/FileSystemShell.html 启动HDFS后,输入hadoop fs命令,即可显示HDFS常用命令的用法 [[email protected] hadoop-2.5.2]$ hadoop fs Usage: hadoop fs [generic options] [-appendToFile <lo

hadoop2.5.2学习及实践笔记(六)—— Hadoop文件系统及其java接口

文件系统概述 org.apache.hadoop.fs.FileSystem是hadoop的抽象文件系统,为不同的数据访问提供了统一的接口,并提供了大量具体文件系统的实现,满足hadoop上各种数据访问需求,如以下几个具体实现(原表格见<hadoop权威指南>): 文件系统 URI方案 Java实现 (org.apache.hadoop) 定义 Local file fs.LocalFileSystem 支持有客户端校验和本地文件系统.带有校验和的本地系统文件在fs.RawLocalFileS

Docker学习与实践(四)

四.仓库管理 1.创建本地仓库 ①获取官方registry镜像 [[email protected] ~]# docker run -d -p 5000:5000 --restart=always --name registry registry:2 Unable to find image 'registry:2' locally 2: Pulling from library/registry 81033e7c1d6a: Pull complete b235084c2315: Pull co

Kotlin学习与实践 (四)类、接口

1.类的继承结构 接口 * Kotlin的类和接口与Java的有些地方不一样:* Kotlin的声明默认是public final的.* Kotlin嵌套的类默认不是内部类:它没有包含对外部类的隐式引用 等* Kotlin也一样是使用interface来声明接口 * 如下: 声明一个简单的接口 interface Clickable { fun click() fun longPress() = println("longPress") //接口中的抽象方法也可以有默认的实现,有了默认

dubbo系列四、dubbo启动过程源码解析

一.代码准备 1.示例代码 参考dubbo系列二.dubbo+zookeeper+dubboadmin分布式服务框架搭建(windows平台) 2.简单了解下spring自定义标签 https://www.jianshu.com/p/16b72c10fca8 Spring自定义标签总共可以分为以下几个步骤 定义Bean 标签解析生成接收配置的POJO. 定义schema文件,定义自定义标签的attr属性 定义解析类parser,遇到自定义标签如何解析. 定义命名空间处理类namespaceSup

Caliburn.Micro学习笔记(四)----IHandle&lt;T&gt;实现多语言功能

Caliburn.Micro学习笔记(四)----IHandle<T>实现多语言功能 说一下IHandle<T>实现多语言功能 因为Caliburn.Micro是基于MvvM的UI与codebehind分离, binding可以是双向的所以我们想动态的实现多语言切换很是方便今天我做一个小demo给大家提供一个思路 先看一下效果 点击英文  变成英文状态点chinese就会变成中文                          源码的下载地址在文章的最下边 多语言用的是资源文件建

代码管理工具 --- git的学习笔记四《重新整理git(1)》

1.创建版本库 mkdir  创建目录 cd  地址,到该地址下 pwd 显示当前目录 1.创建目录 $ mkdir startGit $ cd startGit $ pwd 显示当前目录 或者cd到桌面,然后再创建目录 2.初始化版本库 $ git init 初始化仓库 提示信息:Initialized empty Git repository in /Users/xingzai/Desktop/startGit/.git/ 建立一个空的git仓库在/Users/xingzai/Desktop

Linux学习笔记四:Linux的文件搜索命令

1.文件搜索命令  which 语法:which [命令名称] 范例:$which ls  列出ls命令所在目录 [[email protected] ~]$ which ls alias ls='ls --color=auto' /bin/ls 另外一个命令:whereis [名称名称],也可以列出命令所在目录. [[email protected] ~]$ whereis ls ls: /bin/ls /usr/share/man/man1/ls.1.gz /usr/share/man/ma