Spark源码系列(一)spark-submit提交作业过程

前言


折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程。有不明白Spark的原理的话,有另外一位大牛已经写了一个系列的Spark的源码分析了,大家可以去参考他的,他的过程图画得非常好,他写过的我可能就不写了,实在没办法比人家写得更好。

下面给出他的地址: http://www.cnblogs.com/hseagle/p/3664933.html,屌丝们,赶紧去膜拜大神吧。

这个是Spark的App运行图,它通过一个Driver来和集群通信,集群负责作业的分配。今天我要讲的是如何创建这个Driver
Program的过程。

在1.0之前就不需要提交作业,只需要把程序打包成一个jar,然后java -jar就运行了,这个运行的程序就充当了Driver
Program的角色。有可能的一个原因是,以前这个程序脱离了组织,现在组织要对它进行监管了。

作业提交方法以及参数

我们先看一下用Spark Submit提交的方法吧,下面是从官方上面摘抄的内容。


# Run on a Spark standalone cluster
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://207.184.161.138:7077 \
--executor-memory 20G --total-executor-cores 100 /path/to/examples.jar 1000

这个是提交到standalone集群的方式,打开spark-submit这文件,我们会发现它最后是调用了org.apache.spark.deploy.SparkSubmit这个类。

我们直接进去看就行了,main函数就几行代码,太节省了。

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}

我们主要看看createLaunchEnv方法就可以了,launch是反射调用mainClass,精华全在createLaunchEnv里面了。

在里面我发现一些有用的信息,可能在官方文档上面都没有的,发出来大家瞅瞅。前面不带--的可以在spark-defaults.conf里面设置,带--的直接在提交的时候指定,具体含义大家一看就懂。

val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
)

yarn模式的话mainClass是org.apache.spark.deploy.yarn.Client,standalone的mainClass是org.apache.spark.deploy.Client。

这次我们讲org.apache.spark.deploy.Client,yarn的话单独找一章出来单独讲,目前超哥还是推荐使用standalone的方式部署spark,具体原因不详,据说是因为资源调度方面的问题。

说个快捷键吧,Ctrl+Shift+N,然后输入Client就能找到这个类,这是IDEA的快捷键,相当好使。

我们直接找到它的main函数,发现了它居然使用了Akka框架,我百度了一下,被它震惊了。

Akka

在main函数里面,主要代码就这么三行。


//创建一个ActorSystem
val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient",Utils.localHostName(),0,
  conf, new SecurityManager(conf))
//执行ClientActor的preStart方法和receive方法
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
//等待运行结束
actorSystem.awaitTermination()

看了这里真的有点儿懵啊,这是啥玩意儿,不懂的朋友们,请点击这里Akka。下面是它官方放出来的例子:


//定义一个case class用来传递参数
case class Greeting(who: String)
//定义Actor,比较重要的一个方法是receive方法,用来接收信息的
class GreetingActor extends Actor with ActorLogging {
def receive = {
case Greeting(who) ? log.info("Hello " + who)
}
}
//创建一个ActorSystem
val system = ActorSystem("MySystem")
//给ActorSystem设置Actor
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
//向greeter发送信息,用Greeting来传递
greeter ! Greeting("Charlie Parker")

简直是无比强大啊,就这么几行代码就搞定了,接下来看你会更加震惊的。

我们回到Client类当中,找到ClientActor,它有两个方法,是之前说的preStart和receive方法,preStart方法用于连接master提交作业请求,receive方法用于接收从master返回的反馈信息。

我们先看preStart方法吧。


override def preStart() = {
// 这里需要把master的地址转换成akka的地址,然后通过这个akka地址获得指定的actor
// 它的格式是"akka.tcp://%[email protected]%s:%s/user/%s".format(systemName, host, port, actorName)
masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
// 把自身设置成远程生命周期的事件
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

driverArgs.cmd match {
case "launch" =>
// 此处省略100个字
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
// 此处省略100个字
// 向master发送提交Driver的请求,把driverDescription传过去,RequestSubmitDriver前面说过了,是个case class
masterActor ! RequestSubmitDriver(driverDescription)

case "kill" =>
val driverId = driverArgs.driverId
val killFuture = masterActor ! RequestKillDriver(driverId)
}
}

