Master HA彻底解密(DT大数据梦工厂)

内容:

1、Master HA解析;

2、Master HA的四种方式;

3、Master HA的内部工作机制;

4、Master HA的源码解密;

本讲主要源码角度分析Master HA,因为在生产环境必然要做的

==========Master HA解析============

Spark是Master-Slave的结构

现在业界是1个Master Active,2个以上standby

如果有HA的话,切换active的时候,会在上次运行的基础上继续运行

Drvier提交程序、申请资源,是跟Master交互

ZOOKEEPER工作采用的是leader的机制,它对外提供服务,其它都是follower

ZOOKEEPER保留了集群的信息:Worker、Driver、Application的信息,会被持久化到Zookeeper,切换的时候只会影响新Job的提交。

因为每个Job运行之前,只要跟集群申请到资源之后,和Master没关系了,之后就是Driver和executor的交互,在27讲说过。所以只会影响新Job提交,而不会影响现有Job的运行。

=>总结:

1、生产环境一般采用zookeeper做HA,且建议为3台Master,Zookeeper会自动化管理Masters的切换;

2、采用Zookeeper做HA的时候,Zookeeper会保存整个Spark集群运行时候的元数据:所有的Workers、Drivers、Applications、Executors等;

3、Zookeeper在遇到当前Active级别的Master出现故障的时候,会从standby Master中选取出一台做为Active Master,但是要注意:被选举后到成为真正的Active Master之间需要从Zookeeper获取集群当前运行状态的元数据信息并进行恢复;

4、在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系的;

5、在Master切换过程中唯一的影响是,是不能提交新的Job:一方面不能提交新的应用程序给集群,因为只有Active Master才能接收新的程序的提交请求,另外一方面,已经运行的程序中也不能够因为Action操作触发新的Job的提交请求;

经验之谈:yarn的模式比standalone模式性能低30%左右

==========Master HA的四种方式============

1、MasterHA四大方式分别是:ZOOKEEPER、FILESYSTEM、CUSTOM(自定义)、NONE(不做HA,下载Spark直接使用);

2、需要说明的是:

1)ZOOKEEPER是自动管理Master;

2)FILESYSTEM的方式在Master出现故障后需要手动重新启动机器,机器启动后会立即成为Active级别的Master来对外提供服务(接受应用程序提交的请求,接受新的Job运行的请求);

3)CUSTOM的方式允许用户自定义MasterHA的实现,这对于高级用户特别有用;

4)NONE,这是默认情况,当我们下载安装了Spark集群后,就是采用这种方式,该方式不会持久化集群的数据,Master启动后立即管理集群;

Master中onStart

val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))//数据持久化引擎和leader选举
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_

4、persistEngine中有一个至关重要的方法persist来实现数据持久化,readPersistedData来回复集群中的元数据;

/**
 * Returns the persisted data sorted by their respective ids (which implies that they‘re
 * sorted by time of creation).
 */
final def readPersistedData(
    rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
  rpcEnv.deserialize { () =>
    (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
  }
}

5、FILESYSTEM和NONE均是采用MonarchyLeaderAgent的方式来完成leader的选举,其实际实现是直接将传入的Master作为leader

def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
  new MonarchyLeaderAgent(master)
}

/** Single-node implementation of LeaderElectionAgent -- we‘re initially and always the leader. */
private[spark] class MonarchyLeaderAgent(val masterInstance: LeaderElectable)
  extends LeaderElectionAgent {
  masterInstance.electedLeader()
}

6、NONE根本不需要持久化,为什么写了BlockHolePersistenceEngine,里面啥都没实现?代码结构统一,且易扩展;

private[master] class BlackHolePersistenceEngine extends PersistenceEngine {

override def persist(name: String, obj: Object): Unit = {}

override def unpersist(name: String): Unit = {}

override def read[T: ClassTag](name: String): Seq[T] = Nil

}

==========Master HA的内部工作机制(主要Zookeeper)============

1、Zookeeper自动从Standby Master里面选取出作为Leader的Master;

2、使用ZookeeprPersistEngine去读取集群的状态数据Workers、Drivers、Applications、Executors 等信息;

3、判断元数据信息是否有空的内容;

4、把通过Zookeeper持久化引擎获得的Workers、Drivers、Applications、Executors 等信息重新注册到Master的内存中缓存起来;

5、验证获得的信息和当前正在运行的集群的状态的一致性;

6、将Applications和Workers的状态标识为UNKOWN,然后会向Application中的Driver以及Worker发送现在是Leader的standby模式的Master的地址信息;

7、当Drivers以及Workers收到新的Master地址信息后,会响应改信息;

8、Master接收到来自Drviers和Workers的响应信息后,会使用一个关键的方法completeRecovery,对没有响应的Applications(Drivers)、Workers(Executors)进行处理,Master的state会变成RecoveryState.ALIVE ,从而可以开始对外服务

private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

// Kill off any workers and apps that didn‘t respond to us.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

// Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}

