Apache Spark源码分析-- Job的提交与运行

本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。

实验环境搭建

在进行后续操作前,确保下列条件已满足。

1. 下载spark binary 0.9.1

2. 安装scala

3. 安装sbt

4. 安装java

启动spark-shell单机模式运行,即local模式

local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME

MASTER=local
bin/spark-shell

"MASTER=local"就是表明当前运行在单机模式

local
cluster方式运行

localcluster模式是一种伪cluster模式,在单机环境下模拟standalone的集群,启动顺序分别如下

1. 启动master

2. 启动worker

3. 启动spark-shell

master$SPARK_HOME/sbin/start-master.sh

注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。

master主要是运行类 org.apache.spark.deploy.master.Master在8080端口启动监听,日志如下图所示

修改配置

1. 进入$SPARK_HOME/conf目录

2. 将spark-env.sh.template重命名为spark-env.sh

3. 修改spark-env.sh,添加如下内容

export
SPARK_MASTER_IP=localhostexport
SPARK_LOCAL_IP=localhost运行workerbin/spark-class
org.apache.spark.deploy.worker.Worker
spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M

worker启动完成,连接到master。打开maser的webui可以看到连接上来的worker. Master WEb
UI的监听地址是http://localhost:8080

启动spark-shellMASTER=spark://localhost:7077
bin/spark-shell

如果一切顺利,将看到下面的提示信息。

Created spark
context..Spark context available as
sc.

可以用浏览器打开localhost:4040来查看如下内容

1. stages

2. storage

3. environment

4. executors

wordcount

上述环境准备妥当之后,我们在sparkshell中运行一下最简单的例子,在spark-shell中输入如下代码

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

上述代码统计在README.md中含有Spark的行数有多少

部署过程详解

Spark布置环境中组件构成如下图所示。

  • Driver
    Program
     简要来说在spark-shell中输入的wordcount语句对应于上图的Driver
    Program.
  • Cluster
    Manager 
    就是对应于上面提到的master,主要起到deploy
    management的作用
  • Worker
    Node 
    与Master相比,这是slave
    node。上面运行各个executor,executor可以对应于线程。executor处理两种基本的业务逻辑,一种就是driver 
       programme,另一种就是job在提交之后拆分成各个stage,每个stage可以运行一到多个task

Notes: 在集群(cluster)方式下, Cluster
Manager运行在一个jvm进程之中,而worker运行在另一个jvm进程中。在local
cluster中,这些jvm进程都在同一台机器中,如果是真正的standalone或Mesos及Yarn集群,worker与master或分布于不同的主机之上。

JOB的生成和运行

job生成的简单流程如下

1. 首先应用程序创建SparkContext的实例,如实例为sc

2. 利用SparkContext的实例来创建生成RDD

3. 经过一连串的transformation操作,原始的RDD转换成为其它类型的RDD

4. 当action作用于转换之后RDD时,会调用SparkContext的runJob方法

5. sc.runJob的调用是后面一连串反应的起点,关键性的跃变就发生在此处

调用路径大致如下

1. sc.runJob->dagScheduler.runJob->submitJob

2.
DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor

3.
eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数

4. job到stage的转换,生成finalStage并提交运行,关键是调用submitStage

5.
在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖窄依赖两种

6. 如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task

7. 提交task是调用函数submitMissingTasks来完成

8.
task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks

9. TaskSchedulerImpl中会根据Spark的当前运行模式来创建相应的backend,如果是在单机运行则创建LocalBackend

10. LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件

11. receiveOffers->executor.launchTask->TaskRunner.run

代码片段executor.lauchTask

def launchTask(context:
ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)

  val tr
= new TaskRunner(context,
taskId, serializedTask)    runningTasks.put(taskId,
tr)   
threadPool.execute(tr)  }

说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor之内。

运算结果是包装成为MapStatus然后通过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣可以自行勾勒。

更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

关注超人学院java免费学习交流群:

时间: 2024-08-24 05:19:10

Apache Spark源码分析-- Job的提交与运行的相关文章

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

Spark源码分析之二:Job的调度模型与运行反馈

在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. 今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈. 首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行.入口方法为DAGScheduler的runJon()方法.代码如下: [jav

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Apache Spark源码走读之5 -- DStream处理的容错性分析

欢迎转载,转载请注明出处,徽沪一郎,谢谢. 在流数据的处理过程中,为了保证处理结果的可信度(不能多算,也不能漏算),需要做到对所有的输入数据有且仅有一次处理.在Spark Streaming的处理机制中,不能多算,比较容易理解.那么它又是如何作到即使数据处理结点被重启,在重启之后这些数据也会被再次处理呢? 环境搭建 为了有一个感性的认识,先运行一下简单的Spark Streaming示例.首先确认已经安装了openbsd-netcat. 运行netcatnc -lk 9999 运行spark-s

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的. Standalone部署的节点组成 介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多. 在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外

Apache Spark源码走读之7 -- Standalone部署方式分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有1到

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

Apache Spark源码走读之6 -- 存储子系统分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系. 存储子系统概览 上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下 CacheManager  RDD在进行计算的时候,通过CacheManager来获取数据,并通过CacheManager来存储计算结果 BlockManager

Apache Spark源码走读之13 -- hiveql on spark实现详解

欢迎转载,转载请注明出处,徽沪一郎 概要 在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情. Hive简介 Hive的由来 以下部分摘自Hadoop definite guide中的Hive一章 "Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询. Hive大大