Samza在YARN上的启动过程 =》 之一

运行脚本,提交job


往YARN提交Samza job要使用run-job.sh这个脚本。

samza-example/target/bin/run-job.sh
 --config-factory=samza.config.factories.PropertiesConfigFactory
 --config-path=file://$PWD/config/hello-world.properties

这脚本的内容是什么呢?

exec $(dirname $0)/run-class.sh org.apache.samza.job.JobRunner
[email protected]

它是调用run-class这个脚本。

run-class.sh会根据环境变量HADOOP_CONF_DIR和HADOOP_YARN_HOME获取YARN的配置文件位置,把它加入CLASSPATH。同时会把samza
job根目录下的lib文件夹里的jar或war都加进CLASSPATH。只所以需要yarn的配置文件,目的是得到Resource
Manager的地址。而lib目录下的包,是运行job必须的。

run-class.sh也会通过环境变量$SAMZA_LOG_DIR获知samza的log应存放的位置,通过$SAMZA_CONTAINER_NAME决定container的名字,然后把它们用-D设在JAVA_OPTS中。然后在lib目录下查找log4j.xml文件,存在的话,就把-Dlog4j.configuration设为log4j.xml的路径。

通过以上动作,构造好了调用java所需的classpath和任务运行时需要的一些配置项,然后调用

exec $JAVA $JAVA_OPTS -cp $CLASSPATH [email protected]

启动虚拟机。

可以,在调用run-job.sh时,会运行org.apache.samza.job.JobRunner这个类。在samza的官方指南中简要介绍了一下这个类的作用。Samza自带了两种StreamJobFactory
:LocalJobFactory 和 YarnJobFactory 。
StreamJobFactory的作用就是把给JobRunner提供一个可以执行的job

?





1

2

3

public interface StreamJobFactory {

  StreamJob getJob(Config config);

}

而StreamJob就是一个可以执行的job, JobRunner会调用它的submit方法  

?





1

2

3

4

5

6

7

8

9

10

11

public interface StreamJob {

  StreamJob submit();

  StreamJob kill();

  ApplicationStatus waitForFinish(long
timeoutMs);

  ApplicationStatus waitForStatus(ApplicationStatus status, long
timeoutMs);

  ApplicationStatus getStatus();

}

下边来看JobRunner这个类。程序入口在JobRunner这个类的伴生对象里

?





1

2

3

4

5

6

7

8

object JobRunner extends
Logging {

  def main(args: Array[String]) {

    val cmdline = new
CommandLine

    val options = cmdline.parser.parse(args: _*)

    val config = cmdline.loadConfig(options)

    new
JobRunner(config).run

  }

}

  在配置参数以后,会走到JobRunner的run方法, 下边是它的主要逻辑

?





1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

   val jobFactoryClass = conf.getStreamJobFactoryClass match {

     case
Some(factoryClass) => factoryClass

     case
_ => throw
new SamzaException("no job factory class defined")

   }

   val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]

// Create the actual job, and submit it.

   val job = jobFactory.getJob(conf).submit  //提交job

   info("waiting for job to start")

   // Wait until the job has started, then exit.

   Option(job.waitForStatus(Running, 500)) match {

     case
Some(appStatus) => {

       if
(Running.equals(appStatus)) {

         info("job started successfully")

       } else
{

         warn("unable to start job successfully. job has status %s"
format (appStatus))

       }

     }

     case
_ => warn("unable to start job successfully.")

   }

  首先,它会去conf里找到是否设置了job.factory,即有没有指定StreamJobFactory的实现,没有就抛出异常退出。否则就通过这个StreamJobFactory提交job。在提交后,等待500毫秒,如果任务的状态不是Running就退出。这里的Running并不代表任务已经在跑了,比如在使用YARN时,只要成功提交给Resource
Manager,就算是running了,所以这里的running是“任务提交成功“的意思。

YarnJob的实现

当提交给YARN时,我们使用YarnJobFactory这个StreamJobFactory的实现。

?





1

2

3

4

5

6

7

8

9

10

class YarnJobFactory extends
StreamJobFactory {

  def getJob(config: Config) = {

    // TODO fix this. needed to support http package locations.

    //这里会读yarn-site.xml。前提是yarn-site.xml必须在classpath里 在run-class.sh里,HADOOP_CONF_DIR路径被写进了classpath里

    val hConfig = new
YarnConfiguration

    hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)

    new
YarnJob(config, hConfig)

  }

}

  

” hConfig.set("fs.http.impl",
classOf[HttpFileSystem].getName)“
这一句是使得可以在job里调用http文件系统,如把文件路径写成"http://xxx.xx.xx.xx:8080/xx/xx"这种。Samza自带了一个HTTP
filsystem的实现。或许是LinkedIn的人需要这么用?

