Spark源码解读-JOB的提交与执行

本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。

实验环境搭建

在进行后续操作前,确保下列条件已满足。

1. 下载spark binary 0.9.1

2. 安装scala

3. 安装sbt

4. 安装java

启动spark-shell单机模式运行,即local模式

local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

MASTER=local
bin/spark-shell

"MASTER=local"就是表明当前运行在单机模式

local cluster方式运行

localcluster模式是一种伪cluster模式,在单机环境下模拟standalone的集群,启动顺序分别如下

1. 启动master

2. 启动worker

3. 启动spark-shell

master$SPARK_HOME/sbin/start-master.sh

注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。

master主要是运行类 org.apache.spark.deploy.master.Master,在8080端口启动监听,日志如下图所示

修改配置

1. 进入$SPARK_HOME/conf目录

2. 将spark-env.sh.template重命名为spark-env.sh

3. 修改spark-env.sh,添加如下内容

export SPARK_MASTER_IP=localhostexport
SPARK_LOCAL_IP=localhost运行workerbin/spark-class
org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M

worker启动完成,连接到master。打开maser的webui可以看到连接上来的worker. Master WEb UI的监听地址是http://localhost:8080

启动spark-shellMASTER=spark://localhost:7077
bin/spark-shell

如果一切顺利,将看到下面的提示信息。

Created spark
context..Spark
context available as sc.

可以用浏览器打开localhost:4040来查看如下内容

1. stages

2. storage

3. environment

4. executors

wordcount

上述环境准备妥当之后,我们在sparkshell中运行一下最简单的例子,在spark-shell中输入如下代码

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

上述代码统计在README.md中含有Spark的行数有多少

部署过程详解

Spark布置环境中组件构成如下图所示。

  • Driver Program 简要来说在spark-shell中输入的wordcount语句对应于上图的Driver
    Program.
  • Cluster Manager 就是对应于上面提到的master,主要起到deploy
    management的作用
  • Worker Node 与Master相比,这是slave
    node。上面运行各个executor,executor可以对应于线程。executor处理两种基本的业务逻辑,一种就是driver     programme,另一种就是job在提交之后拆分成各个stage,每个stage可以运行一到多个task

Notes: 在集群(cluster)方式下, Cluster Manager运行在一个jvm进程之中,而worker运行在另一个jvm进程中。在local cluster中,这些jvm进程都在同一台机器中,如果是真正的standalone或Mesos及Yarn集群,worker与master或分布于不同的主机之上。

JOB的生成和运行

job生成的简单流程如下

1. 首先应用程序创建SparkContext的实例,如实例为sc

2. 利用SparkContext的实例来创建生成RDD

3. 经过一连串的transformation操作,原始的RDD转换成为其它类型的RDD

4. 当action作用于转换之后RDD时,会调用SparkContext的runJob方法

5. sc.runJob的调用是后面一连串反应的起点,关键性的跃变就发生在此处

调用路径大致如下

1. sc.runJob->dagScheduler.runJob->submitJob

2. DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor

3. eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数

4. job到stage的转换,生成finalStage并提交运行,关键是调用submitStage

5. 在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖和窄依赖两种

6. 如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task

7. 提交task是调用函数submitMissingTasks来完成

8. task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks

9. TaskSchedulerImpl中会根据Spark的当前运行模式来创建相应的backend,如果是在单机运行则创建LocalBackend

10. LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件

11. receiveOffers->executor.launchTask->TaskRunner.run

代码片段executor.lauchTask

def launchTask(context:
ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { 
  val tr
= new TaskRunner(context,
taskId, serializedTask) 
  runningTasks.put(taskId, tr) 
  threadPool.execute(tr)  }

说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor之内。

运算结果是包装成为MapStatus然后通过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣可以自行勾勒。

时间: 2024-08-28 01:34:15

Spark源码解读-JOB的提交与执行的相关文章

struct2源码解读(11)之执行action请求中篇

struct2源码解读之执行action请求(2) 上篇博文介绍了执行action请求前的一些准备工作,包括封装http参数到一个map中,获得一个值栈对象和配置信息configuration,并创建一个执行action请求的actionProxy对象,并对这个对象进行了初始化,包括指定默认执行方法和对actionName对应的类进行依赖注入以及调出拦截器. 然后就到了我们执行action请求的工作.  if (mapping.getResult() != null) {            

Spark修炼之道(高级篇)——Spark源码阅读:第八节 Task执行

Task执行 在上一节中,我们提到在Driver端CoarseGrainedSchedulerBackend中的launchTasks方法向Worker节点中的Executor发送启动任务命令,该命令的接收者是CoarseGrainedExecutorBackend(Standalone模式),类定义源码如下: private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: St

Apache Spark源码分析-- Job的提交与运行

本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建. 实验环境搭建 在进行后续操作前,确保下列条件已满足. 1. 下载spark binary 0.9.1 2. 安装scala 3. 安装sbt 4. 安装java 启动spark-shell单机模式运行,即local模式 local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME MASTER=local bin/spark-shell "MASTER=local&quo

spark源码解读-SparkContext初始化过程

sparkcontext是spark应用程序的入口,每个spark应用都会创建sparkcontext,用于连接spark集群来执行计算任务.在sparkcontext初始化过程中会创建SparkEnv,SparkUI,TaskSchedule,DAGSchedule等多个核心类,我们会逐个分析他们. 下面我们看一下sparkcontext的初始化过程,首先判断一些参数, try { _conf = config.clone() _conf.validateSettings() if (!_co

Spark源码分析之二:Job的调度模型与运行反馈

在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. 今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈. 首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行.入口方法为DAGScheduler的runJon()方法.代码如下: [jav

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

Spark学习之路 (十六)SparkCore的源码解读(二)spark-submit提交脚本

讨论QQ:1586558083 目录 一.概述 二.源码解读 2.2 find-spark-home 2.3 spark-class 2.4 SparkSubmit 正文 回到顶部 一.概述 上一篇主要是介绍了spark启动的一些脚本,这篇主要分析一下Spark源码中提交任务脚本的处理逻辑,从spark-submit一步步深入进去看看任务提交的整体流程,首先看一下整体的流程概要图: 回到顶部 二.源码解读 2.1 spark-submit # -z是检查后面变量是否为空(空则真) shell可以

15、Spark Streaming源码解读之No Receivers彻底思考

在前几期文章里讲了带Receiver的Spark Streaming 应用的相关源码解读,但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar