大数据:Spark Standalone 集群调度(三)多Master节点的可用性

1. Master 单节点可用性

Master节点在Spark中所承载的作用是分配Application到Worker节点,维护Worker节点,Driver,Application的状态。

在Spark中,Master本身也提供了基于硬盘的单节点的可用性,也就是可以直接通过重启Master,Master通过读取硬盘里保存的状态,进行单节点的恢复。

In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env using this configuration:

System property Meaning
spark.deploy.recoveryMode Set to FILESYSTEM to enable single-node recovery mode (default: NONE).
spark.deploy.recoveryDirectory The directory in which Spark will store recovery state, accessible from the Master‘s perspective.

通过设置recoveryMode为FILESYSTEM,当Master节点Down掉时候,节点的可用性通过重启Master节点来恢复,Master会通过获取磁盘里保存的集群原来的状态进行恢复。

如果没有设置recoveryMode,默认为不支持恢复,Master重启的时候,不会进行对前状态进行恢复。

备注:恢复节点的方式与多节点可用性后续的处理方式基本类似,就不在此具体讨论细节了

2. Master 多节点可用性

单节点的问题很明显,虽然有恢复的方案,很明显在重启的过程中,是无法继续对资源调度和分配的,而磁盘也是具有单节点特性(假使没有构建磁盘阵列),同样也不具备高可用性的特点

Spark引入了多Master节点来提高集群可用性

设置多Master节点的方式:

-Dspark.deploy.recoveryMode=ZOOKEEPER

2.1 StandBy Master 节点

在Spark的多Master节点设计中,分为两个角色,一个是主Master节点提供服务,另一个角色是备用Master节点

当Master节点处于备用的状态时候,是不提供服务的 ,备用节点只是在处于等待状态,只有当主Master节点crash掉后,备用节点才升级成为主Master提供服务

主备节点互相不感知,不提供负载均衡,不是对等节点,在上图可以看到每个提交任务的Client并不提交任务到StandBy节点

2.2 ZooKeeper 选举Main Master节点

在Master多节点中,只有Main Master才是提供服务的,这个设计就象选举Leader,这种方式非常适合使用Zookeeper的Watch模式。

Spark并没有直接使用Zookeeper,而是使用了curator 框架(基于Zookeeper)进行Master的选举。由于Zookeeper的操作比较底层,操作的是数据结构和节点的状态,curator在上面封装了一层,以应用与不同的应用情景。

在使用Leader模式下,curator的框架开发代码非常简单。使用LeaderLatch, 实现LeaderLatchListener的监听器

override def isLeader() {
    synchronized {
      // could have lost leadership by now.
      if (!leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have gained leadership")
      updateLeadershipStatus(true)
    }
  }

  override def notLeader() {
    synchronized {
      // could have gained leadership by now.
      if (leaderLatch.hasLeadership) {
        return
      }

      logInfo("We have lost leadership")
      updateLeadershipStatus(false)
    }
  }

isLeader 和 notLeader 分别代表成为Leader或者失去Leader, 在Spark的ZookeeperLeaderElectionAgent.scala里跟新leader的状态

  private def updateLeadershipStatus(isLeader: Boolean) {
    if (isLeader && status == LeadershipStatus.NOT_LEADER) {
      status = LeadershipStatus.LEADER
      masterInstance.electedLeader()
    } else if (!isLeader && status == LeadershipStatus.LEADER) {
      status = LeadershipStatus.NOT_LEADER
      masterInstance.revokedLeadership()
    }
  }

调用的是在Master.scala的

  override def electedLeader() {
    self.send(ElectedLeader)
  }

  override def revokedLeadership() {
    self.send(RevokedLeadership)
  }

也就是发了消息ElectedLeader, RevokeLeaderShip 给自己的Netty

Zookeeper默认的leader节点 /spark/leader_election

ls /spark/leader_election
[_c_6b494d00-cf91-4d69-8828-942377bafdf1-latch-0000000217, _c_c1f4c10f-9479-4a55-b660-0497c0b54f89-latch-0000000218]

前面应该是UUID后面有一个序列号,由后面的序列号前后来决定谁是 leader, 而序列号前后的生成取决于Master连接zookeeper的顺序,熟悉zookeeper的知道这些节点是ephemeral节点。

2.3 StandBy Master 节点的恢复

在前面章节里已经描述了如何选为Main Master, 当Main Master 当机后,备用的Master通过选举成为了 Main Master,备用的Master Netty收到了ElectedLeader的消息,在成为主Master节点前,先检查集群状态并准备进行恢复

    case ElectedLeader =>
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
      }

2.3.1 获取具体的信息