YarnJobFactory,主要就是构造了一个YarnConfiguration,和以前的commandLine参数一起作为config来构造一个YarnJob.
YarnConfiguration是YARN自己的类,它会从classpath里读yarn-site.xml这个配置文件。

YarnJob这个才是要被提交给Yarn的任务,它实现了StreamJob这个接口。这里主要关心它的submit方法。

  val client = new ClientHelper(hadoopConfig)
  var appId:
Option[ApplicationId] = None

?





1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

//提交job,注意会有AppMaster所需的内存和cpu数目。但不包括container数目

 def submit: YarnJob = {

   appId = client.submitApplication( //注意submitApplication的返回值是appId

     new
Path(config.getPackagePath.getOrElse(throw
new SamzaException("No YARN package path defined in config."))),

     config.getAMContainerMaxMemoryMb.getOrElse(DEFAULT_AM_CONTAINER_MEM),

     1,

     List(

       "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"

         format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),

     Some(Map(

       ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)),

       ShellCommandConfig.ENV_CONTAINER_NAME -> Util.envVarEscape("application-master"),

       ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))),

     Some("%s_%s"
format (config.getName.get, config.getJobId.getOrElse(1))))

   this

 }

  submit方法来提交任务的过程交给了ClientHelper的submitAppliation来实现。

  这个方法才是提交YARN任务的关键。

  首先,我们知道要提交任务给YARN一定实现YARN指定的接口。那么来揣摩一下YARN需要我们提供给它什么东西,它才能调度一个job的执行。

  来猜猜看(实际上我已经知道了一些,看能不能想得更全一点,重要是理清思路)

  1. 首先,被提交的是什么?我已经知道YARN程序的执行过程是,先向YARN申请资源来运行一个 application master,
    再由application
    master申请后续的资源。因此,这里提交的是一个请求,请求的内容是:RM,我要启动一个AM,给我分个container吧

  2. RM在什么时候才会同意一个application master的申请呢? 如果系统里的资源不足,有很多任务在运行,它会给拒了吗?

  3. 假如,RM批准了请求,给分了一个container。那么,那么……它只是一个container,毕竟没人使用它来执行程序。那么,谁来使用这个container来跑程序呢?那就是NodeManager了。那们在提交对AM的申请时,我们就需要告诉Yarn说我”需要NodeManager在这个container里这样……这样……这样做”

  4. 或许我们还该告诉Yarn这个AM需要的资源数量。毕竟运行这个AM也需要一个container,而一个container的核心就是它对应着一些资源。实际上,瞅一眼下边的方法签名,就会看到cpu和memory这两个大字。

  先看它的签名

   def submitApplication(packagePath: Path, memoryMb: Int, cpuCore:
Int, cmds: List[String], env: Option[Map[String, String]], name:
Option[String]): Option[ApplicationId]

下边介绍一下各个参数的含义,有助于我们了解这个方法都干了啥事。

  • packagePath
    这个机制之前没有想到。我们实际上提交任务时,只需要告诉yarn这个任务所需要资源(文件资源)的地址,由NodeManager去down下来这个资源,做本地化。而不是把这个job
    package整个传输给yarn。这样设计使得获取任务资源更灵活,比如job
    package可以在http文件系统里,像Samza的例子里一样,也可以在HDFS里(这个得需要配一下,在Samza工程里所有以.md结尾里的文件里搜hdfs,就会找到使用HDFS存放job
    package的指南),当然也可以在本地(在没有成功搞在http和hdfs两种方式之前,曾经试过这样……)。好吧,这样起码不会给RM在存储上造成压力。

  • memoryMb 这个是想要给运行AM的container分配的内存大小

  • cpuCore 需要几个core,这个是虚拟的,具体怎么实现,有待研究……

  • cmds 这个就是NodeManage要执行命令吧

  • env 环境变量,这个是配给运行AM的虚拟机的

  • name 这个是这个job的名字,就是YARN job的名字,就是在RM的web
    UI上看到的名字。同时,Samza在bin目录下有个脚本,可以到当前在执行samza job。

返回值是ApplicationId。这是一个Option,所以提交失败时,返回值就是None。

再写下去submitApplication这个方法的实现,就有些太长,换下一篇

Samza在YARN上的启动过程 =》 之一,布布扣,bubuko.com

时间: 2024-12-18 05:08:27

Samza在YARN上的启动过程 =》 之一的相关文章

Samza在YARN上的启动过程 =》 之二 submitApplication

