从源码看Azkaban作业流下发过程——记我的第一次白盒测试

一、Azkaban简介  

Azkaban作为开源的调度系统,在大数据中有广泛地使用。它主要有三部分组成:Azkaban Webserver、Azkaban Executor、 DB。

                      图1 Azkaban架构

图1所示的是Azkaban的基本架构:Webserver主要负责权限验证、项目管理、作业流下发等工作;Executor主要负责作业流/作业的具体执行以及搜集执行日志等工作;MySQL用于存储作业/作业流的执行状态信息。图中所示的是单executor场景,但是实际应用中大部分的项目使用的都是多executor场景。下面主要介绍多executor场景下的azkaban调度过程。

二、作业流执行过程

图2 作业流执行过程

图2展示的就是Azkaban作业流的执行过程:

1. 首先Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个executor下发作业流;

2. 然后executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;

3. 如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点;

4. 分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业;

5. 被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。

三、从源码看作业流执行过程

首先是Webserver端:

1. ExecutorServlet类根据请求的ajax参数判断,如果ajax=executeFlow,就去调ajaxAttemptExecuteFlow(req, resp, ret, session.getUser())方法

2. ajaxAttemptExecuteFlow方法里,首先调getProjectAjaxByPermission方法判断用户是否有执行权限,如果验证权限通过,且Project和Flow都存在,就调ajaxExecuteFlow方法

3. ajaxExecuteFlow方法的主要作用就是构造ExecutableFlow对象,设定执行参数(通知机制,并发,失败策略),然后去调executorManager.submitExecutableFlow方法

4. executorManager.submitExecutableFlow方法:判断执行策略(流水线、忽略、并发);如果是多执行节点模式,则将作业流提交到执行队列queue;如果是单执行节点模式,选择唯一执行节点下发作业流。

5. ExecutorManager.submitExecutableFlow()方法是Webserver端下发作业流的主要实现逻辑,下面重点细述其内容:

5.1 从exflow实例获取作业流的flowId(就是作业流的名字),打日志(“开始提交流XXX by 某某某了”)。

5.2 判断queuedFlows是否满,如果满了打日志(“提交失败,Azkaban过饱和啦”),return;如果未满,继续往下执行代码

5.3 获取该作业流所有正在跑的实例的id, List<Integer> running
    5.4 获取执行设置options
    5.5 从执行设置options里获取流的执行参数(是否enable,是则将参数生效)
    5.6 判断running是否为空,如果为空,即没有并发的实例在跑
    5.7 如果running不为空,获取并发设置getConcurrentOption()
         5.7.1 流水线(pipeline):设置pipelineExcutionId为running中最后提交的实例id
         5.7.2 忽略(skip):抛异常,“流已经在执行了,忽略本次执行”
         5.7.3 并发:仅修改日志
    5.8 根据白名单设置是否memoryCheck
    5.9 executorLoader.uploadExecutableFlow(exflow) 写数据库表execution_flows,状态为preparing

5.10 构造具体的执行实例ExecutionReference
    5.11 判断是否多执行节点模式,如果不是,将该执行流的状态标记为active,即写数据库表active_executing_flows,将流dispatch到唯一执行节点执行。
    5.12 如果是多执行节点模式,则将该执行流的状态标记为active,然后将流放入执行队列queuedFlows。

6. 如果是多执行节点模式,ExecutorManager类在构造函数里会调setupMultiExecutorMode()方法,该方法会建一个线程通过processQueuedFlows方法去持续地消费队列里的首个作业流。processQueuedFlows方法的主要内容就是按照一定规则去refreshExecutors刷新执行节点的资源信息,以及selectExecutorAndDispatchFlow从activeExecutors中根据策略选择一个executor下发作业流。refreshExecutors()方法实际上是通过遍历每个active executor,去发请求获取状态信息,而不是通过zookeeper。

至此,Webserver端的工作已经完毕。

然后是Executor端:

1. 执行流到达Executor端,此时在数据库中的状态已经是preparing

2. ExecutorServlet类根据请求的action参数判断,如果action=execute,就去调handleAjaxExecute(req, respMap, execid)方法

3. handleAjaxExecute方法里执行flowRunnerManager.submitFlow(execId),去调FlowRunnerManager的submitFlow(execId)方法来提交执行流。

4. FlowRunnerManager的两个重要的数据结构:

