Storm系列(五)Nimbus启动过程

启动流程图

 

mk-assignments

功能:对当前集群中所有Topology进行新一轮的任务调度。

实现源码路径:
\apache-storm-0.9.4\storm-core\src\clj\backtype\storm\daemon\ nimbus.clj

方法原型:

defnk mk-assignments [nimbus :scratch-topology-id nil]

 

方法说明:

  1. 参数nimbus为nimbus-data对象,scratch-topology-id为提交的Topology id.
  2. 从nimbus中依次获取conf、storm-cluster-state。
  3. 获取当前所有活跃的topology的id集合。
  4. 根据活跃的topology id调用read-topology-details方法,获取TopologyDetails信息并返回<storm-id,TopologyDetails>集合。
  5. 根据<storm-id,TopologyDetails>集合创建topologies对象。
  6. 获取已分配资源的Topology的id集合。
  7. 根据已分配资源的Topology id获取每个Topology的任务分配情况assigments,并返回<storm-id,Assigment>集合existing-assignments,除了scratch-topology-id指定的Topology不会获取它的Assigments。
  8. 调用compute-new-topology->executor->node+port方法获为所有Topology计算新的调度,返回topology->executor->node+port.
  9. 调用basic-supervisor-details-map从Zookeeper中获取所有SupervisorInfo信息,返回<supervisor-id,supervisorDetails>集合。
  10. 对第8步返回的结果集中的每一项进行遍历构造新的Assignment对象集合new-assignments,Assigmnet定义如下:
    (defrecord Assignent [master-code-dir node->host executor->node+port executor->start-time-secs])
    master-code-dir:Nimbus在本地保存Topology信息路劲,主要包括stormjar.jar、stormcode.ser、stormconf.ser.
    node->host:该Topology分配的<supervisor-id,hostname>集合.
    executor->node+port:该Topology中executor的分配情况,node为supervisor-id,port为端口号。
    executor->start-time-secs:该Topology对用的supervisor的启动时间.
  11. 比较new-assignments与existing-assignments中的每一项是否有差异,如果没有就打印一条提示信息,如果有就将该Topology在Zookeeper中保存的调度结果更新为new-assignments。
  12. 计算new-assignment中的每一项新增加的slot并进行分配。(新增的solt通过new-assignment中的node+port减去existing-assignment中的node+port得到,返回为<topology-id,WorkerSlot>集合)
    WorkerSlot格式为{ nodeId port }

功能总结:
获取已分配资源的Topology的任务分配情况<storm-id,Assigment>集合(existing-assignments),获取活跃的Topology信息<storm-id,TopologyDetails>集合创建topologies对象。然后调用compute-new-topology->executor->node+port方法获为所有Topology计算新的调度,返回topology->executor->node+port再构造Assigmnet对象集。

compute-new-topology->executor->node+port

函数原型:

defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]

参数说明:
nimbus:nimbus-data对象。
existing-assignments:当前已经分配的的任务,格式<topology-id,Assignment>。
Topologies:当前活跃的Topology,格式<storm-id,TopologyDetails>.
scratch-topology-id:需要重新调度的topology-id.

  1. 调用compute-topology->executors方法根据existing-assignments中的topology-id获取<topology-id,executors>集合,与调用compute-executors方法效果作用一样。
  2. 调用update-all-hearbeats!更新上一步中executor中的心跳信息.
  3. 调用compute-topology->alive-executors获取<topology-id,alive-executors>集合,每个topology-id对应的活跃的executor.
  4. 调用compute-supervisor->dead-ports获取<supervisor-id,dead-ports>集合。
  5. 调用compute-topology->scheduler-assignment获取<topology-id,Scheduler-AssignmentImpl>集合.(topology-id对用的任务分配情况Scheduler-AssignmentImpl == <ExecutorDetails,WorkerSlot>).
  6. 根据参数topologies中的topology-id进行条件过滤,该topology中所有executor为空或者topology中的所有executor不等于Topology中活跃的executor或者该Topology的num-use-workers小于其指定的num-worker,过滤后的结果集群赋值给missing-assignmnet-topologies.
  7. 调用all-scheduling-slots方法获取<node-id,port>集合。
  8. 调用read-all-supervisor-details方法获取<supervisor-id,supervisorDetails>集合。
  9. 根据参数nimbus、第5步、第8步的结果集构造Cluster对象。
  10. 调用nimbus中的scheduler方法进行任务调度。
  11. 从Cluster对象中获取重新调度完之后的所有Assignments作为new-scheduler-assignment,格式为<topology-id,SchedulerAssignment>集合。
  12. 调用compute-topology->executor->node+port将第11步的结果集转换为<topology-id,{executor[node port]}>集合。
  13. 调用basic-supervisor-details-map将Zookeeper中记录的所有SupervisorInfo都转换为SupervisorDetails,返回<supervisor-id,SuperviosrDetails>集合.

流程图:

compute-executor

函数原型:

defn- compute-executors [nimbus storm-id]

函数实现说明:

  1. 获取storm-id(topology-id)对用的stormBase中component-executors形象(每个组件的并行度)。
  2. 获取storm-id对应的storm-conf配置。
  3. 获取storm-id对应Topology.
  4. 调用storm-task-info获取<task-id,component-id>集合,其中task-id对该Topology的所有组件是全局递增的。
  5. 将第4步的结果集转换为<component-id,tasks>并按照升序排序。
  6. 将第1步的结果集<component-id,parallelism>与第5步的结果集进行join得到<component-id,[parallelism,tasks]>集合.
  7. 对第6步的结果集中的每一项进行处理,将tasks集合均匀分布到数目为parallelism的分区上。

功能总结:

获取storm-id对应Topology所有组件的并行度(线程数),获取该Topology中各组件TOPOLOGY_TASK信息,最后的结果使每个线程中均匀分布多个task运行。

时间: 2024-08-30 06:55:52

Storm系列(五)Nimbus启动过程的相关文章

Storm系列(四)Topology校验过程

功能:提交一个新的Topology,并为Topology创建storm-id(topology-id),校验其结构,设置必要的元数据,最后为Topology分配任务. 实现源码: 1  (^void submitTopology 2          [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology] 3          (.submitTopo

Storm系列(三)Topology提交过程

提交示例代码: 1 public static void main(String[] args) throws Exception { 2 TopologyBuilder builder = new TopologyBuilder(); 3 builder.setSpout("random", new RandomWordSpout(), 2); 4 builder.setBolt("transfer", new TransferBolt(), 4).shuffle

MapReduce 编程 系列五 MapReduce 主要过程梳理

前面4篇文章介绍了如何编写一个简单的日志提取程序,读取HDFS share/logs目录下的所有csv日志文件,然后提取数据后,最终输出到share/output目录下. 本篇停留一下,梳理一下主要过程,然后提出新的改进目标. 首先声明一下,所有的代码都是maven工程的,没有使用任何IDE.  这是我一贯的编程风格,用Emacs + JDEE开发.需要使用IDE的只需要学习如何在IDE中使用maven即可. 可比较的序列化 第一个是序列化,这是各种编程技术中常用的.MapReduce的特别之处

内核启动过程

具体要求如下: Grub的源码分析:Grub如何支持多个系统和内核的启动选择(MultiBoot机制). 一.GRUB 简介(介绍主流的系统管理程序grub) 什么是GRUB: GNU GRUB 是一个多重操作系统启动管理器.GNU GRUB 是由GRUB(GRand Unified Bootloader) 派生而来.GRUB 最初由Erich Stefan Boleyn 设计和应用: "boot loader"是计算机启动后第一个运行的软件.它负责加载一个操作系统的内核,并把控制权交

Storm 系列(五)—— Storm 编程模型详解

一.简介 下图为 Strom 的运行流程图,在开发 Storm 流处理程序时,我们需要采用内置或自定义实现 spout(数据源) 和 bolt(处理单元),并通过 TopologyBuilder 将它们之间进行关联,形成 Topology. 二.IComponent接口 IComponent 接口定义了 Topology 中所有组件 (spout/bolt) 的公共方法,自定义的 spout 或 bolt 必须直接或间接实现这个接口. public interface IComponent ex

Android深入四大组件(五)Content Provider的启动过程

"-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> Android深入四大组件(五)Content Provider的启动过程 - 刘望舒的专栏 - 博客频道 - CSDN.NET 刘望舒的专栏 欲为大树,莫与草争 目录视图 摘要视图 订阅 [活动]2017 CSDN博客专栏评选 &nbsp [5月书讯]流畅的Pyth

Storm杂谈之Topology的启动过程(二)

在一中讲到了topology提交给nimbus nimbus Nimbus可以 说是storm中最核心的部分,它的主要功能有两个: 对Topology的任务进行分配资源 接收用户的命令并做相应的处理,如Topology的提交,杀死,激活等等 Nimbus本身是基于Thrift框架实现的,使用了Thrift的THsHaServer服务,即半同步半异步服务模式,使用一个单独的线程来处理网络IO,使用一个独立的线程池来处理消息,大大提高了消息的并发处理能力. 服务接口的定义都在storm.thrift

storm源码剖析(3):topology启动过程

storm的topology启动过程是执行strom jar topology1.jar MAINCLASS ARG1 ARG2 鉴于前面已经分析了脚本的解析过程,现在重点分析topology1.jar的执行. 以storm-starter中的ExclamationTopology为例,来进行剖析: public class ExclamationTopology { public static class ExclamationBolt extends BaseRichBolt { Outpu

hbase源码系列(六)HMaster启动过程

这一章是server端开始的第一章,有兴趣的朋友先去看一下hbase的架构图,我专门从网上弄下来的. 按照HMaster的run方法的注释,我们可以了解到它的启动过程会去做以下的动作. * <li>阻塞直到变成ActiveMaster * <li>结束初始化操作 * <li>循环 * <li>停止服务并执行清理操作* </ol> HMaster是没有单点问题是,因为它可以同时启动多个HMaster,然后通过zk的选举算法选出一个HMaster来.