Azkaban源码学习笔记(一)

1. ConnectorParams (interface): 定义了各种常量参数,没有声明任何方法。

2. ExecutorServlet.java类

2.1 继承类HttpServlet和接口ConnectorParams,用于处理Http请求,主要是Get请求,处理方式都写在doGet方法中。

2.2 init()方法:创建AzkabanExecutorServer实例,通过该executor server实例获取flowRunnerManager,以及jobRunnerManager。

2.3 doGet(HttpServletRequest req, HttpServletResponse resp)方法:处理具体的请求,并返回resp。 针对不同的action,进行不同的处理。

action可以分为两类:

第一类不需要获取execid和user,有三个action,分别是:update,ping,reloadJobTypePlugins;

第二类action,会先获取execid和user,包含:metadata,metadata_jobRunnerMgr,log,attachments,execute,status,cancel,pause,resume,modifyExecution,job_execute,job_cancel

action=execute时,ExecutorServlet类调handleAjaxExecute()方法去调flowRunnerManager.submitFlow(execId)来执行该工作流。

3. FlowRunnerManager.java类

private Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();   记录当前正在执行的Flows,key是execId

private Map<Integer, ExecutableFlow> recentlyFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();

void submitFlow(int execId) 方法:

先判断runningFlows是不是已经包含该execId对应的作业流,获取execId对应的executableFlow实例flow,然后执行setupFlow(flow)配置flow(创建项目和执行的目录等)。

4. EventListener接口,声明了一个方法:void handleEvent(Event event)

5. EventHandler.java类:包含一个HashSet<EventListener>,包含方法addListener,fireEventListener(该方法调用每个listener.handleEvent()),removeListener。

6. ExecutableFlow.java: 包含可执行流的相关信息和设置信息的方法。

7.pipelineExecId:pipeline就是并发策略里的流水线,而pipelineExecId指该次execId对应的flow所有正在执行的其他实例中最后次提交的实例的execId

8. ExecutorManager.java类:

8.1 public String submitExecutableFlow(ExecutableFlow exflow, String userId)方法:将作业流提交到执行队列

方法过程:

根据exflow获取flowId -》判断queuedFlows是否满-》

queuedFlows满了-》提交失败,打出log提示error

queuedFlows未满-》获取该流正在跑的runningflows,获取流执行设置(ExecutionOptions)-》获取流的执行参数(是否enable,如果enable则将参数生效)-》判断runningflows是否为空,如果不为空-》获取并发设置

并发设置:流水线(pipeline)-》设置流水线执行Id(PipelineExecutionId)为正在执行的最后次提交的执行流id,获取pipeline level

并发设置:忽略本次执行(skip)-》抛出异常ExecutorManagerException,给出提示该流已经有实例已经在执行,本次执行被skip了

并发设置:并行执行-》仅修改message提示

-》白名单设置?options.setMemoryCheck(memoryCheck);-》判断是否多节点模式(isMultiExecutorMode)

多节点模式:是-》将该flow记录为正在执行的flow(executorLoader.addActiveExecutableReference(reference);),将作业流放入队列queuedFlows.enqueue(exflow, reference);

多节点模式:否-》将该flow记录为正在执行的flow,选择本地executor,下发作业流dispatch(reference, exflow, choosenExecutor);

9. ExecutableFlow.java类:一个可执行流的相关信息。

10.ExecutionReference.java类:存储execId,executor,updateTime,nextCheckTime,numErrors信息;一个具体的执行实例

11. FlowWatcher.java类:检测某个execId的各个作业的执行状态。

private int execId;

private ExecutableFlow flow;

private Map<String, BlockingStatus> map = new ConcurrentHashMap<String, BlockingStatus>();   该map用于存储各个job的执行状态,key为jobId,value为job的状态

12. BlockingStatus.java类:管理特定作业的状态,以同步的方式改变作业的状态。当状态处于block状态时,线程会处于等待状态,等待其他线程的通知(notify),最多等待时长为5分钟。

private static final long WAIT_TIME = 5 * 60 * 1000;

private final int execId;

private final String jobId;

private Status status;

时间: 2024-11-06 13:03:04

Azkaban源码学习笔记(一)的相关文章

Hadoop源码学习笔记(1) ——第二季开始——找到Main函数及读一读Configure类

Hadoop源码学习笔记(1) ——找到Main函数及读一读Configure类 前面在第一季中,我们简单地研究了下Hadoop是什么,怎么用.在这开源的大牛作品的诱惑下,接下来我们要研究一下它是如何实现的. 提前申明,本人是一直搞.net的,对java略为生疏,所以在学习该作品时,会时不时插入对java的学习,到时也会摆一些上来,包括一下设计模式之类的.欢迎高手指正. 整个学习过程,我们主要通过eclipse来学习,之前已经讲过如何在eclipse中搭建调试环境,这里就不多述了. 在之前源码初

Hadoop源码学习笔记(4) ——Socket到RPC调用

Hadoop源码学习笔记(4) ——Socket到RPC调用 Hadoop是一个分布式程序,分布在多台机器上运行,事必会涉及到网络编程.那这里如何让网络编程变得简单.透明的呢? 网络编程中,首先我们要学的就是Socket编程,这是网络编程中最底层的程序接口,分为服务器端和客户端,服务器负责监听某个端口,客户端负责连接服务器上的某个端口,一旦连接通过后,服务器和客户端就可以双向通讯了,我们看下示例代码: ServerSocket server = new ServerSocket(8111); S

Spring源码学习笔记(6)

Spring源码学习笔记(六) 前言-- 最近花了些时间看了<Spring源码深度解析>这本书,算是入门了Spring的源码吧.打算写下系列文章,回忆一下书的内容,总结代码的运行流程.推荐那些和我一样没接触过SSH框架源码又想学习的,阅读郝佳编著的<Spring源码深度解析>这本书,会是个很好的入门. 上一篇中我们梳理到 Spring 加载 XML 配置文件, 完成 XML 的解析工作,接下来我们将进入 Spring 加载 bean 的逻辑. 我们使用 Spring 获取 XML

Spring源码学习笔记(3)

Spring源码学习笔记(三) 前言----     最近花了些时间看了<Spring源码深度解析>这本书,算是入门了Spring的源码吧.打算写下系列文章,回忆一下书的内容,总结代码的运行流程.推荐那些和我一样没接触过SSH框架源码又想学习的,阅读郝佳编著的<Spring源码深度解析>这本书,会是个很好的入门. DispatcherServlet 实现核心功能 和普通的 Servelt 类一样, DispatcherServlet 中的 doGet() 和 doPost() 方法

Hadoop源码学习笔记(3) ——初览DataNode及学习线程

Hadoop源码学习笔记(3) ——初览DataNode及学习线程 进入了main函数,我们走出了第一步,接下来看看再怎么走: public class DataNode extends Configured implements InterDatanodeProtocol,       ClientDatanodeProtocol, FSConstants, Runnable {      public static DataNode createDataNode(String args[],

菜鸟的jQuery源码学习笔记(二)

jQuery对象是使用构造函数和原型模式相结合的方式创建的.现在来看看jQuery的原型对象jQuery.prototype: 1 jQuery.fn = jQuery.prototype = { 2 //成员变量和方法 3 } 这里给原型对象起了一个别名叫做jQuery.fn.要注意的是这个jQuery.fn可不是jQuery对象的属性,而是jQuery构造方法本身的属性,它是不会传给它所创建的对象的.如果你在控制台敲$().fn的话输出的结果会是undefined.接下来看看原型对象里面有些

Java多线程之JUC包:ReentrantReadWriteLock源码学习笔记

若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/go2sea/p/5634701.html ReentrantLock提供了标准的互斥操作,但在应用中,我们对一个资源的访问有两种方式:读和写,读操作一般不会影响数据的一致性问题.但如果我们使用ReentrantLock,则在需要在读操作的时候也独占锁,这会导致并发效率大大降低.JUC包提供了读写锁ReentrantReadWriteLock,使得读写锁分离,在上述情

Java多线程之JUC包:Semaphore源码学习笔记

若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/go2sea/p/5625536.html Semaphore是JUC包提供的一个共享锁,一般称之为信号量. Semaphore通过自定义的同步器维护了一个或多个共享资源,线程通过调用acquire获取共享资源,通过调用release释放. 源代码: /* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to lic

Java集合源码学习笔记(二)ArrayList分析

Java集合源码学习笔记(二)ArrayList分析 >>关于ArrayList ArrayList直接继承AbstractList,实现了List. RandomAccess.Cloneable.Serializable接口,为什么叫"ArrayList",因为ArrayList内部是用一个数组存储元素值,相当于一个可变大小的数组,也就是动态数组. (1)继承和实现继承了AbstractList,实现了List:ArrayList是一个数组队列,提供了相关的添加.删除.修