首先,来看怎么构造一个org.apache.hadoop.yarn.client.api.YarnClient ? 1 2 3 4 5 class ClientHelper(conf: Configuration) extends Logging {   val yarnClient = YarnClient.createYarnClient   info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM

JBoss启动过程详解

今天看了看jboss的boot.log和server.log日志,结合自己的理解和其他的资料,现对jboss的启动和加载过程做出如下总结: boot.xml是服务器的启动过程的日志,不涉及后续的操作过程 server.xml是操作过程的日志,是更加详细的,其中包含了启动的过程 本文以JBoss Application Server 4.2.1 GA(以下简称JBoss)为例,介绍它在Windows平台上的启动过程.为了方便叙述,对平台环境做以下假定:Java运行时的安装路径为C:/Java,JB

VxWorks启动过程详解(上)

vxworks有三种映像: VxWorks Image的文件类型有三种 Loadable Images:由Boot-ROM引导通过网口或串口下载到RAM ROM-based Images(压缩/没有压缩):即将Image直接烧入ROM,运行时将Image拷入RAM中运行. ROM-Resident Images:Image的指令部分驻留在ROM中运行,仅将数据段部分拷入RAM. 注意这里说的三种映像都是包含真正操作系统VxWorks的映像,其中后两种可以直接启动并运行起来,但是第一种不行,它必须

光盘上CentOS 安装程序启动过程

光盘上CentOS 安装程序启动过程 MBR:boot.cat stage2: isolinux/isolinux.bin 配置文件:isolinux/isolinux.bin 每个对应菜单选项 加载内核:isolinuz/vmlinuz 向内核传递参数:append initrd=initrd.img .. 随后装载根文件系统,并启动anaconda ##anaconda应用的工作过程 安装前的配置 键盘类型 语言 .. 安装阶段 在目标磁盘创建分区.格式化.安装bootloader.. 首次

VxWorks启动过程的详细解释(上)

vxworks有三个图像: VxWorks Image有三种类型的文件 Loadable Images:由Boot-ROM引导通过网口或串口下载到RAM ROM-based Images(压缩/没有压缩):即将Image直接烧入ROM,执行时将Image拷入RAM中执行. ROM-Resident Images:Image的指令部分驻留在ROM中执行.仅将数据段部分拷入RAM. 注意这里说的三种映像都是包括真正操作系统VxWorks的映像,当中后两种能够直接启动并执行起来.可是第一种不行,它必须

MapReduce框架在Yarn上的具体解释

MapReduce任务解析 在YARN上一个MapReduce任务叫做一个Job. 一个Job的主程序在MapReduce框架上实现的应用名称叫MRAppMaster. MapReduce任务的Timeline 这是一个MapReduce作业运行时间: Map 阶段:依据数据块会运行多个Map Task Reduce 阶段:依据配置项会运行多个Reduce Task 为提高Shuffle效率Reduce阶段会在Map阶段结束之前就開始.(直到全部MapTask完毕之后ReduceTask才干完毕

【译】Yarn上常驻Spark-Streaming程序调优

作者从容错.性能等方面优化了长时间运行在yarn上的spark-Streaming作业 对于长时间运行的Spark Streaming作业,一旦提交到YARN群集便需要永久运行,直到有意停止.任何中断都会引起严重的处理延迟,并可能导致数据丢失或重复.YARN和Apache Spark都不是为了执行长时间运行的服务而设计的.但是,它们已经成功地满足了近实时数据处理作业的常驻需求.成功并不一定意味着没有技术挑战. 这篇博客总结了在安全的YARN集群上,运行一个关键任务且长时间的Spark Strea

如何在yarn上运行Hello World(二)

在之前的一篇文章我们介绍了如何编写在yarn集群提交运行应用的AM的yarnClient端,现在我们来继续介绍如何编写在yarn集群控制应用app运行的核心模块 ApplicationMaster. 首先,介绍下我们要在yarn集群上运行的应用程序 HelloWorld. 这个应用很简单,就是一个主函数,启动的时候打印一句话表明应用已经启动,结束的时候打印一句话表明应用运行结束. 现在我们要用AM把这个应用在YARN集群上运行起来. 上篇文章我们写到 用 YarnClient 提交了 应用到YA

Spark 在yarn上运行模式详解:cluster模式和client模式

1.    官方文档 http://spark.apache.org/docs/latest/running-on-yarn.html 2.    配置安装 2.1.安装hadoop:需要安装HDFS模块和YARN模块,HDFS必须安装,spark运行时要把jar包存放到HDFS上. 2.2.安装Spark:解压Spark安装程序到一台服务器上,修改spark-env.sh配置文件,spark程序将作为YARN的客户端用于提交任务 export JAVA_HOME=/usr/local/jdk1