Apache Spark-1.0.0浅析(八):资源调度——Akka通信建立

Spark使用Akka作为各种功能和组件之间的通信工具。同样,在资源调度过程中也使用其作为消息传递系统。之前,在分析了Apache Spark-1.0.0资源调度过程中,明确了主要消息的传递过程和引起的相关动作,本文主要分析Spark资源调度过程中所用到的Akka通信的初始化过程。



@volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => throw
      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))


private val dagSchedulerActorSupervisor =
    env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))


private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"

  def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")


val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
      securityManager = securityManager)


   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
   * ActorSystem itself and its port (which is hard to get from Akka).
   * Note: the `name` parameter is important, as even if a client sends a message to right
   * host + port, if the system name is incorrect, Akka will drop the message.
   * If indestructible is set to true, the Actor System will continue running in the event
   * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
  def createActorSystem(name: String, host: String, port: Int,
    conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {

    val akkaThreads   = conf.getInt("spark.akka.threads", 4)
    val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)

    val akkaTimeout = conf.getInt("spark.akka.timeout", 100)

    val akkaFrameSize = maxFrameSizeBytes(conf)
    val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
    val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
    if (!akkaLogLifecycleEvents) {
      // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
      // See: https://www.assembla.com/spaces/akka/tickets/3787#/
      Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))

    val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"

    val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 600)
    val akkaFailureDetector =
      conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
    val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)

    val secretKey = securityManager.getSecretKey()
    val isAuthOn = securityManager.isAuthenticationEnabled()
    if (isAuthOn && secretKey == null) {
      throw new Exception("Secret key is null with authentication on")
    val requireCookie = if (isAuthOn) "on" else "off"
    val secureCookie = if (isAuthOn) secretKey else ""
    logDebug("In createActorSystem, requireCookie is: " + requireCookie)

    val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
      |akka.daemonic = on
      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
      |akka.stdout-loglevel = "ERROR"
      |akka.jvm-exit-on-fatal-error = off
      |akka.remote.require-cookie = "$requireCookie"
      |akka.remote.secure-cookie = "$secureCookie"
      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
      |akka.remote.netty.tcp.hostname = "$host"
      |akka.remote.netty.tcp.port = $port
      |akka.remote.netty.tcp.tcp-nodelay = on
      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
      |akka.log-config-on-start = $logAkkaConfig
      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
      |akka.log-dead-letters = $lifecycleEvents
      |akka.log-dead-letters-during-shutdown = $lifecycleEvents

    val actorSystem = ActorSystem(name, akkaConf)
    val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
    val boundPort = provider.getDefaultAddress.port.get
    (actorSystem, boundPort)


private[scheduler] var eventProcessActor: ActorRef = _

  private def initializeEventProcessActor() {
    // blocking the thread until supervisor is started, which ensures eventProcessActor is
    // not null before any job is submitted
    implicit val timeout = Timeout(30 seconds)
    val initEventActorReply =
      dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
    eventProcessActor = Await.result(initEventActorReply, timeout.duration).



def receive = {
    case p: Props => sender ! context.actorOf(p)
    case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")




private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
    // valid when the messages arrive

   * The main event loop of the DAG scheduler.
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>

    case JobCancelled(jobId) =>

    case JobGroupCancelled(groupId) =>

    case AllJobsCancelled =>

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>

  override def postStop() {
    // Cancel any active jobs in postStop hook

(II)Task相关(CoarseGrainedSchedulerBackend.scala + CoarseGrainedExecutorBackend.scala)

CoarseGrainedSchedulerBackend中,应用actorSystem.actorOfchuagnjian 实例化DriverActor,命名为“CoarseGrainedScheduler”

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
    // TODO (prashant) send conf instead of properties
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)


private val executorActor = new HashMap[String, ActorRef]


def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
    workerUrl: Option[String]) {

    SparkHadoopUtil.get.runAsSparkUser { () =>
        // Debug code

        val conf = new SparkConf
        // Create a new ActorSystem to run the backend, because we can‘t create a
        // SparkEnv / Executor before getting started with all our system properties, etc
        val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
          conf, new SecurityManager(conf))
        // set it
        val sparkHostPort = hostname + ":" + boundPort
          Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
            sparkHostPort, cores),
          name = "Executor")
        workerUrl.foreach {
          url =>
            actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")



override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    driver = context.actorSelection(driverUrl)
    driver ! RegisterExecutor(executorId, hostPort, cores)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])


def receive = {
      case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          totalCores(executorId) = cores
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          addressToExecutorId(sender.path.address) = executorId

      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          if (executorActor.contains(executorId)) {
            freeCores(executorId) += scheduler.CPUS_PER_TASK
          } else {
            // Ignoring the update since we don‘t know about the executor.
            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
            logWarning(msg.format(taskId, state, sender, executorId))

      case ReviveOffers =>

      case KillTask(taskId, executorId, interruptThread) =>
        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)

      case StopDriver =>
        sender ! true

      case StopExecutors =>
        logInfo("Asking each executor to shut down")
        for (executor <- executorActor.values) {
          executor ! StopExecutor
        sender ! true

      case RemoveExecutor(executorId, reason) =>
        removeExecutor(executorId, reason)
        sender ! true

      case DisassociatedEvent(_, address, _) =>
          "remote Akka client disassociated"))



case RegisterExecutor(executorId, hostPort, cores) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorActor.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          sender ! RegisteredExecutor(sparkProperties)
          executorActor(executorId) = sender
          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
          totalCores(executorId) = cores
          freeCores(executorId) = cores
          executorAddress(executorId) = sender.path.address
          addressToExecutorId(sender.path.address) = executorId


override def receive = {
    case RegisteredExecutor(sparkProperties) =>
      logInfo("Successfully registered with driver")
      // Make this host instead of hostPort ?
      executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,

    case RegisterExecutorFailed(message) =>
      logError("Slave registration failed: " + message)

    case LaunchTask(taskDesc) =>
      logInfo("Got assigned task " + taskDesc.taskId)
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
      } else {
        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)

    case KillTask(taskId, _, interruptThread) =>
      if (executor == null) {
        logError("Received KillTask command but executor was null")
      } else {
        executor.killTask(taskId, interruptThread)

    case x: DisassociatedEvent =>
      logError(s"Driver $x disassociated! Shutting down.")

    case StopExecutor =>
      logInfo("Driver commanded a shutdown")


Messages are sent to an Actor through one of the following methods.

! means “fire-and-forget”, e.g. send a message asynchronously and return immediately. Also known as tell. 

? sends a message asynchronously and returns a Future representing a possible reply. Also known as ask.


时间: 2024-08-30 04:37:34