4.1 Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<Future<?>, Integer>();

4.2 Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();

submittedFlows用于跟踪当前executor所有处于preparing状态的流的执行;runningFlows用于存数当前executor所有正在执行的流的信息,当需要执行cancling()或killing()的时候就可以找到这些流。

5. FlowRunnerManager.submitFlow(execId)方法是Executor端执行作业流的主要实现逻辑,下面重点细述其内容:

5.1 先判断runningFlows是否包含该execId对应的实例,如果已经包含,抛异常

5.2 从executorLoader去获取execId对应的执行实例(ExecutableFlow)flow

5.3 执行setupFlow(flow),配置flow:创建项目和执行的目录等
    5.4 获取执行设置ExecutionOptions
    5.5 判断pipelineExecId是否为null。如果不为null,就判断pipelineExecId对应的flowRunner在不在runningFlows中。如果在runningFlows中,起一个LocalFlowWatcher去监控在flow中各个job的执行状态;如果不在         5.6 runningFlows中,起一个RemoteFlowWatcher去监控,即每隔一定时间(默认为60秒)通过读取数据库的记录来监控流中各个job的状态
    5.7 判断执行参数里是否包含flow.num.job.threads,如果存在且小于默认值10,则修改该值。这个值代表该流可以同时执行的job线程数。
    5.8 构造一个新的FlowRunner实例runner
    5.9 configureFlowLevelMetrics(runner)配置runner 
    5.10 再次判断runningFlows是否包含该次execId对应的执行实例,如果包含,抛异常
    5.11 将runner加入到runningFlows的map
    5.12 提交到TrackingThreadPool(工作线程池)
    5.13 加入到submittedFlows的map

6. 自此,我们就有了FlowRunner实例,下面我们看FlowRunner中都干了些什么事。

FlowRunner其实就是一个线程,它的run()方法的内容如下:

6.1 Executors.newFixedThreadPool(numJobThreads) 创建flow内部job线程池flow
    6.2 setupFlowExecution()
    6.3 updateFlowReference()
    6.4 updateFlow() 更新flow的状态信息,写数据库表execution_flows
    6.5 loadAllProperties()载入job参数和共享的参数
    6.6 判断输入参数是否包含job.dispatch(作业粒度分配),如果包含且为true,起一个新的线程jobEventUpdaterThread,用于跟踪该作业流下各个作业的执行状态。
    6.7 执行runFlow()
    6.8 runFlow()方法:根据DAG图的算法依次执行job。从流的开始节点,递归调用runReadyjob()来执行作业,然后updateFlow();如果流还没结束,根据重试设置,决定是否重跑失败的作业。
    6.9 在runReadyjob()里会调runExecutableNode(node)方法,runExecutableNode方法再判断job.dispatch参数,如果为false,则通过LocalJobRunner本地执行;如果为true,则再通过JobRunnerManager提交作业。
    6.10 JobRunnerManager通过submitExecutableNode方法构建RemoteJobRunner,RemoteJobRunner会根据各执行节点(包含本节点)的资源状态去选择一个节点执行作业。

最后,整个过程可以总结成一个图,如下图所示:

图3 从源码看作业流执行过程

结语:

初次接手后台测试,发现后台测试与前端测试可以说是完全不同类型的测试,它需要测试人员对开发代码有一定的熟悉程度。如果你不清楚其中的执行过程,你无法设计全面的测试用例,无法准确预估到整个项目上线的风险程度。就拿异常测试来说,我们一般执行异常测试主要是覆盖网络异常、进程异常(假死、假死恢复)、服务器异常(宕机或强杀进程)等。然而即使你设计了这些用例,如果不去结合整个业务的过程,那也是不全面的。比如进程的假死恢复而言,当executor假死时,webserver需要一定的周期才能检测到executor挂掉,在周期内恢复跟在周期外恢复,结果就是不一样的。还有一些用例,可能实际操作时有些过程执行时间非常短,很难去复现,这时候就必须通过远程调试在代码中打断点,来保障测试用例能够覆盖到。此外,在项目任务重、时间紧的情况下,测试人员无法做到全面覆盖的前提下,必须准确地把握测试重点,制定有效的测试策略来保障项目的可靠上线。

时间: 2024-10-19 14:54:21

从源码看Azkaban作业流下发过程——记我的第一次白盒测试的相关文章

从Chrome源码看浏览器的事件机制

.aligncenter { clear: both; display: block; margin-left: auto; margin-right: auto } .crayon-line span::after { content: " " } p { font-size: 15px; text-indent: 2em } #colorbox.crayon-colorbox,#cboxOverlay.crayon-colorbox,.crayon-colorbox #cboxWr

【从源码看Android】02MessageQueue的epoll原型

1 开头 上一讲讲到Looper,大家对Looper有了大概的了结(好几个月过去了-) 大家都知道一个Handler对应有一个MessageQueue, 在哪个线程上new Handler(如果不指定looper对象),那么这个handler就默认对应于这个线程上的prepare过的Looper 如下图Handler.java代码所示,mLooper由Looper.myLooper()指定, public Handler(Callback callback, boolean async) { i

nginx源码分析--从源码看nginx框架总结

nginx源码总结: 1)代码中没有特别绕特别别扭的编码实现,从变量的定义调用函数的实现封装,都非常恰当,比如从函数命名或者变量命名就可以看出来定义的大体意义,函数的基本功能,再好的架构实现在编码习惯差的人实现也会黯然失色,如果透彻理解代码的实现,领悟架构的设计初衷,觉得每块代码就想经过耐心雕琢一样,不仅仅实现了基本的功能给你,为其他人阅读也会提供很好的支持.细致恰当的命名规则就可以看出作者的功力. 2)更好更高的软件性能体现在架构设计上,好的架构会让软件更加稳定.容易维护.便于扩展.从核心模块

解密随机数生成器(二)——从java源码看线性同余算法

Random Java中的Random类生成的是伪随机数,使用的是48-bit的种子,然后调用一个linear congruential formula线性同余方程(Donald Knuth的编程艺术的3.2.1节) 如果两个Random实例使用相同的种子,并且调用同样的函数,那么生成的sequence是相同的 也可以调用Math.random()生成随机数 Random实例是线程安全的,但是并发使用Random实例会影响效率,可以考虑使用ThreadLocalRandom变量. Random实

【从源码看Android】03Android MessageQueue消息循环处理机制(epoll实现)

1 enqueueMessage handler发送一条消息 mHandler.sendEmptyMessage(1); 经过层层调用,进入到sendMessageAtTime函数块,最后调用到enqueueMessage Handler.java public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeE

从linux源码看epoll

从linux源码看epoll前言在linux的高性能网络编程中,绕不开的就是epoll.和select.poll等系统调用相比,epoll在需要监视大量文件描述符并且其中只有少数活跃的时候,表现出无可比拟的优势.epoll能让内核记住所关注的描述符,并在对应的描述符事件就绪的时候,在epoll的就绪链表中添加这些就绪元素,并唤醒对应的epoll等待进程.本文就是笔者在探究epoll源码过程中,对kernel将就绪描述符添加到epoll并唤醒对应进程的一次源码分析(基于linux-2.6.32内核

从源码看Android中sqlite是怎么读DB的

执行query 执行SQLiteDatabase类中query系列函数时,只会构造查询信息,不会执行查询. (query的源码追踪路径) 执行move(里面的fillwindow是真正打开文件句柄并分配内存的地方) 当执行Cursor的move系列函数时,第一次执行,会为查询结果集创建一块共享内存,即cursorwindow moveToPosition源码路径 fillWindow----真正耗时的地方 然后会执行sql语句,向共享内存中填入数据, fillWindow源码路径 在SQLite

从源码看Android中sqlite是怎么读DB的(转)

执行query 执行SQLiteDatabase类中query系列函数时,只会构造查询信息,不会执行查询. (query的源码追踪路径) 执行move(里面的fillwindow是真正打开文件句柄并分配内存的地方) 当执行Cursor的move系列函数时,第一次执行,会为查询结果集创建一块共享内存,即cursorwindow moveToPosition源码路径 fillWindow----真正耗时的地方 然后会执行sql语句,向共享内存中填入数据, fillWindow源码路径 在SQLite

从源码看Java集合之ArrayList

Java集合之ArrayList - 吃透增删查改 从源码看初始化以及增删查改,学习ArrayList. 先来看下ArrayList定义的几个属性: private static final int DEFAULT_CAPACITY = 10; private static final Object[] EMPTY_ELEMENTDATA = {}; private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {}; tra