Apache Spark-1.0.0浅析(八):资源调度——Akka通信建立

Spark使用Akka作为各种功能和组件之间的通信工具。同样,在资源调度过程中也使用其作为消息传递系统。之前,在分析了Apache Spark-1.0.0资源调度过程中,明确了主要消息的传递过程和引起的相关动作,本文主要分析Spark资源调度过程中所用到的Akka通信的初始化过程。

(I)Job相关(DagScheduler.scala)

SparkContext中实例化DAGScheduler

@volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => throw
      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
  }

DAGScheduler类中定义了dagSchedulerActorSupervisor,使用env调用actorSystem.actorof实例化

private val dagSchedulerActorSupervisor =
    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

首先看一下DAGSchedulerActorSupervisor类,重新定义了supervisorStrategy

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
          .format(x.getMessage))
        dagScheduler.doCancelAllJobs()
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }
}

接着,SparkEnv实例化时,创建了actorSystem

val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
      securityManager = securityManager)

而creatActorSystem的定义如下,最后返回actorSystem

/**
   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
   * ActorSystem itself and its port (which is hard to get from Akka).
   *
   * Note: the `name` parameter is important, as even if a client sends a message to right
   * host + port, if the system name is incorrect, Akka will drop the message.
   *
   * If indestructible is set to true, the Actor System will continue running in the event
   * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
   */
  def createActorSystem(name: String, host: String, port: Int,
    conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {

    val akkaThreads   = conf.getInt("spark.akka.threads", 4)
    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)

    val akkaTimeout = conf.getInt("spark.akka.timeout", 100)

    val akkaFrameSize = maxFrameSizeBytes(conf)
    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
    if (!akkaLogLifecycleEvents) {
      // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
      // See: https://www.assembla.com/spaces/akka/tickets/3787#/
      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
    }

    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"

    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 600)
    val akkaFailureDetector =
      conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)

    val secretKey = securityManager.getSecretKey()
    val isAuthOn = securityManager.isAuthenticationEnabled()
    if (isAuthOn && secretKey == null) {
      throw new Exception("Secret key is null with authentication on")
    }
    val requireCookie = if (isAuthOn) "on" else "off"
    val secureCookie = if (isAuthOn) secretKey else ""
    logDebug("In createActorSystem, requireCookie is: " + requireCookie)

    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
      ConfigFactory.parseString(
      s"""
      |akka.daemonic = on
      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
      |akka.stdout-loglevel = "ERROR"
      |akka.jvm-exit-on-fatal-error = off
      |akka.remote.require-cookie = "$requireCookie"
      |akka.remote.secure-cookie = "$secureCookie"
      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
      |akka.remote.netty.tcp.hostname = "$host"
      |akka.remote.netty.tcp.port = $port
      |akka.remote.netty.tcp.tcp-nodelay = on
      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
      |akka.log-config-on-start = $logAkkaConfig
      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
      |akka.log-dead-letters = $lifecycleEvents
      |akka.log-dead-letters-during-shutdown = $lifecycleEvents
      """.stripMargin))

    val actorSystem = ActorSystem(name, akkaConf)
    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
    val boundPort = provider.getDefaultAddress.port.get
    (actorSystem, boundPort)
  }

回到DAGScheduler,完成dagSchedulerActorSupervisor定义后,继续定义eventProcessActor,在initializeEventProcessActor中,首先定义了一个timeout,然后向dagSchedulerActorSupervisor发送实例化的DAGSchedulerEventProcessActor对象消息,等待接收返回的结果赋值给eventProcessActor

private[scheduler] var eventProcessActor: ActorRef = _

  private def initializeEventProcessActor() {
    // blocking the thread until supervisor is started, which ensures eventProcessActor is
    // not null before any job is submitted
    implicit val timeout = Timeout(30 seconds)
    val initEventActorReply =
      dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration).
      asInstanceOf[ActorRef]
  }

  initializeEventProcessActor()

回到DAGSchedulerActorSupervisor中,定义了receive方法

def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
  }

在接收到Props消息后,向发送者发送通过context创建的p对象。由此,eventProcessActor即是一个DAGSchedulerEventProcessActor实例化Actor对象。

如果向eventProcessActor发送消息,如图等

则要调用DAGSchedulerEventProcessActor中定义的receive方法接收消息

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
    // valid when the messages arrive
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def postStop() {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}

(II)Task相关(CoarseGrainedSchedulerBackend.scala + CoarseGrainedExecutorBackend.scala)

CoarseGrainedSchedulerBackend中,应用actorSystem.actorOfchuagnjian 实例化DriverActor,命名为“CoarseGrainedScheduler”

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }
    // TODO (prashant) send conf instead of properties
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }

而executorActor定义为HashMap,是executorID到,后面会在RegisterExecutor中填充

private val executorActor = new HashMap[String, ActorRef]

在CoarseGrainedExecutorBackend伴生对象中,重写了run方法,创建了新的actorSystem,并以CoarseGrainedExecutorBackend类创建actor

def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
    workerUrl: Option[String]) {

    SparkHadoopUtil.get.runAsSparkUser { () =>
        // Debug code
        Utils.checkHost(hostname)

        val conf = new SparkConf
        // Create a new ActorSystem to run the backend, because we can‘t create a
        // SparkEnv / Executor before getting started with all our system properties, etc
        val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
          conf, new SecurityManager(conf))
        // set it
        val sparkHostPort = hostname + ":" + boundPort
        actorSystem.actorOf(
          Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
            sparkHostPort, cores),
          name = "Executor")
        workerUrl.foreach {
          url =>
            actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
        }
        actorSystem.awaitTermination()

    }
  }

