运行脚本,提交job
往YARN提交Samza job要使用run-job.sh这个脚本。
samza-example/target/bin/run-job.sh
--config-factory=samza.config.factories.PropertiesConfigFactory
--config-path=file://$PWD/config/hello-world.properties
这脚本的内容是什么呢?
exec $(dirname $0)/run-class.sh org.apache.samza.job.JobRunner
[email protected]
它是调用run-class这个脚本。
run-class.sh会根据环境变量HADOOP_CONF_DIR和HADOOP_YARN_HOME获取YARN的配置文件位置,把它加入CLASSPATH。同时会把samza
job根目录下的lib文件夹里的jar或war都加进CLASSPATH。只所以需要yarn的配置文件,目的是得到Resource
Manager的地址。而lib目录下的包,是运行job必须的。
run-class.sh也会通过环境变量$SAMZA_LOG_DIR获知samza的log应存放的位置,通过$SAMZA_CONTAINER_NAME决定container的名字,然后把它们用-D设在JAVA_OPTS中。然后在lib目录下查找log4j.xml文件,存在的话,就把-Dlog4j.configuration设为log4j.xml的路径。
通过以上动作,构造好了调用java所需的classpath和任务运行时需要的一些配置项,然后调用
exec $JAVA $JAVA_OPTS -cp $CLASSPATH [email protected]
启动虚拟机。
可以,在调用run-job.sh时,会运行org.apache.samza.job.JobRunner这个类。在samza的官方指南中简要介绍了一下这个类的作用。Samza自带了两种StreamJobFactory
:LocalJobFactory 和 YarnJobFactory 。
StreamJobFactory的作用就是把给JobRunner提供一个可以执行的job
?
1 2 3 |
|
而StreamJob就是一个可以执行的job, JobRunner会调用它的submit方法
?
1 2 3 4 5 6 7 8 9 10 11 |
|
下边来看JobRunner这个类。程序入口在JobRunner这个类的伴生对象里
?
1 2 3 4 5 6 7 8 |
|
在配置参数以后,会走到JobRunner的run方法, 下边是它的主要逻辑
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
首先,它会去conf里找到是否设置了job.factory,即有没有指定StreamJobFactory的实现,没有就抛出异常退出。否则就通过这个StreamJobFactory提交job。在提交后,等待500毫秒,如果任务的状态不是Running就退出。这里的Running并不代表任务已经在跑了,比如在使用YARN时,只要成功提交给Resource
Manager,就算是running了,所以这里的running是“任务提交成功“的意思。
YarnJob的实现
当提交给YARN时,我们使用YarnJobFactory这个StreamJobFactory的实现。
?
1 2 3 4 5 6 7 8 9 10 |
|
” hConfig.set("fs.http.impl",
classOf[HttpFileSystem].getName)“
这一句是使得可以在job里调用http文件系统,如把文件路径写成"http://xxx.xx.xx.xx:8080/xx/xx"这种。Samza自带了一个HTTP
filsystem的实现。或许是LinkedIn的人需要这么用?
YarnJobFactory,主要就是构造了一个YarnConfiguration,和以前的commandLine参数一起作为config来构造一个YarnJob.
YarnConfiguration是YARN自己的类,它会从classpath里读yarn-site.xml这个配置文件。
YarnJob这个才是要被提交给Yarn的任务,它实现了StreamJob这个接口。这里主要关心它的submit方法。
val client = new ClientHelper(hadoopConfig)
var appId:
Option[ApplicationId] = None
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
|
submit方法来提交任务的过程交给了ClientHelper的submitAppliation来实现。
这个方法才是提交YARN任务的关键。
首先,我们知道要提交任务给YARN一定实现YARN指定的接口。那么来揣摩一下YARN需要我们提供给它什么东西,它才能调度一个job的执行。
来猜猜看(实际上我已经知道了一些,看能不能想得更全一点,重要是理清思路)
- 首先,被提交的是什么?我已经知道YARN程序的执行过程是,先向YARN申请资源来运行一个 application master,
再由application
master申请后续的资源。因此,这里提交的是一个请求,请求的内容是:RM,我要启动一个AM,给我分个container吧 - RM在什么时候才会同意一个application master的申请呢? 如果系统里的资源不足,有很多任务在运行,它会给拒了吗?
- 假如,RM批准了请求,给分了一个container。那么,那么……它只是一个container,毕竟没人使用它来执行程序。那么,谁来使用这个container来跑程序呢?那就是NodeManager了。那们在提交对AM的申请时,我们就需要告诉Yarn说我”需要NodeManager在这个container里这样……这样……这样做”
- 或许我们还该告诉Yarn这个AM需要的资源数量。毕竟运行这个AM也需要一个container,而一个container的核心就是它对应着一些资源。实际上,瞅一眼下边的方法签名,就会看到cpu和memory这两个大字。
先看它的签名
def submitApplication(packagePath: Path, memoryMb: Int, cpuCore:
Int, cmds: List[String], env: Option[Map[String, String]], name:
Option[String]): Option[ApplicationId]
下边介绍一下各个参数的含义,有助于我们了解这个方法都干了啥事。
- packagePath
这个机制之前没有想到。我们实际上提交任务时,只需要告诉yarn这个任务所需要资源(文件资源)的地址,由NodeManager去down下来这个资源,做本地化。而不是把这个job
package整个传输给yarn。这样设计使得获取任务资源更灵活,比如job
package可以在http文件系统里,像Samza的例子里一样,也可以在HDFS里(这个得需要配一下,在Samza工程里所有以.md结尾里的文件里搜hdfs,就会找到使用HDFS存放job
package的指南),当然也可以在本地(在没有成功搞在http和hdfs两种方式之前,曾经试过这样……)。好吧,这样起码不会给RM在存储上造成压力。 - memoryMb 这个是想要给运行AM的container分配的内存大小
- cpuCore 需要几个core,这个是虚拟的,具体怎么实现,有待研究……
- cmds 这个就是NodeManage要执行命令吧
- env 环境变量,这个是配给运行AM的虚拟机的
- name 这个是这个job的名字,就是YARN job的名字,就是在RM的web
UI上看到的名字。同时,Samza在bin目录下有个脚本,可以到当前在执行samza job。
返回值是ApplicationId。这是一个Option,所以提交失败时,返回值就是None。
再写下去submitApplication这个方法的实现,就有些太长,换下一篇
Samza在YARN上的启动过程 =》 之一,布布扣,bubuko.com