首先从持久化层获取Application, Drivers, Worker 的信息,也就是通过Curator 的框架获取原Master保存的信息

Zookeeper默认的master状态 /spark/master_status

 ls /spark/master_status
[worker_worker-20170329184022-192.168.121.101-55109, worker_worker-20170330035822-192.168.121.102-41307]

Apps, Driver, Workers 的信息都会被保存在master_status节点里,而每个子节点的内容就是序列化过的ApplicationInfo, DriverInfo, WorkInfo

获选的主Master通过获取Zookeeper上的节点的值,反序列化重构Application, Driver, Worker的信息

2.3.2 恢复Application,Driver,Workers

private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) {
      logInfo("Trying to recover app: " + app.id)
      try {
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }

    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver
    }

    for (worker <- storedWorkers) {
      logInfo("Trying to recover worker: " + worker.id)
      try {
        registerWorker(worker)
        worker.state = WorkerState.UNKNOWN
        worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }

2.3.2.1 恢复Application

在前面一章说过如何分配Executor,里面也说到了ApplicationInfo 结构。在Master做Recovery的时候,把保存在Zookeeper里的Applications 重新在Master里重新注册

这里有几个注意点:

1. Zookeeper 里保存的是正在运行的Applications,但在Spark里并没有进行细粒度的控制,只要是在运行的Applications,都会重新进行被分配executor,哪怕正在分配或者已经在计算了。

2. 注册完Application 并没有马上进行Application的调度,直到恢复完Worker,Driver才开始进行进行

  private def completeRecovery() {
    // Ensure "only-once" recovery semantics using a short synchronization period.
    if (state != RecoveryState.RECOVERING) { return }
    state = RecoveryState.COMPLETING_RECOVERY

    // Kill off any workers and apps that didn‘t respond to us.
    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

    // Reschedule drivers which were not claimed by any workers
    drivers.filter(_.worker.isEmpty).foreach { d =>
      logWarning(s"Driver ${d.id} was not found after master recovery")
      if (d.desc.supervise) {
        logWarning(s"Re-launching ${d.id}")
        relaunchDriver(d)
      } else {
        removeDriver(d.id, DriverState.ERROR, None)
        logWarning(s"Did not re-launch ${d.id} because it was not supervised")
      }
    }

    state = RecoveryState.ALIVE
    schedule()
    logInfo("Recovery complete - resuming operations!")
  }

在completeRecovery 的函数里,只有当Master的节点的状态RecoveryState.ALIVE的时候,Master才调用了Schedule重新调度已经注册的Application

2.3.2.2 恢复Worker

第一步:注册Worker,这里只是简单的确认了Worker地址(ip+port)的格式是否正确和是否已经注册过了。

第二步:设置Worker的状态为UNKNOW

第三步:发送MasterChange消息给Worker

第四步:Worker将自己的Application ID, Executor的信息WorkerSchedulerStateResponse发回给Master

第五步:Master的跟新信息中ApplicationInfo中的Executor信息

第三步到第五步都是通过消息来异步消息恢复的,在没有收到WorkerScheduleStateResponse,是无法开始准确进行Application调度的,此时的Worker的里的信息不准确,例如用了多少核

2.3.3 完成Recovery

因为要完成worker信息的传递,而Recovery的过程是在Netty的接收ElectedLeader消息的线程中进行的,而比如象Worker汇报消息的不能被堵塞,在这里Master起了一个定时任务去触发任务的完成,定时任务的时间就是worker连接的超时时间,在这里的前提是所有的workers会在超时间段内回复消息。

recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

既然还是发消息,对Master发送了CompleteRecovery的消息,在CompleteRecovery的消息中重新调度了Application。

2.3.4 Recovery中的几个状态

Master节点在recovery中有四个状态:

RecoveryState.ALIVE
RecoveryState.RECOVERING
RecoveryState.COMPLETING_RECOVERY
RecoveryState.STANDBY

ALIVE ----- 活着

RECOVERING ------ 恢复的过程中

COMPLETING_RECOVERY ----------运行在completeRecovery函数中

STANDBY  ------------ 备份节点

在整个Recovery的恢复过程中状态是 从 STANDBY   => RECOVERING => COMPLETING_RECOVERY => ALIVE

1.  在整个Recovery的恢复过程中可以Register Application,当然必须Elected_Leader的消息函数块运行完,因为在同一个线程中

2.  在整个Recovery的恢复过程中只有在进入ALIVE的状态,才能进行Application的调度

3 Submit 如何设置多个Master

Spark的master是主和Standby的状态,对submit任务来说,并不能知道目前集群上主Master是那个,提交到Standby的master并不接受任务,容易导致提交任务失败。

Spark 提供了一种简单的方式:在submit 或者在sparkconf里可以设置多个Master, 格式注意是用逗号分隔

--master spark://raintungmaster:7077,raintungslave1:7077

当然你可以也可以设置一个局域网的DNS,通过 healthcheck来判断哪个Master挂了,DNS就指向谁

时间: 2024-10-21 14:25:47

大数据:Spark Standalone 集群调度(三)多Master节点的可用性的相关文章

大数据:Spark Standalone 集群调度(一)从远程调试开始说application创建

远程debug,特别是在集群方式时候,会很方便了解代码的运行方式,这也是码农比较喜欢的方式 虽然scala的语法和java不一样,但是scala是运行在JVM虚拟机上的,也就是scala最后编译成字节码运行在JVM上,那么远程调试方式就是JVM调试方式 在服务器端: -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y 客户端通过socket就能远程调试代码 1. 调试submit, master, worke

大数据-spark HA集群搭建

一.安装scala 我们安装的是scala-2.11.8 5台机器全部安装 下载需要的安装包并进行解压 配置环境变量 [root@master1 ~]# vi /etc/profile export SCALA_HOME=/opt/software/scala-2.11.8 export PATH=$SCALA_HOME/bin:$PATH [root@master1 ~]# source /etc/profile 启动scala [root@master1 workspace]# vim /e

(二)win7下用Intelij IDEA 远程调试spark standalone 集群

关于这个spark的环境搭建了好久,踩了一堆坑,今天 环境: WIN7笔记本  spark 集群(4个虚拟机搭建的) Intelij IDEA15 scala-2.10.4 java-1.7.0 版本问题: 个人选择的是hadoop2.6.0 spark1.5.0 scala2.10.4  jdk1.7.0 关于搭建集群环境,见个人的上一篇博客:(一) Spark Standalone集群环境搭建,接下来就是用Intelij IDEA来远程连接spark集群,这样就可以方便的在本机上进行调试.

大数据高可用集群环境安装与配置(09)——安装Spark高可用集群

1. 获取spark下载链接 登录官网:http://spark.apache.org/downloads.html 选择要下载的版本 2. 执行命令下载并安装 cd /usr/local/src/ wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz mv spark-2.4.4

Laxcus大数据管理系统单机集群版

Laxcus大数据管理系统是我们Laxcus大数据实验室历时5年,全体系全功能设计研发的大数据产品,目前的最新版本是2.1版本.从三年前的1.0版本开始,Laxcus大数据系统投入到多个大数据和云计算项目中使用.2.0版本的Laxcus大数据管理系统,已经从紧耦合架构转为松耦合架构,整合了最新的大数据和关系数据库的技术,实现了一站式数据处理,大幅度提高了并行处理能力,同时兼具易操作.易维护.运行稳定的特点,节点数和数据存储计算规模已经达到百万台级和EB量级.目前已经覆盖的技术包括:行列混合存储.

大数据(hdfs集群及其集群的高级管理)

#### 大数据课程第二天 伪分布式hadoop的启动停止脚本[使用] sbin/hadoop-daemon.sh start namenode sbin/hadoop-daemon.sh start datanode sbin/yarn-daemon.sh start resourcemanager sbin/yarn-daemon.sh start nodemanager ? shell脚本 xxx.sh ls mkdir hadoop-start.sh sbin/hadoop-daemon

大数据高可用集群环境安装与配置(07)——安装HBase高可用集群

1. 下载安装包 登录官网获取HBase安装包下载地址 https://hbase.apache.org/downloads.html 2. 执行命令下载并安装 cd /usr/local/src/ wget http://mirrors.tuna.tsinghua.edu.cn/apache/hbase/2.1.8/hbase-2.1.8-bin.tar.gz tar -zxvf hbase-2.1.8-bin.tar.gz mv hbase-2.1.8 /usr/local/hbase/ 3

大数据高可用集群环境安装与配置(06)——安装Hadoop高可用集群

下载Hadoop安装包 登录 https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/ 镜像站,找到我们要安装的版本,点击进去复制下载链接 安装Hadoop时要注意版本与后续安装的HBase.Spark等相关组件的兼容,不要安装了不匹配的版本,而导致某些组件需要重装 输入命令进行安装操作 cd /usr/local/src/ wget https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/

Spark standalone集群安装

本文不会搞什么Yarn混搭Spark,只想建立一个纯粹的Spark环境,太多层东西搅和在一起,不靠谱. 创建spark服务运行帐号 # useradd smile smile帐号就是spark服务的运行帐号. 下载安装包并测试 在root帐号下,下载最新安装包,注意不是source,而是bin安装包,支持hadoop2.6以后的 wget http://mirrors.cnnic.cn/apache/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz