spark 源码理解2 进一步窥探Master、Worker通信机制

上一篇文章 spark 源码理解1 从spark启动脚本开始 是分析执行start_all.sh时,集群中启动了哪些进程,下面我们再深入一点看看这些进程都是做什么用的,它们之间又是如何通信的?

一、Master进程的启动

Master进程,它主要负责对Worker、Driver、App等资源的管理并与它们进行通信,这篇文章中我打算着重讲一下它与Worker的通信,其它的部分放在以后的章节再加以描述。

spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

前面说过,我们最终是通过上面的命令来启动Master进程的,其中org.apache.spark.deploy.master.Master是类名,其余为参数:1 表示实例个数,--ip 主机地址,--port 主机端口,默认7077,--webui-port web界面端口,默认8080,它们的解析是由org.apache.spark.deploy.master.MasterArguments 完成的。

在接下来深入之前,简单介绍一下Spark重要的通信协议Akka,它是基于消息传递的机制,scala原生支持的,具有一下特点:

Actors
Actors为你提供:

对并发/并行程序的简单的、高级别的抽象。
异步、非阻塞、高性能的事件驱动编程模型。
非常轻量的事件驱动处理(1G内存可容纳约270万个actors)。

容错性
使用“let-it-crash”语义和监管者树形结构来实现容错。非常适合编写永不停机、自愈合的高容错系统。监管者树形结构可以跨多个JVM来提供真正的高容错系统。

位置透明性
Akka的所有元素都为分布式环境而设计:所有actor都仅通过发送消息进行互操作,所有操作都是异步的。

事务性actors
事务性Actor是actor与STM(Software Transactional Memory)的组合。它使你能够使用自动重试和回滚来组合出原子消息流。

有一个很好的入门级教程可以参考 过忘记忆 的 Akka学习笔记 。

继续回到Master.scala这个文件,它由Master类和Master对象组成。在scala中,这个对象被叫做伴生对象(object Companion),是一个Singleton(注:类和单例对象存在于同一个文件中,才能被称之为Singleton,否则为孤立对象),对应的类叫做伴生类(class Companion),它们之间可以彼此访问对方的私有成员。

程序的入口是Master对象的main函数,它主要调用startSystemAndActor方法,启动Master并返回一个Tuple4的对象。

val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
/**
   * Start the Master and return a four tuple of:
   *   (1) The Master actor system
   *   (2) The bound port
   *   (3) The web UI bound port
   *   (4) The REST server bound port, if any
   */
  def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
      securityManager = securityMgr)
    val actor = actorSystem.actorOf(
      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
    val timeout = AkkaUtils.askTimeout(conf)
    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
    (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
  }

生成Master实例时,会创建 Master Metrics System, Application Metrics System,并启动它们,然后根据RECOVERY_MODE来指定Master的容错恢复模式,这里可供选择的有ZOOKEEPER,FILESYSTEM,CUSTOM和其它BlackHole。

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
          .newInstance(conf, SerializationExtension(context.system))
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }

 Master进程启来后,开始监听其它进程发来的消息,并在receiveWithLogging中对它们进行相应的处理。

二、Worker进程的启动及与Master的通信

同样地,当启动脚本执行到

spark-daemon.sh start org.apache.spark.deploy.worker.Worker "[email protected]"

会启动Worker进程,启动过程跟Master的启动很相似,先是通过org.apache.spark.deploy.worker.WorkerArguments 进行参数解析,得到端口、内核数、内存、web ui 端口、工作目录等信息,然后生成Worker Actor并启动它。

Worker会初始化如下信息:

1. 发送心跳时间 默认 60*1000/4 毫秒

2. 重连次数为 6 + 10

3. 前6次 间隔5-15秒

4. 后10次 间隔30-90秒

5. 清理应用程序文件夹时间间隔为 60*30/1000 毫秒

启动时,向Master发送RegisterWorker消息:

      val actor = context.actorSelection(masterAkkaUrl)
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

Master通过以下代码来响应此消息:

    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don‘t send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress)
        }
      }
    }

上面代码大致理解为:如果当前Master为Active状态(HA模式下),并且目标work没有成功注册过,此时将work信息添加到workers, idToWorker, addressToWorker, 并向Worker Actor 发送RegisteredWorker消息,否则发送 RegisterWorkerFailed 消息。

当Worker 收到 RegisteredWorker 消息后,更新Master信息,并向Master按心跳时间间隔不停的发送 Heartbeat 消息,以告诉当前Worker 是“活”的。

另外,Worker 还会创建 Worker Metrics System。

至此,Master、Worker(s)都已在各自主机启动,并连接成功。

  

时间: 2024-11-10 16:24:38

spark 源码理解2 进一步窥探Master、Worker通信机制的相关文章

Spark源码定制第一课:通过案例对SparkStreaming透彻理解三板斧之一

第一课:通过案例对SparkStreaming透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析 本期导读: 1 Spark源码定制选择从SparkStreaming入手: 2 Spark Streaming另类在线实验: 3 瞬间理解SparkStreaming本质. 1.    从Spark Streaming入手开始Spark源码版本定制之路 1.1           从Spark Streaming入手Spark源码版本定制之路的理由 从

Apache Spark源码走读之21 -- 浅谈mllib中线性回归的算法实现

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

Apache Spark源码走读之14 -- Graphx实现剖析

欢迎转载,转载请注明出处,徽沪一郎. 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的

Apache Spark源码走读之5 -- DStream处理的容错性分析

欢迎转载,转载请注明出处,徽沪一郎,谢谢. 在流数据的处理过程中,为了保证处理结果的可信度(不能多算,也不能漏算),需要做到对所有的输入数据有且仅有一次处理.在Spark Streaming的处理机制中,不能多算,比较容易理解.那么它又是如何作到即使数据处理结点被重启,在重启之后这些数据也会被再次处理呢? 环境搭建 为了有一个感性的认识,先运行一下简单的Spark Streaming示例.首先确认已经安装了openbsd-netcat. 运行netcatnc -lk 9999 运行spark-s

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的. Standalone部署的节点组成 介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多. 在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

《Apache Spark源码剖析》

Spark Contributor,Databricks工程师连城,华为大数据平台开发部部长陈亮,网易杭州研究院副院长汪源,TalkingData首席数据科学家张夏天联袂力荐1.本书全面.系统地介绍了Spark源码,深入浅出,细致入微2.提供给读者一系列分析源码的实用技巧,并给出一个合理的阅读顺序3.始终抓住资源分配.消息传递.容错处理等基本问题,抽丝拨茧4.一步步寻找答案,所有问题迎刃而解,使读者知其然更知其所以然 内容简介 书籍计算机书籍 <Apache Spark源码剖析>以Spark

Apache Spark源码走读之16 -- spark repl实现详解

欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码的实时交互式执行,这是为什么呢? 既然scala已经提供了repl,为什么spark还要自己单独搞一套spark repl,这其中的缘由到底何在? 显然,这些都是问题,要解开这些谜团,只有再次开启一段源码分析之旅了. 全局视图 上图显示了java源文件从编译到加载执行的全局视图,整个过程中最主要的步

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常