重新定义prestart,通过driverUrl创建远程Actor,向driver发送RegiExecutor消息

override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

CoarseGrainedSchedulerBackend的DriverActor类中定义了receive接受消息

def receive = {
      case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          totalCores(executorId) = cores
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          makeOffers()
        }

      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          if (executorActor.contains(executorId)) {
            freeCores(executorId) += scheduler.CPUS_PER_TASK
            makeOffers(executorId)
          } else {
            // Ignoring the update since we don‘t know about the executor.
            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
            logWarning(msg.format(taskId, state, sender, executorId))
          }
        }

      case ReviveOffers =>
        makeOffers()

      case KillTask(taskId, executorId, interruptThread) =>
        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)

      case StopDriver =>
        sender ! true
        context.stop(self)

      case StopExecutors =>
        logInfo("Asking each executor to shut down")
        for (executor <- executorActor.values) {
          executor ! StopExecutor
        }
        sender ! true

      case RemoveExecutor(executorId, reason) =>
        removeExecutor(executorId, reason)
        sender ! true

      case DisassociatedEvent(_, address, _) =>
        addressToExecutorId.get(address).foreach(removeExecutor(_,
          "remote Akka client disassociated"))

    }

RegisterExecutor填充executorActor,将executorId与sender对应起来

case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          totalCores(executorId) = cores
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          makeOffers()
        }

receive中向sender发送各种消息,在CoarseGrainedExecutorBackend中也定义了receive接受处理消息,如此driver和executor可以通过Akka相互通信

override def receive = {
    case RegisteredExecutor(sparkProperties) =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ?
      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
        false)

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(taskDesc) =>
      logInfo("Got assigned task " + taskDesc.taskId)
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
      }

    case KillTask(taskId, _, interruptThread) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
        System.exit(1)
      } else {
        executor.killTask(taskId, interruptThread)
      }

    case x: DisassociatedEvent =>
      logError(s"Driver $x disassociated! Shutting down.")
      System.exit(1)

    case StopExecutor =>
      logInfo("Driver commanded a shutdown")
      context.stop(self)
      context.system.shutdown()
  }

最后说明一点

Messages are sent to an Actor through one of the following methods.

! means “fire-and-forget”, e.g. send a message asynchronously and return immediately. Also known as tell. 

? sends a message asynchronously and returns a Future representing a possible reply. Also known as ask.

END

时间: 2024-08-30 04:37:34

Apache Spark-1.0.0浅析(八):资源调度——Akka通信建立的相关文章

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Apache Spark 2.2.0 中文文档 - SparkR (R on Spark) | ApacheCN

SparkR (R on Spark) 概述 SparkDataFrame 启动: SparkSession 从 RStudio 来启动 创建 SparkDataFrames 从本地的 data frames 来创建 SparkDataFrames 从 Data Sources(数据源)创建 SparkDataFrame 从 Hive tables 来创建 SparkDataFrame SparkDataFrame 操作 Selecting rows(行), columns(列) Groupin

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset

Apache Spark 2.2.0 中文文档 - GraphX Programming Guide | ApacheCN

GraphX Programming Guide 概述 入门 属性 Graph 示例属性 Graph Graph 运算符 运算符的汇总表 Property 运算符 Structural 运算符 Join 运算符 邻域聚合 聚合消息 (aggregateMessages) Map Reduce Triplets Transition Guide (Legacy) 计算级别信息 收集相邻点 Caching and Uncaching Pregel API Graph 建造者 Vertex and E

Apache Spark 2.2.0 中文文档 - Submitting Applications | ApacheCN

Submitting Applications 在 script in Spark的 bin 目录中的spark-submit 脚本用与在集群上启动应用程序.它可以通过一个统一的接口使用所有 Spark 支持的 cluster managers,所以您不需要专门的为每个cluster managers配置您的应用程序. 打包应用依赖 如果您的代码依赖了其它的项目,为了分发代码到 Spark 集群中您将需要将它们和您的应用程序一起打包.为此,创建一个包含您的代码以及依赖的 assembly jar

Apache Spark 2.2.0新特性介绍(转载)

这个版本是 Structured Streaming 的一个重要里程碑,因为其终于可以正式在生产环境中使用,实验标签(experimental tag)已经被移除.在流系统中支持对任意状态进行操作:Apache Kafka 0.10 的 streaming 和 batch API支持读和写操作.除了在 SparkR, MLlib 和 GraphX 里面添加新功能外,该版本更多的工作在系统的可用性(usability).稳定性(stability)以及代码的润色(polish)并解决了超过 110

Apache Spark 1.5.0正式发布

Spark 1.5.0是1.x线上的第6个发行版.这个版本共处理了来自230+contributors和80+机构的1400+个patches.Spark 1.5的许多改变都是围绕在提升Spark的性能.可用性以及操作稳定性.Spark 1.5.0焦点在Tungsten项目,它主要是通过对低层次的组建进行优化从而提升Spark的性能.Spark 1.5版本为Streaming增加了operational特性,比如支持backpressure.另外比较重要的更新就是新增加了一些机器学习算法和工具,

Apache Spark 2.2.0 中文文档 - 集群模式概述 | ApacheCN

集群模式概述 该文档给出了 Spark 如何在集群上运行.使之更容易来理解所涉及到的组件的简短概述.通过阅读 应用提交指南 来学习关于在集群上启动应用. 组件 Spark 应用在集群上作为独立的进程组来运行,在您的 main 程序中通过 SparkContext 来协调(称之为 driver 程序). 具体的说,为了运行在集群上,SparkContext 可以连接至几种类型的 Cluster Manager(既可以用 Spark 自己的 Standlone Cluster Manager,或者