从上面的代码看得出来,它需要设置master的连接地址,最后提交了一个RequestSubmitDriver的信息。在receive方法里面,就是等待接受回应了,有两个Response分别对应着这里的launch和kill。

线索貌似到这里就断了,那下一步在哪里了呢?当然是在Master里面啦,怎么知道的,猜的,哈哈。

Master也是继承了Actor,在它的main函数里面找到了以下代码:


val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, 
  securityManager = securityMgr)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]

和前面的actor基本一致,多了actor.ask这句话,查了一下官网的文档,这句话的意思的发送消息,并且接受一个Future作为response,和前面的actor !
message的区别就是它还接受返回值。

具体的Akka的用法,大家还是参照官网吧,Akka确实如它官网所言的那样子,是一个简单、强大、并行的分布式框架。

小结:

Akka的使用确实简单,短短的几行代码即刻完成一个通信功能,比Socket简单很多。但是它也逃不脱我们常说的那些东西,请求、接收请求、传递的消息、注册的地址和端口这些概念。

调度schedule

我们接下来查找Master的receive方法吧,Master是作为接收方的,而不是主动请求,这点和hadoop是一致的。


    case RequestSubmitDriver(description) => {
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
// 调度
schedule()
 // 告诉client,提交成功了,把driver.id告诉它
sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
}

这里我们主要看schedule方法就可以了,它是执行调度的方法。


private def schedule() {
if (state != RecoveryState.ALIVE) { return }

// 首先调度Driver程序,从workers里面随机抽一些出来
val shuffledWorkers = Random.shuffle(workers)
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
// 判断内存和cpu够不够,够的就执行了哈
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}

// 这里是按照先进先出的,spreadOutApps是由spark.deploy.spreadOut参数来决定的,默认是true
if (spreadOutApps) {
// 遍历一下app
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // 记录每个节点的核心数
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
// 遍历直到分配结束
while (toAssign > 0) {
// 从0开始遍历可用的work,如果可用的cpu减去已经分配的>0,就可以分配给它
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
// 这个位置的work的可分配的cpu数+1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
}
// 给刚才标记的worker分配任务
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
}
} else {
// 这种方式和上面的方式的区别是,这种方式尽可能用少量的节点来完成这个任务
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
// 判断条件是worker的内存比app需要的内存多
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = app.addExecutor(worker, coresToUse)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
}
}
}
}

它的调度器是这样的,先调度Driver程序,然后再调度App,调度App的方式是从各个worker的里面和App进行匹配,看需要分配多少个cpu。

那我们接下来看两个方法launchDriver和launchExecutor即可。


  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING
}

给worker发送了一个LaunchDriver的消息,下面在看launchExecutor的方法。


  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}

它要做的事情多一点,除了给worker发送LaunchExecutor指令外,还需要给driver发送ExecutorAdded的消息,说你的任务已经有人干了。

在继续Worker讲之前,我们先看看它是怎么注册进来的,每个Worker启动之后,会自动去请求Master去注册自己,具体我们可以看receive的方法里面的RegisterWorker这一段,它需要上报自己的内存、Cpu、地址、端口等信息,注册成功之后返回RegisteredWorker信息给它,说已经注册成功了。

Worker执行

同样的,我们到Worker里面在receive方法找LaunchDriver和LaunchExecutor就可以找到我们要的东西。


case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()

coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
}

看一下start方法吧,start方法里面,其实是new
Thread().start(),run方法里面是通过传过来的DriverDescription构造的一个命令,丢给ProcessBuilder去执行命令,结束之后调用

worker
!DriverStateChanged通知worker,worker再通过master
! DriverStateChanged通知master,释放掉worker的cpu和内存。

同理,LaunchExecutor执行完毕了,通过worker !
ExecutorStateChanged通知worker,然后worker通过master
! ExecutorStateChanged通知master,释放掉worker的cpu和内存。

下面我们再梳理一下这个过程,只包括Driver注册,Driver运行之后的过程在之后的文章再说,比较复杂。

