HBase 0.96 Source Code: HMaster (2), the HMaster main flow and becomeActiveMaster

1. The HMaster main flow has three steps:

1.1 becomeActiveMaster(startupStatus);

1.2 finishInitialization

1.3 loop()

  becomeActiveMaster(startupStatus);
  // We are either the active master or we were asked to shutdown
  if (!this.stopped) {
    finishInitialization(startupStatus, false);
    loop();
  }

2. becomeActiveMaster(startupStatus);

2.1.1 First, an ActiveMasterManager is created. It watches events on ZooKeeper, mainly nodeCreated() and nodeDeleted().

  private boolean becomeActiveMaster(MonitoredTask startupStatus)
  throws InterruptedException {
    // TODO: This is wrong!!!! Should have new servername if we restart ourselves,
    // if we come back to life.
    // Create the ActiveMasterManager
    this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
        this);
    // Register the ActiveMasterManager as a listener on the ZooKeeper watcher
    this.zooKeeper.registerListener(activeMasterManager);
    stallIfBackupMaster(this.conf, this.activeMasterManager);

    // The ClusterStatusTracker is setup before the other
    // ZKBasedSystemTrackers because it's needed by the activeMasterManager
    // to check if the cluster should be shutdown.
    this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
    this.clusterStatusTracker.start();
    return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
  }

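2.1.2 Event handling in ActiveMasterManager. Node creation and node deletion both end up in the same handler, handle(path); nodeDeleted() additionally sets clusterShutDown when the deleted node is the cluster-state znode.

2.1.2.1 Handling of the nodeCreated() and nodeDeleted() events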

  @Override
  public void nodeCreated(String path) {
    handle(path);
  }

  @Override
  public void nodeDeleted(String path) {
    if(path.equals(watcher.clusterStateZNode) && !master.isStopped()) {
      clusterShutDown.set(true);
    }

    handle(path);
  }

  void handle(final String path) {
    if (path.equals(watcher.getMasterAddressZNode()) && !master.isStopped()) {
      handleMasterNodeChange();
    }
  }
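The routing above can be sketched without ZooKeeper. Everything in this example is a hypothetical stand-in: `MasterNodeListener` is not an HBase class, the two znode paths are assumed values used only for illustration, and the `master.isStopped()` check from the original is left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ActiveMasterManager's event routing; no ZooKeeper involved.
class MasterNodeListener {
    // Assumed paths, for illustration only.
    static final String MASTER_ZNODE = "/hbase/master";
    static final String CLUSTER_STATE_ZNODE = "/hbase/running";

    final AtomicBoolean clusterShutDown = new AtomicBoolean(false);
    int masterNodeChanges = 0; // counts how often the shared handler reacted

    void nodeCreated(String path) {
        handle(path);
    }

    void nodeDeleted(String path) {
        // Losing the cluster-state znode means the whole cluster is shutting down.
        if (path.equals(CLUSTER_STATE_ZNODE)) {
            clusterShutDown.set(true);
        }
        handle(path);
    }

    // Both events end up here; only the master znode is of interest.
    private void handle(String path) {
        if (path.equals(MASTER_ZNODE)) {
            masterNodeChanges++;
        }
    }

    public static void main(String[] args) {
        MasterNodeListener listener = new MasterNodeListener();
        listener.nodeCreated("/hbase/master");
        listener.nodeDeleted("/hbase/master");
        listener.nodeDeleted("/hbase/rs/host1"); // ignored: not the master znode
        listener.nodeDeleted("/hbase/running");
        System.out.println(listener.masterNodeChanges + " " + listener.clusterShutDown.get());
    }
}
```

Events for any other path are dropped by the path comparison, so the listener can be registered on a watcher that reports changes for the whole tree.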

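2.1.2.2 The function that finally handles node creation and deletion

2.1.2.2.1 Creation and deletion of the master node share one handler, handleMasterNodeChange().

2.1.2.2.2 If the /hbase/master node exists, an active master is already registered.

2.1.2.2.3 Note the call to clusterHasActiveMaster.notifyAll(): the blocking wait to become master, described later, relies on it.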

  private void handleMasterNodeChange() {
    // Watch the node and check if it exists.
    try {
      synchronized(clusterHasActiveMaster) {
        if (ZKUtil.watchAndCheckExists(watcher, watcher.getMasterAddressZNode())) {
          // A master node exists, there is an active master
          LOG.debug("A master is now available");
          clusterHasActiveMaster.set(true);
        } else {
          // Node is no longer there, cluster does not have an active master
          LOG.debug("No master available. Notifying waiting threads");
          clusterHasActiveMaster.set(false);
          // Notify any thread waiting to become the active master
          clusterHasActiveMaster.notifyAll();
        }
      }
    } catch (KeeperException ke) {
      master.abort("Received an unexpected KeeperException, aborting", ke);
    }
  }
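The handler uses the `AtomicBoolean` both as the state and as the monitor that waiting threads block on. A minimal sketch of that handshake, assuming a hypothetical `ActiveMasterFlag` class and a plain boolean argument in place of the `ZKUtil.watchAndCheckExists` call:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the wait/notifyAll handshake; not HBase code.
class ActiveMasterFlag {
    // Holds the state and doubles as the monitor object.
    private final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(true);

    // Notifier side: called when the (simulated) master znode appears or disappears.
    void onMasterNodeChange(boolean masterNodeExists) {
        synchronized (clusterHasActiveMaster) {
            clusterHasActiveMaster.set(masterNodeExists);
            if (!masterNodeExists) {
                clusterHasActiveMaster.notifyAll(); // wake up waiting backup masters
            }
        }
    }

    // Waiter side: blocks until no active master is registered.
    void awaitNoActiveMaster() throws InterruptedException {
        synchronized (clusterHasActiveMaster) {
            while (clusterHasActiveMaster.get()) { // re-check after every wakeup
                clusterHasActiveMaster.wait();
            }
        }
    }

    boolean hasActiveMaster() {
        return clusterHasActiveMaster.get();
    }

    // Starts one waiter, simulates the deletion event, reports whether the waiter woke up.
    static boolean demo() {
        ActiveMasterFlag flag = new ActiveMasterFlag();
        Thread backup = new Thread(() -> {
            try {
                flag.awaitNoActiveMaster();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        backup.start();
        flag.onMasterNodeChange(false); // simulated deletion of /hbase/master
        try {
            backup.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !backup.isAlive() && !flag.hasActiveMaster();
    }

    public static void main(String[] args) {
        System.out.println("waiter released: " + demo());
    }
}
```

Because the flag is set inside the same `synchronized` block that issues `notifyAll()`, a waiter that checks the flag under the lock cannot miss the change, whichever thread runs first.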

2.2 Blocking until this process becomes the active master

 boolean blockUntilBecomingActiveMaster(MonitoredTask startupStatus) {
    while (true) {
      startupStatus.setStatus("Trying to register in ZK as active master");
      // Try to become the active master, watch if there is another master.
      // Write out our ServerName as versioned bytes.
      try {
          //backupZNode -->/hbase/backup-masters/sn(hostname,port,startcode)
        String backupZNode =
            ZKUtil.joinZNode(this.watcher.backupMasterAddressesZNode, this.sn.toString());
        // watcher.getMasterAddressZNode()-->/hbase/master
        if (MasterAddressTracker.setMasterAddress(this.watcher,
            this.watcher.getMasterAddressZNode(), this.sn)) {

          // If we were a backup master before, delete our ZNode from the backup
          // master directory since we are the active now)
          if (ZKUtil.checkExists(this.watcher, backupZNode) != -1) {
            LOG.info("Deleting ZNode for " + backupZNode + " from backup master directory");
            ZKUtil.deleteNodeFailSilent(this.watcher, backupZNode);
          }
          // Save the znode in a file, this will allow to check if we crash in the launch scripts
          ZNodeClearer.writeMyEphemeralNodeOnDisk(this.sn.toString());

          // We are the master, return
          startupStatus.setStatus("Successfully registered as active master.");
          this.clusterHasActiveMaster.set(true);
          LOG.info("Registered Active Master=" + this.sn);
          return true;
        }

        // There is another active master running elsewhere or this is a restart
        // and the master ephemeral node has not expired yet.
        this.clusterHasActiveMaster.set(true);

        /*
        * Add a ZNode for ourselves in the backup master directory since we are
        * not the active master.
        *
        * If we become the active master later, ActiveMasterManager will delete
        * this node explicitly.  If we crash before then, ZooKeeper will delete
        * this node for us since it is ephemeral.
        */
        LOG.info("Adding ZNode for " + backupZNode + " in backup master directory");
        MasterAddressTracker.setMasterAddress(this.watcher, backupZNode, this.sn);

        String msg;
        byte[] bytes =
          ZKUtil.getDataAndWatch(this.watcher, this.watcher.getMasterAddressZNode());
        if (bytes == null) {
          msg = ("A master was detected, but went down before its address " +
            "could be read.  Attempting to become the next active master");
        } else {
          ServerName currentMaster;
          try {
            currentMaster = ServerName.parseFrom(bytes);
          } catch (DeserializationException e) {
            LOG.warn("Failed parse", e);
            // Hopefully next time around we won't fail the parse.  Dangerous.
            continue;
          }
          if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
            msg = ("Current master has this master's address, " +
              currentMaster + "; master was restarted? Deleting node.");
            // Hurry along the expiration of the znode.
            ZKUtil.deleteNode(this.watcher, this.watcher.getMasterAddressZNode());

            // We may have failed to delete the znode at the previous step, but
            //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
            ZNodeClearer.deleteMyEphemeralNodeOnDisk();
          } else {
            msg = "Another master is the active master, " + currentMaster +
              "; waiting to become the next active master";
          }
        }
        LOG.info(msg);
        startupStatus.setStatus(msg);
      } catch (KeeperException ke) {
        master.abort("Received an unexpected KeeperException, aborting", ke);
        return false;
      }
      synchronized (this.clusterHasActiveMaster) {
        while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
          try {
            this.clusterHasActiveMaster.wait();
          } catch (InterruptedException e) {
            // We expect to be interrupted when a master dies,
            //  will fall out if so
            LOG.debug("Interrupted waiting for master to die", e);
          }
        }
        if (clusterShutDown.get()) {
          this.master.stop(
            "Cluster went down before this master became active");
        }
        if (this.master.isStopped()) {
          return false;
        }
        // there is no active master so we can try to become active master again
      }
    }
  }
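The overall shape of this method (try to register, otherwise wait, then retry) can be simulated in memory. The sketch below is an assumption-laden simplification, not HBase code: `FailoverSketch` is hypothetical, an `AtomicReference` stands in for the /hbase/master znode, the backup znode and the stop and shutdown checks are omitted, and the `aboutToWait` latch exists only to make the demo deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical simulation of one backup master taking over; no ZooKeeper involved.
class FailoverSketch {
    // Simulated /hbase/master znode: the active master's name, or null if absent.
    private final AtomicReference<String> masterZNode;
    // State flag that doubles as the monitor the backup waits on.
    private final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
    // Demo hook (not in HBase): released once the backup is about to wait.
    private final CountDownLatch aboutToWait = new CountDownLatch(1);

    FailoverSketch(String currentMaster) {
        this.masterZNode = new AtomicReference<>(currentMaster);
    }

    // Simplified counterpart of blockUntilBecomingActiveMaster.
    boolean blockUntilActive(String myName) {
        while (true) {
            // Atomic "create if absent", standing in for the ephemeral-node create.
            if (masterZNode.compareAndSet(null, myName)) {
                clusterHasActiveMaster.set(true);
                return true;
            }
            clusterHasActiveMaster.set(true); // someone else holds the znode
            synchronized (clusterHasActiveMaster) {
                aboutToWait.countDown();
                while (clusterHasActiveMaster.get()) {
                    try {
                        clusterHasActiveMaster.wait();
                    } catch (InterruptedException e) {
                        // Unlike the original, this sketch gives up when interrupted.
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
            // No active master any more: loop and try to register again.
        }
    }

    // Simulates the active master's node disappearing plus the resulting nodeDeleted event.
    void activeMasterDies() {
        masterZNode.set(null);
        synchronized (clusterHasActiveMaster) {
            clusterHasActiveMaster.set(false);
            clusterHasActiveMaster.notifyAll();
        }
    }

    // Runs the scenario and returns the name stored in the znode afterwards.
    static String runScenario() {
        FailoverSketch cluster = new FailoverSketch("master-a");
        AtomicBoolean becameActive = new AtomicBoolean(false);
        Thread backup = new Thread(
            () -> becameActive.set(cluster.blockUntilActive("master-b")));
        backup.start();
        try {
            cluster.aboutToWait.await(); // the backup has failed to register and will wait
            cluster.activeMasterDies();
            backup.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return becameActive.get() ? cluster.masterZNode.get() : null;
    }

    public static void main(String[] args) {
        System.out.println("active master after failover: " + runScenario());
    }
}
```

In the scenario, master-b fails its first registration attempt, waits on the flag, is woken when master-a's node disappears, and succeeds on the second pass through the loop.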

2.2.1 Creating the ephemeral node /hbase/master. The key call is:

2.2.1.1

MasterAddressTracker.setMasterAddress(this.watcher,
            this.watcher.getMasterAddressZNode(), this.sn)

2.2.1.2

  public static boolean setMasterAddress(final ZooKeeperWatcher zkw,
      final String znode, final ServerName master)
  throws KeeperException {
    return ZKUtil.createEphemeralNodeAndWatch(zkw, znode, toByteArray(master));
  }

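2.2.1.3 If the create succeeds, this process becomes the master. If the node already exists (NodeExistsException), watchAndCheckExists() sets a watch on it and the method returns false, which most likely means another master is already active. If the node disappears between the failed create and the existence check, the create is retried.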

  public static boolean createEphemeralNodeAndWatch(ZooKeeperWatcher zkw,
      String znode, byte [] data)
  throws KeeperException {
    try {
      zkw.getRecoverableZooKeeper().create(znode, data, createACL(zkw, znode),
          CreateMode.EPHEMERAL);
    } catch (KeeperException.NodeExistsException nee) {
      if(!watchAndCheckExists(zkw, znode)) {
        // It did exist but now it doesn't, try again
        return createEphemeralNodeAndWatch(zkw, znode, data);
      }
      return false;
    } catch (InterruptedException e) {
      LOG.info("Interrupted", e);
      Thread.currentThread().interrupt();
    }
    return true;
  }
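The create-or-watch logic can be imitated in memory to see the three outcomes. This is not the ZooKeeper API: `FakeZk`, its node map, and its set of watched paths are hypothetical, the owner names are made up, and the recursion of the original is written as a loop.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the znode tree; not the ZooKeeper client.
class FakeZk {
    private final ConcurrentHashMap<String, String> nodes = new ConcurrentHashMap<>();
    private final Set<String> watchedPaths = ConcurrentHashMap.newKeySet();

    // Returns true if this caller created the node, false if it already existed.
    boolean createEphemeralNodeAndWatch(String path, String owner) {
        while (true) {
            // putIfAbsent is atomic, like a create that fails when the node exists.
            if (nodes.putIfAbsent(path, owner) == null) {
                return true;
            }
            // Node exists: leave a watch, then check that it is still there.
            watchedPaths.add(path);
            if (nodes.containsKey(path)) {
                return false;
            }
            // It existed a moment ago but is gone now: try to create it again.
        }
    }

    void delete(String path) {
        nodes.remove(path);
    }

    String ownerOf(String path) {
        return nodes.get(path);
    }

    boolean isWatched(String path) {
        return watchedPaths.contains(path);
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        System.out.println(zk.createEphemeralNodeAndWatch("/hbase/master", "m1"));
        System.out.println(zk.createEphemeralNodeAndWatch("/hbase/master", "m2"));
        zk.delete("/hbase/master");
        System.out.println(zk.createEphemeralNodeAndWatch("/hbase/master", "m2"));
    }
}
```

The first call wins the node, the second finds it taken and only leaves a watch, and after the delete the same caller succeeds.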

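2.2.2 If the create fails, block

2.2.2.1

A failed create means this process is not the active master, so it registers itself under the backup-masters node: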

LOG.info("Adding ZNode for " + backupZNode + " in backup master directory");
MasterAddressTracker.setMasterAddress(this.watcher, backupZNode, this.sn);

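2.2.2.2 Read the active master's server address from ZooKeeper again and compare it with our own. If the hostname and port match, this master was restarted before its old ephemeral node expired, so the stale node is deleted to hurry the expiry along.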

    byte[] bytes =
          ZKUtil.getDataAndWatch(this.watcher, this.watcher.getMasterAddressZNode());
        if (bytes == null) {
          msg = ("A master was detected, but went down before its address " +
            "could be read.  Attempting to become the next active master");
        } else {
          ServerName currentMaster;
          try {
            currentMaster = ServerName.parseFrom(bytes);
          } catch (DeserializationException e) {
            LOG.warn("Failed parse", e);
            // Hopefully next time around we won't fail the parse.  Dangerous.
            continue;
          }
          if (ServerName.isSameHostnameAndPort(currentMaster, this.sn)) {
            msg = ("Current master has this master's address, " +
              currentMaster + "; master was restarted? Deleting node.");
            // Hurry along the expiration of the znode.
            ZKUtil.deleteNode(this.watcher, this.watcher.getMasterAddressZNode());

            // We may have failed to delete the znode at the previous step, but
            //  we delete the file anyway: a second attempt to delete the znode is likely to fail again.
            ZNodeClearer.deleteMyEphemeralNodeOnDisk();
          } else {
            msg = "Another master is the active master, " + currentMaster +
              "; waiting to become the next active master";
          }
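To make the restart check concrete, here is a minimal sketch of comparing two server names by hostname and port while ignoring the start code. `SimpleServerName` is a hypothetical class, not HBase's `ServerName`; it assumes the `hostname,port,startcode` text form noted in the backupZNode comment earlier, and the hostnames, port, and start codes are invented.

```java
// Hypothetical, simplified server name; not org.apache.hadoop.hbase.ServerName.
class SimpleServerName {
    final String hostname;
    final int port;
    final long startcode;

    SimpleServerName(String hostname, int port, long startcode) {
        this.hostname = hostname;
        this.port = port;
        this.startcode = startcode;
    }

    // Parses the assumed "hostname,port,startcode" text form.
    static SimpleServerName parse(String text) {
        String[] parts = text.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected hostname,port,startcode: " + text);
        }
        return new SimpleServerName(parts[0], Integer.parseInt(parts[1]),
            Long.parseLong(parts[2]));
    }

    // Same process location; the start code is deliberately ignored.
    static boolean isSameHostnameAndPort(SimpleServerName a, SimpleServerName b) {
        return a.hostname.equals(b.hostname) && a.port == b.port;
    }

    public static void main(String[] args) {
        SimpleServerName inZk = parse("master1.example.com,60000,1000");
        SimpleServerName me = parse("master1.example.com,60000,2000");
        SimpleServerName other = parse("master2.example.com,60000,1500");
        // Same host and port but a newer start code: the znode is our own stale entry.
        System.out.println(isSameHostnameAndPort(inZk, me));
        // Different host: a genuinely different master is active.
        System.out.println(isSameHostnameAndPort(inZk, other));
    }
}
```

The start code changes on every restart, which is why the comparison leaves it out: a match on hostname and port with a different start code points to a leftover node from the previous run of the same master.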

2.2.2.3 Block on clusterHasActiveMaster and wait until notified. The notification is triggered by ActiveMasterManager (see 2.1.2).

     synchronized (this.clusterHasActiveMaster) {
        while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
          try {
            this.clusterHasActiveMaster.wait();
          } catch (InterruptedException e) {
            // We expect to be interrupted when a master dies,
            //  will fall out if so
            LOG.debug("Interrupted waiting for master to die", e);
          }
        }
        if (clusterShutDown.get()) {
          this.master.stop(
            "Cluster went down before this master became active");
        }
        if (this.master.isStopped()) {
          return false;
        }
        // there is no active master so we can try to become active master again
      }


Time: 2024-10-13 21:31:31