d.desc.supervise此种方式在Drvier失败后重启Drvier

9、(关键一步)此时Master调用自己的scheduler方法对正在等待的Applications和Drviers进行资源调度!!!

时间: 2024-08-08 14:09:32

Master HA彻底解密(DT大数据梦工厂)的相关文章

Spark内核架构解密(DT大数据梦工厂)

只有知道内核架构的基础上,才知道为什么要这样写程序? 手工绘图来解密Spark内核架构 通过案例来验证Spark内核架构 Spark架构思考 ==========Spark Runtime的几个概念============ 下载下来运行,基本都是standalone模式,如果掌握了standalone,则yarn和mesos,以后不做特别说明,一律是standalone模式 application=driver+executor,executor是具体处理数据分片,里面是线程池并发的处理数据分片

Spark on Yarn彻底解密(DT大数据梦工厂)

内容: 1.Hadoop Yarn的工作流程解密: 2.Spark on Yarn两种运行模式实战: 3.Spark on Yarn工作流程解密: 4.Spark on Yarn工作内幕解密: 5.Spark on Yarn最佳实践: 资源管理框架Yarn Mesos是分布式集群的资源管理框架,和大数据没关系,但是可以管理大数据的资源 ==========Hadoop Yarn解析============ 1.Yarn是Hadoop推出的资源管理器,是负责分布式(大数据)集群计算的资源管理的,负

SparkRDD解密(DT大数据梦工厂)

第一阶段,彻底精通Spark 第二阶段,从0起步,操作项目 Hadoop是大数据的基础设施,存储等等 Spark是计算核心所在 1.RDD:基于工作集的应用抽象 2.RDD内幕解密 3.RDD思考 不掌握RDD的人,不可能成为Spark的高手 绝对精通RDD,解决问题的能力大大提高 各种框架底层封装的都是RDD,RDD提供了通用框架 RDD是Spark的通用抽象基石 顶级SPark高手, 1.能解决问题.性能调优: 2.Spark高手拿Spark过来就是修改的 ==========基于工作集的应

Spark Sort-Based Shuffle内幕彻底解密(DT大数据梦工厂)

内容: 1.为什么使用Sorted-Based Shuffle: 2.Sorted-Based Shuffle实战: 3.Sorted-Based Shuffle内幕: 4.Sorted-Based Shuffle的不足: 最常用的Shuffle方式,Sorted-Based Shuffle涉及了大规模Spark开发.运维时核心问题,以及答案的要害所在. 必须掌握这一讲内容. 本课是从Spark初级人才成功升级为Spark中级人才的通道. 稍有水平的大公司,面试内容本讲肯定会涉及. ======

Spark高级排序彻底解密(DT大数据梦工厂)

内容: 1.基础排序算法实战: 2.二次排序算法实战: 3.更高局级别排序算法: 4.排序算法内幕解密: 为啥讲排序?因为在应用的时候都有排序要求. 海量数据经常排序之后要我们想要的内容. ==========基础排序算法============ scala> sc.setLogLevel("WARN") scala> val x = sc.textFile("/historyserverforSpark/README.md", 3).flatMap(_

Spark Runtime(Driver、Masster、Worker、Executor)内幕解密(DT大数据梦工厂)

内容: 1.再论Spark集群部署: 2.Job提交解密: 3.Job的生成和接受: 4.Task的运行: 5.再论Shuffle: 从一个作业视角,透过Master.Drvier.Executor来透视Spark Runtime ==========再论Spark集群部署============ 官网中关于集群的部署: 默认情况下,每个Worker下有一个Executor,会最大化的使用内存和CPU. Master发指令给Worker来分配资源,不关心Worker能不能分配到这个资源,他发给多

Spark天堂之门(SparkContext)解密(DT大数据梦工厂)

内容: 1.Spark天堂之门: 2.SparkContext使用案例鉴赏: 3.SparkContext内幕: 4.SparkContext源码解密: SparkContext是编写任意Spark程序的第一个对象,用SparkConf为传入的参数 ==========Spark天堂之门:SparkContext !!!============ 1.Spark程序在运行的时候分为Driver和Executors: 2.Spark程序编写是基于SparkContext的,具体来说包含两个方面: 1

Spark Executor内幕彻底解密(DT大数据梦工厂)

内容: 1.Spark Executor工作原理图: 2.ExecutorBackend注册源码解密: 3.Executor实例化内幕: 4.Executor具体是如何工作的? 1.Master发指令给Worker启动Executor: 2.Worker接受到Master发送来的指令,通过ExecutorRunner启动另外一个进程来运行Executor: 3.此时会启动粗粒度的ExecutorBackend(CoarseGrainedExecutorBackend): 4.CoarseGrai

底层战详解使用Java开发Spark程序(DT大数据梦工厂)

Scala开发Spark很多,为什么还要用Java开发原因:1.一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交:2.Scala学习角度讲,比Java难.找Scala的高手比Java难,项目的维护和二次开发比较困难:3.很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combat