1、Client通过获得Url地址获得ActorSelection(master的actor引用),然后通过ActorSelection给Master发送注册Driver请求(RequestSubmitDriver)

2、Master接收到请求之后就开始调度了,从workers列表里面找出可以用的Worker

3、通过Worker的actor引用ActorRef给可用的Worker发送启动Driver请求(LaunchDriver)

4、调度完毕之后,给Client回复注册成功消息(SubmitDriverResponse)

5、Worker接收到LaunchDriver请求之后,通过传过来的DriverDescription的信息构造出命令来,通过ProcessBuilder执行

6、ProcessBuilder执行完命令之后,通过DriverStateChanged通过Worker

7、Worker最后把DriverStateChanged汇报给Master

Spark源码系列(一)spark-submit提交作业过程

时间: 2024-08-04 17:10:27

Spark源码系列(一)spark-submit提交作业过程的相关文章

Spark源码系列(四)图解作业生命周期

这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! 我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是SparkContext,回想一下,从api的使用角度,RDD都必须通过它来获得. 下面讲一讲它所不为认知的一面,它和其它组件是如何交互的. Driver向Master注册Application过程 SparkContext实例化之后,在内部实例化两个很重要的类,DAGScheduler和TaskSched

Spark源码系列(三)作业运行过程

导读 看这篇文章的时候,最好是能够跟着代码一起看,我是边看代码边写的,所以这篇文章的前进过程也就是我看代码的推进过程. 作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法. def collect(): Array[T] = { val results = sc.runJob(this, (iter: It

Spark源码系列(七)Spark on yarn具体实现

本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思.这一章打算讲一下Spark on yarn的实现,1.0.0里面已经是一个stable的版本了,可是1.0.1也出来了,离1.0.0发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲1.0.0的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于1.0.0的代码. 在第一章<spark-submit提交作业过程>的时候,我们讲过Spark on yarn的在cluster模式下它的main class是or

Spark源码系列(八)Spark Streaming实例分析

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照<Spark Streaming编程指南>. Example代码分析 val ssc = new StreamingContext(sparkConf, Seconds(1)); // 获得一个DStream负责连接 监听端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort); // 对每一行数据执行Split操作 val words = line

Spark源码系列(五)RDD是如何被分布式缓存?

这一章想讲一下Spark的缓存是如何实现的.这个persist方法是在RDD里面的,所以我们直接打开RDD这个类. def persist(newLevel: StorageLevel): this.type = { // StorageLevel不能随意更改 if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException("C

hbase源码系列(七)Snapshot的过程

在看这一章之前,建议大家先去看一下snapshot的使用.可能有人会有疑问为什么要做Snapshot,hdfs不是自带了3个备份吗,这是个很大的误区,要知道hdfs的3个备份是用于防止网络传输中的失败或者别的异常情况导致数据块丢失或者不正确,它不能避免人为的删除数据导致的后果.它就想是给数据库做备份,尤其是做删除动作之前,不管是hbase还是hdfs,请经常做Snapshot,否则哪天手贱了... 直接进入主题吧,上代码. public void takeSnapshot(SnapshotDes

hbase源码系列(六)HMaster启动过程

这一章是server端开始的第一章,有兴趣的朋友先去看一下hbase的架构图,我专门从网上弄下来的. 按照HMaster的run方法的注释,我们可以了解到它的启动过程会去做以下的动作. * <li>阻塞直到变成ActiveMaster * <li>结束初始化操作 * <li>循环 * <li>停止服务并执行清理操作* </ol> HMaster是没有单点问题是,因为它可以同时启动多个HMaster,然后通过zk的选举算法选出一个HMaster来.

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

spark源码系列之累加器实现机制及自定义累加器

一,基本概念 累加器是Spark的一种变量,顾名思义该变量只能增加.有以下特点: 1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加. 2,累加器不会改变Spark Lazy计算的特点.只会在Job触发的时候进行相关累加操作. 3,现有累加器的类型.相信有很多学习大数据的道友,在这里我给大家说说我滴群哦,大数据海量知识分享,784789432.在此我保证,绝对大数据的干货,等待各位的到来,我们一同从入门到精通吧! 二,累加器的使用 Driver端初始化,并在Act