在之前的一篇文章我们介绍了如何编写在yarn集群提交运行应用的AM的yarnClient端,现在我们来继续介绍如何编写在yarn集群控制应用app运行的核心模块 ApplicationMaster.
首先,介绍下我们要在yarn集群上运行的应用程序 HelloWorld.
这个应用很简单,就是一个主函数,启动的时候打印一句话表明应用已经启动,结束的时候打印一句话表明应用运行结束.
现在我们要用AM把这个应用在YARN集群上运行起来.
上篇文章我们写到 用 YarnClient 提交了 应用到YARN集群运行,这个时候,YARN集群首先会专门为 AM分配一个 container, 然后根据YarnClient构建传递的命令启动 AM,这里比如我们为了 这个写的AM类叫做 MyYarnAM,下面我们来看一下AM的运行过程:
在通过示例代码详细说明AM逻辑之前,我们再复习一下在YARN集群启动一个应用的流程
首先,在YarnClient 提交应用之后,YARN集群会为AM申请一个container,这个container是一个特殊的container,只用来运行 AM,一般用container-0 来表示.
然后,在这个 AM-container被分配后,假如被分配给了 NM-1这台机器,那么这个机器的NM会首先根据 YarnClient传过来的信息启动AM.
然后,AM启动后,AM会向YARN集群申请新的xontainer,用来运行真正的应用,在我们这里就是HelloWorld
这里需要注意的事情,不同于YarnClient申请启动AM,AM启动应用程序的方法会一直堵塞,直到应用程序运行结束
下面我们来仔细分析下如何编写一个 AM.
1.首先AM启动后会调用main函数,并且根据上一篇的文章
启动main函数后会传入一个参数 ,也就是 要运行的 HelloWorld程序在hdfs的路径.
下面来看一下 MyYarnAM的主函数
可以看见主函数的逻辑很简单,就是接收传过来的参数,然后初始化并运行.
所以我们编写AM的第一步首先就是要从主函数接收YarnClient端构建的启动命令中的参数,然后如果有必要的话,通过这些参数来决定AM如何进行运行.
2.现在我们来看如何构建和初始化 MyYarnAM
首先我们 在 MyYarnAM的构造函数里面构建我们在AM需要用到的一些对象.在本例子中,我们主要在构造函数里面构建了两个对象.
一个对象是yarn的配置文件的对象,我们这里就是默认使用的classpath中的 相关配置文件,如果用户有需要,可以配置自己需要的一些参数
其次我们构建了一个端口扫描器,并且设定了端口的范围,这样在后面需要使用端口的时候,为了防止预设的端口已经被占用导致应用无法正常启动,所以我们可以在需要配置端口前先通过这个端口扫描器获得一个目前空闲的,可以被使用的端口进行使用.
然后我们在初始化方法里面解析了传过来的参数,也就是要启动的应用的hdfs的路径的参数.当然,这里面只是一个例子,我们在实际使用过程中可能需要传递更多的参数过来,例如要启动的container的个数,每个container的内存大小,虚拟内存个数等等.
3.在初始化 MyYarnAM之后,后面要开始进行真正的运行了.
首先,AM和和RM和NM进行通讯,所以需要构建对应的 AM<->RM 和 AM <-> NM 的客户端
4.先来看 AM<->RM 的客户端,也就是 AMRMCLientAsync
首先要构造一个监控RM回调的监听器,所有RM下发给AM的通知等信息,都是通过AM和RM之间的心跳响应返回的,然后通过这个监听器回调通知给AM的.
这个监听器有六个回调函数,其中我们目前主要使用的有两个回调函数,分别为 onContainersAllocated(List<Container> containers) 和 onContainersCompleted(List<ContainerStatus> statues)
其中 onContainersAllocated(List<Container> containers) 的调用时机是RM响应了AM发出的申请container请求,然后分配好了对应的container后,会将分配的container信息列表返回给AM,然后AM可以在这个回调函数中获取到对应的信息,也就是函数的参数 List<Container> containers.AM在获取到这个container信息列表后,可以对其进行后一步的处理,例如在分配的container中启动对应的应用程序.
然后 onContainersCompleted(List<ContainerStatus> statues) 的调用时机是 这个应用分配的container运行结束(包括正常运行结束,异常退出等情况)之后,RM会将运行结束的container列表返回给AM,由AM自己根据业务决定后续的处理,例如如果判断运行失败,则可以重新申请container然后启动重跑等操作.
最后,构建完成 监听器之后,我们就可以构建 AMRMClientAsync 了, 需要注意的是, AM会周期性的向RM发出心跳,其中例如AM发给RM的请求会包含在心跳中发送给RM,而RM返回给AM的信息则会包含在心跳响应中返回.
5.然后来看 AM<->NM的客户端,也就是 NMClientAsyncImpl
NMClientAsyncImpl的使用过程类似于AMRMCLientAsync,首先是构造一个回调函数,用来监听NM返回给AM的信息,然后执行启动.
6.在完成以上步骤之后,我们需要将本AM注册到RM上,注册成功后 会启动AM和RM之间的心跳线程.
到此为止,我们算是成功运行AM并将其注册到了RM之上.
7.现在我们要开始在AM之上将我们真正要在yarn上运行的应用程序,也就是HelloWorld, 真正的运行起来,本处作为一个演示用例,只申请一个container来运行HelloWorld.以后我们在真正使用YARN集群运行例如storm等集群的时候,一般会是申请多个container进行运行,这个我们会在以后讲解 如何 在YARN集群上运行 jstorm集群的时候会详细讲解.
8.首先,我们要对RM发出对于container的申请
如程序所示,我们先发出了对于container的申请,并且设置了这个container的优先级,内存大小和虚拟cpu的核数
这里需要注意的是,如上文所说,AM并不是直接调用RM的某个调用,而是会将这个请求缓存起来,等待下次AM发给RM发送请求的时候,会将这些内容加到心跳列表里面一起发送过去,等待RM分配完container发送给AM的时候也是会等待AM的心跳请求过来后,将相关内容加在心跳的返回值之内
9.申请完container之后,AM不要运行结束,应该 等待我们需要运行的应用程序成功结束之后再退出.
AM推出的时候,应该执行如下方法 进行清除
10.前面说到了AM对RM发出了container 的申请,然后RM给其分配了container 之后,会触发 AM的回调函数 onContainersAllocated
然后在 onContainersAllocated函数里面会执行如下逻辑
即对每个container启动一个线程,来运行启动container内应用进程的命令,这个例子里面我们只有一个container
11.后面我们来介绍如何在 LaunchContainerRunnable 里面来运行HelloWorld的进程
在LaunchContainerRunnable的里面,要把 Container 的环境 启动命令和本地资源的设置都加到ContainerLaunchContext 对象中然后把这个对象提交给Container所在的NodeManager.最后完成以上工作之后,NodeManager就启动这个Container
这里需要注意: 每个Container 本地文件系统的目录,由yarn-site.xml 文件中的属性 yarn.nodemanager.local-dirs 定义.
例如/var/data/yarn 在这个根目录下,NodeManager会为每个 Container 创建自己的子目录,在Container终止以后,这个子目录会被删除.不过可以通过 yarn-site.xml 文件中的属性 yarn.nodemanager.delete.debug-delay-sec 来设置保留多久之后进行删除.
12.首先是初始化container的上下文,并设置环境变量等信息
13.其次,要把我们要运行的HelloWorld.jar设置到LocalResource 中,以便于container启动的时候进行读取
14.最终,我们要设置启动contaienr的命令,并且进行启动
这个里面我们运行了两个命令,一个是给文件夹设置执行权限,其实这步是多余的,我们只是为了演示连续执行两个命令
需要注意的是
1.两个命令之间要用 && 进行隔离
2.运行java命令如果失败的话,要注意设置环境变量的PATH,保证其中有可以运行的java环境
以上,就是大概说命令如何编写yarn集群运行HelloWorld的相关事项,后面将会进入正题,即如何在yarn集群运行一个jstorm集群.