Spark Analysis: The Standalone Mode Execution Process

I. Cluster Startup Process -- Starting the Master

$SPARK_HOME/sbin/start-master.sh

Key content of the start-master.sh script:

spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

Log output: $SPARK_HOME/logs/

14/07/22 13:41:33 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@hadoop000:7077]
14/07/22 13:41:33 INFO master.Master: Starting Spark master at spark://hadoop000:7077
14/07/22 13:41:33 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/07/22 13:41:33 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8080
14/07/22 13:41:33 INFO ui.MasterWebUI: Started MasterWebUI at http://hadoop000:8080
14/07/22 13:41:33 INFO master.Master: I have been elected leader! New state: ALIVE

II. Cluster Startup Process -- Starting the Workers

$SPARK_HOME/sbin/start-slaves.sh

Key content of the start-slaves.sh script:

spark-daemon.sh start org.apache.spark.deploy.worker.Worker master-spark-URL

At startup, a Worker must register with the specified master URL, which here is spark://hadoop000:7077.

After starting, a Worker does two main things:
  1) registers itself with the Master (RegisterWorker);
  2) periodically sends heartbeat messages to the Master.

The Worker sends its registration message to the Master:

Worker.scala
    ==>preStart
      ==>registerWithMaster
        ==>tryRegisterAllMasters
          ==> actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

On the Master side, the RegisterWorker message is handled as follows:

Master.scala
  ==>case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        sender, workerUiPort, publicAddress)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)   // after a successful registration, reply to the Worker with RegisteredWorker
        schedule()
      }
    }

After receiving the registration confirmation from the Master, the Worker periodically sends heartbeat messages to the Master:

Worker.scala
  ==>case SendHeartbeat =>
      masterLock.synchronized {
        if (connected) { master ! Heartbeat(workerId) }
      }

On receiving a heartbeat from a Worker, the Master updates that worker's last-heartbeat timestamp:

Master.scala
  ==>case Heartbeat(workerId) => {
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
      }
    }
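Taken together, the heartbeat exchange is a periodic self-message on the Worker that triggers a Heartbeat to the Master, which simply records the receive time. Below is a minimal, self-contained Akka sketch of that pattern (the toy class names, the 15-second interval, and the simplified bookkeeping are assumptions for illustration, not Spark source):

import akka.actor.{Actor, ActorRef}
import scala.collection.mutable
import scala.concurrent.duration._

case class Heartbeat(workerId: String)
case object SendHeartbeat

class ToyWorker(master: ActorRef, workerId: String) extends Actor {
  import context.dispatcher
  // periodically tell ourselves to heartbeat, mirroring the Worker's SendHeartbeat timer
  context.system.scheduler.schedule(0.seconds, 15.seconds, self, SendHeartbeat)

  def receive = {
    case SendHeartbeat => master ! Heartbeat(workerId)
  }
}

class ToyMaster extends Actor {
  // workerId -> last heartbeat timestamp, analogous to WorkerInfo.lastHeartbeat
  val lastHeartbeat = mutable.Map[String, Long]()

  def receive = {
    case Heartbeat(workerId) => lastHeartbeat(workerId) = System.currentTimeMillis()
  }
}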

The Master periodically removes workers that have not sent a heartbeat within the timeout:

Master.scala
  ==>preStart
    ==>CheckForWorkerTimeOut
      ==>case CheckForWorkerTimeOut => {timeOutDeadWorkers()} //Check for, and remove, any timed-out workers
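A minimal sketch of the idea behind timeOutDeadWorkers() is shown below (the 60-second value mirrors what is believed to be the default of spark.worker.timeout; WorkerInfoSketch and the removal logic are simplified stand-ins for the real Master code):

import scala.collection.mutable

case class WorkerInfoSketch(id: String, var lastHeartbeat: Long)

object TimeoutSweep {
  val WorkerTimeoutMs = 60 * 1000L  // assumed default of spark.worker.timeout

  // drop every worker whose last heartbeat is older than the timeout
  def timeOutDeadWorkers(workers: mutable.Set[WorkerInfoSketch]): Unit = {
    val deadline = System.currentTimeMillis() - WorkerTimeoutMs
    val dead = workers.filter(_.lastHeartbeat < deadline)
    dead.foreach { w =>
      println(s"Removing worker ${w.id}: no heartbeat in ${WorkerTimeoutMs / 1000} seconds")
      workers -= w
    }
  }
}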

Log output: $SPARK_HOME/logs/

Master log excerpt:

14/07/22 13:41:36 INFO master.Master: Registering worker hadoop000:48343 with 1 cores, 2.0 GB RAM

Worker log excerpt:

14/07/22 13:41:35 INFO Worker: Starting Spark worker hadoop000:48343 with 1 cores, 2.0 GB RAM
14/07/22 13:41:35 INFO Worker: Spark home: /home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0
14/07/22 13:41:35 INFO WorkerWebUI: Started WorkerWebUI at http://hadoop000:8081
14/07/22 13:41:35 INFO Worker: Connecting to master spark://hadoop000:7077...
14/07/22 13:41:36 INFO Worker: Successfully registered with master spark://hadoop000:7077

III. Application Submission Process

A. Submitting an Application

Run spark-shell: $SPARK_HOME/bin/spark-shell --master spark://hadoop000:7077

Log output: $SPARK_HOME/work

spark-shell is itself an application. While SparkContext's createTaskScheduler builds the SparkDeploySchedulerBackend, an AppClient is created and started:

client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()

which then sends a RegisterApplication request to the Master:

AppClient.scala
  ==>preStart
    ==>registerWithMaster
      ==>tryRegisterAllMasters
        ==>actor ! RegisterApplication(appDescription)

B. Master Handles the RegisterApplication Request

On the Master side, the corresponding handling branch is RegisterApplication. After receiving the request, the Master registers the application and then calls schedule(): if workers have already registered, it sends a LaunchExecutor message to the chosen worker(s).

Master.scala
        ==>case RegisterApplication(description) => {
            logInfo("Registering app " + description.name)
            val app = createApplication(description, sender)
            registerApplication(app)
            logInfo("Registered app " + description.name + " with ID " + app.id)
            persistenceEngine.addApplication(app)
            sender ! RegisteredApplication(app.id, masterUrl)
            schedule()
        }
        ==>schedule
            ==>launchExecutor(worker, exec)
                ==> worker.addExecutor(exec)
                    worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
                    exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
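The core-allocation idea inside schedule() can be sketched as follows, assuming the spread-out strategy (spark.deploy.spreadOut): cores are handed out one at a time, round-robin over the usable workers, starting from those with the most free cores. The names below are simplified stand-ins, not the Master's real fields:

case class WorkerSketch(id: String, var coresFree: Int)

object SpreadOutSketch {
  // hand out the requested cores one at a time, round-robin over the usable
  // workers, starting from those with the most free cores
  def spreadOutCores(coresWanted: Int, workers: Seq[WorkerSketch]): Map[String, Int] = {
    val usable = workers.filter(_.coresFree > 0).sortBy(-_.coresFree)
    if (usable.isEmpty) return Map.empty
    val assigned = Array.fill(usable.size)(0)
    var toAssign = math.min(coresWanted, usable.map(_.coresFree).sum)
    var pos = 0
    while (toAssign > 0) {
      if (usable(pos).coresFree - assigned(pos) > 0) {
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % usable.size
    }
    usable.map(_.id).zip(assigned).toMap
  }
}

Each worker that ends up with a non-zero assignment would then receive one LaunchExecutor message.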

C. Launching the Executor

After receiving the LaunchExecutor message, the Worker starts the Executor process:

Worker.scala
    ==>case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
          self, workerId, host,
          appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
          workDir, akkaUrl, ExecutorState.RUNNING)
        executors(appId + "/" + execId) = manager
        manager.start()
        coresUsed += cores_
        memoryUsed += memory_
        masterLock.synchronized {
          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
        }
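In essence, ExecutorRunner assembles a java command line for CoarseGrainedExecutorBackend and starts it as a child process of the Worker. A heavily simplified sketch of that idea (the classpath and argument list here are illustrative placeholders; the exact command Spark builds can be seen in the Worker log further below):

import java.io.File

object LaunchSketch {
  def launchExecutorProcess(sparkHome: String, driverUrl: String, execId: String,
                            hostname: String, cores: Int, workDir: File): Process = {
    // simplified stand-in for the command ExecutorRunner constructs
    val command = Seq(
      "java", "-cp", s"$sparkHome/conf:$sparkHome/lib/*",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      driverUrl, execId, hostname, cores.toString)
    new ProcessBuilder(command: _*).directory(workDir).start()
  }
}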

D. Registering the Executor

Using the arguments it was launched with, the Executor process registers itself with the SchedulerBackend in the Driver:

SparkDeploySchedulerBackend.scala
    ==>receive   (handled in the parent class CoarseGrainedSchedulerBackend, inside its DriverActor)
        ==> case RegisterExecutor(executorId, hostPort, cores) =>
            logInfo("Registered executor: " + sender + " with ID " + executorId)
            sender ! RegisteredExecutor(sparkProperties)
            executorActor(executorId) = sender
            executorHost(executorId) = Utils.parseHostPort(hostPort)._1
            totalCores(executorId) = cores
            freeCores(executorId) = cores
            executorAddress(executorId) = sender.path.address
            addressToExecutorId(sender.path.address) = executorId
            totalCoreCount.addAndGet(cores)
            makeOffers()

CoarseGrainedExecutorBackend.scala
    case RegisteredExecutor(sparkProperties) =>
        logInfo("Successfully registered with driver")
        executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties, false)

Executor log location: console / $SPARK_HOME/logs

E. Running Tasks

Example code:

sc.textFile("hdfs://hadoop000:8020/hello.txt").flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).collect
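The same word count can also be packaged as a standalone application that drives exactly the registration and launch steps described above; a minimal sketch (the object name simply mirrors the master URL and HDFS path used elsewhere in this article):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // needed for reduceByKey on pre-1.3 Spark

object StandaloneWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StandaloneWordCount")
      .setMaster("spark://hadoop000:7077")   // same master URL used by spark-shell above
    val sc = new SparkContext(conf)

    val counts = sc.textFile("hdfs://hadoop000:8020/hello.txt")
      .flatMap(_.split("\t"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()

    counts.foreach(println)
    sc.stop()
  }
}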

After the SchedulerBackend has received the executors' registration messages, it breaks the submitted Spark job down into concrete Tasks and dispatches them to the executors, where they actually run, via LaunchTask messages:

CoarseGrainedSchedulerBackend.scala
    def makeOffers() {
        launchTasks(scheduler.resourceOffers(
            executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
    }
    ==>launchTasks
        ==>executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

CoarseGrainedExecutorBackend.scala
    ==>case LaunchTask(data) =>
        if (executor == null) {
            logError("Received LaunchTask command but executor was null")
            System.exit(1)
        } else {
            val ser = SparkEnv.get.closureSerializer.newInstance()
            val taskDesc = ser.deserialize[TaskDescription](data.value)
            logInfo("Got assigned task " + taskDesc.taskId)
            executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
        }
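Behind executor.launchTask sits a thread pool that runs each deserialized task and reports its result; a minimal sketch of that pattern (toy names, not Spark's Executor/TaskRunner classes):

import java.nio.ByteBuffer
import java.util.concurrent.Executors

class ToyExecutor(threads: Int) {
  private val pool = Executors.newFixedThreadPool(threads)

  // hand the serialized task to the pool; runBody stands in for
  // deserializing and running the real Task and producing its result
  def launchTask(taskId: Long, serializedTask: ByteBuffer)(runBody: ByteBuffer => Any): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = {
        val result = runBody(serializedTask)
        println(s"task $taskId finished with result $result")
      }
    })
  }
}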

Master log excerpt:

14/07/22 15:25:27 INFO master.Master: Registering app Spark shell
14/07/22 15:25:27 INFO master.Master: Registered app Spark shell with ID app-20140722152527-0001
14/07/22 15:25:27 INFO master.Master: Launching executor app-20140722152527-0001/0 on worker worker-20140722134135-hadoop000-48343

Worker log excerpt:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/07/22 15:25:27 INFO Worker: Asked to launch executor app-20140722152527-0001/0 for Spark shell
Spark assembly has been built with Hive, including Datanucleus jars on classpath
14/07/22 15:25:28 INFO ExecutorRunner: Launch command: "java" "-cp" "::/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/conf:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/spark-assembly-1.0.1-hadoop2.3.0-cdh5.0.0.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-core-3.2.2.jar:/home/spark/app/spark-1.0.1-bin-2.3.0-cdh5.0.0/lib/datanucleus-api-jdo-3.2.1.jar" "-XX:MaxPermSize=128m" "-Xms1024M" "-Xmx1024M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@hadoop000:50515/user/CoarseGrainedScheduler" "0" "hadoop000" "1" "akka.tcp://sparkWorker@hadoop000:48343/user/Worker" "app-20140722152527-0001"

Console log excerpt:

14/07/22 15:25:31 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@hadoop000:45150/user/Executor#-791712793] with ID 0
14/07/22 15:25:31 INFO CoarseGrainedExecutorBackend: Successfully registered with driver

Whenever a new application registers with the Master, the Master calls schedule() to assign it to suitable workers; the corresponding ExecutorBackend processes are launched on those workers, and the application's tasks ultimately run inside those ExecutorBackends.
