JStorm与Storm源码分析(三)--Scheduler,调度器

Scheduler作为Storm的调度器,负责为Topology分配可用资源。 
Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler。 
其定义如下:

public interface IScheduler {
    //接收当前Nimbus的Storm配置作为参数,进行一些初始化工作
    void prepare(Map conf);

    /**
     * 真正进行任务分配的方法,在Nimbus进行任务分配的时候会调用该方法.
     * 参数为topologies、cluster:前者含有当前集群中所有Topology的静态信息,
     * cluster包含了Topology的运行态信息,比如用户自定义调度逻辑时所需要的所有资源、
     * Supervisor信息、当前可用的所有slot
     * 以及任物分配情况等,根据topologies和cluster信息,就可以进行调度分配任务了
     */
    void schedule(Topologies topologies, Cluster cluster);
}

真正选择哪个调度器来对Topology进行分配的方法是mk-assignments。 
mk-assignments方法定义与解释如下:

;;参数:stormConf和接口INimbus的实现类实例
(defn mk-scheduler [conf inimbus]
    ;;调用inimbus中的getForcedScheduler方法,并将返回值赋给临时变量forced-scheduler
  (let [forced-scheduler (.getForcedScheduler inimbus)
        scheduler (cond
            ;;若调用的getForcedScheduler方法,返回的是非null的IScheduler,则返回该IScheduler实例
            forced-scheduler
            (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                forced-scheduler)
            ;;如果用户实现了自定义的IScheduler,并且在storm.yaml中有配置,
            ;;则返回用户自定义的IScheduler.
            (conf STORM-SCHEDULER)
            (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                (-> (conf STORM-SCHEDULER) new-instance))
            ;;如果上述都不满足则返回默认的DefaultScheduler
            :else
            (do (log-message "Using default scheduler")
                (DefaultScheduler.)))]
    (.prepare scheduler conf)
    scheduler
    ))

从上述代码可以看出,如果调用inimbus中的getForcedScheduler方法,且返回的是非null的IScheduler,则返回该IScheduler实例;如果用户实现了自定义的IScheduler,并且在storm.yaml中有配置,则返回用户自定义的IScheduler;如果两者都没有实现,则采用默认调度器DefaultScheduler进行任务的分配。现在我们只关心DefaultScheduler。 
DefaultScheduler的定义与解释如下:

;;DefaultScheduler是Storm默认的调度器,如果用户没有指定自己实现的调度器,
;;Storm就会使用该调度器进行Topology的任务分配。
;;DefaultScheduler实现了IScheduler接口
(ns backtype.storm.scheduler.DefaultScheduler
  (:use [backtype.storm util config])
  (:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler])
  (:import [backtype.storm.scheduler IScheduler Topologies
            Cluster TopologyDetails WorkerSlot SchedulerAssignment
            EvenScheduler ExecutorDetails])
  (:gen-class
    :implements [backtype.storm.scheduler.IScheduler]))
;;default-schedule方法主要是计算当前集群中所有可供分配的slot资源,
;;并判断当前已经分配给该Topology的slot资源是否需要重新分配,
;;利用这些信息,对新提交的Topology进行资源分配
(defn default-schedule [^Topologies topologies ^Cluster cluster]
    ;;调用cluster的needsSchedulingTopologies方法获取所需要进行任务调度的Topology集合
    ;;needsSchedulingTopologies方法定义如fn1所示.
  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
    ;;这部分代码块的作用是对每一个需要进行任务调度的Topology进行处理
    (doseq [^TopologyDetails topology needs-scheduling-topologies
        ;;通过调用getId获取topology-id
        :let [topology-id (.getId topology)
            ;;调用cluster的getAvailableSlots方法获取当前集群中所有可用的slot资源,
            ;;并将其转换为<node,port>集合赋给available-slots变量.
            ;;getAvailableSlots方法定义如下fn2所示
          available-slots (->> (.getAvailableSlots cluster)
                               (map #(vector (.getNodeId %) (.getPort %))))
           ;;调用getExecutors获取Topology的所有Executor信息,
           ;;并将其转换为<start-task-id,end-task-id>集合
          all-executors (->> topology
                             .getExecutors
                             (map #(vector (.getStartTask %) (.getEndTask %)))
                             set)
           ;;调用EvenScheduler的get-alive-assigned-node+port->executors方法
           ;;计算当前Topology已经分配的任务信息,以<[node,port],executors>信息保存到alive-assigned变量中
          alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
          ;;
          alive-executors (->> alive-assigned vals (apply concat) set)
          ;;调用slots-can-reassign方法对alive-assigned的slot信息进行判断,
          ;;选出其中可被重新分配的slot集合并保存到can-reassign-slots.
          ;;slots-can-reassign方法定义如fn3所示:
          can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
          ;;计算当前Topology所能使用的全部slot数目,它取以下两个量中较小的值作为total-slots-to-use
          total-slots-to-use (min (.getNumWorkers topology)
                                  (+ (count can-reassign-slots) (count available-slots)))
          ;;用于判断如果total-slots-to-use的数目大于当前已经分配的slot数目,
          ;;或者正在运行的executors数目不等于所有的executors数
          ;;则调用bad-slots方法计算所有可被释放的slot.
          ;;bad-slots方法的具体定义如fn4所示.
          bad-slots (if (or (> total-slots-to-use (count alive-assigned))
                            (not= alive-executors all-executors))
                        (bad-slots alive-assigned (count all-executors) total-slots-to-use)
                        [])]]
      ;;调用cluster的freeSlots方法释放前面计算出来的bad-slots
      (.freeSlots cluster bad-slots)
      ;;调用EvenScheduler的schedule-topologies-evenly方法将系统中的资源均匀分配给Topology
      (EvenScheduler/schedule-topologies-evenly (Topologies. {topology-id topology}) cluster))))

fn1:

/**
 * 获取所有需要调度的topology,并以集合的形式返回
 */
public List<TopologyDetails> needsSchedulingTopologies(Topologies topologies) {
    List<TopologyDetails> ret = new ArrayList<TopologyDetails>();
    for (TopologyDetails topology : topologies.getTopologies()) {
        if (needsScheduling(topology)) {
            ret.add(topology);
        }
    }
    return ret;
}

fn2:

//根据supervisor信息获取所有可用的slot资源,并封装在WorkerSlot中,以集合的形式返回
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
    Set<Integer> ports = this.getAvailablePorts(supervisor);
    List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());
    for (Integer port : ports) {
        slots.add(new WorkerSlot(supervisor.getId(), port));
    }
    return slots;
}

fn3:

;;该方法将对传入的slots资源进行过滤,选出其中仍然可以继续使用的slot,组成新的集合
;;过滤方法:先判断slot的node信息是否存在于集群的黑名单里,
;;如果不在则继续判断slot的port信息是否在于node相对应的Supervisor的所有可用端口列表中
;;如果在,则表示该slot可以继续使用
(defn slots-can-reassign [^Cluster cluster slots]
  (->> slots
      (filter
        (fn [[node port]]
          (if-not (.isBlackListed cluster node)
            (if-let [supervisor (.getSupervisorById cluster node)]
              (.contains (.getAllPorts supervisor) (int port))
              ))))))

fn4:

;;该方法用于计算一个Topology已经分配的资源中哪些是不再需要的
;;existing-slots:已经分配出去的资源(分配给Topology),它是一个<[node,port],executors>集合
;;num-executors:Topology的所有Executor(包括已分配和未分配的)
;;num-workers:Topology可使用的全部slot数目
(defn- bad-slots [existing-slots num-executors num-workers]
    ;;判断num-workers是否为0。如果是,意味着当前没有可供该Topology使用的slot,这时返回一个空集合
  (if (= 0 num-workers)
    ‘()
    ;;定义distribution集合和keepers集合,distribution集合通过调用integer-divided方法生成
    ;;实际所做的事是将num-executors均匀地分配到num-workers中.
    ;;keepers集合为一个空集合
    (let [distribution (atom (integer-divided num-executors num-workers))
          keepers (atom {})]
       ;;对于传入的existing=slots中的每一项,计算其对象的executor-count,
       ;;然后以该executor-count作为键从前面计算的distribution集合中获取值.如果获取的值大于0,
       ;;则意味着存在至少一个Worker上有executor-count个Executor的分配,并且,这个分配信息便继续维持,不更新。
       ;;这时,会将<[node,port],executors>信息放入keepers中,同时将distribution中该executor-count的对应值减一.
      (doseq [[node+port executor-list] existing-slots :let [executor-count (count executor-list)]]
        (when (pos? (get @distribution executor-count 0))
          (swap! keepers assoc node+port executor-list)
          (swap! distribution update-in [executor-count] dec)
          ))
       ;;从existing-slots中移除keepers中记录的需要继续维持的分配情况.如果移除完之后还存在slot信息,
       ;;表明这些slot可以被释放掉,将其转换为WorkerSlot对象集合并返回.
      (->> @keepers
           keys
           (apply dissoc existing-slots)
           keys
           (map (fn [[node port]]
                  (WorkerSlot. node port)))))))

注:学习李明等老师Storm源码分析和陈敏敏等老师Storm技术内幕与大数据实践的笔记整理。 
欢迎关注下面二维码进行技术交流:

时间: 2024-08-26 06:22:19

JStorm与Storm源码分析(三)--Scheduler,调度器的相关文章

JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口, 由下面代码可以看出: (ns backtype.storm.scheduler.EvenScheduler (:use [backtype.storm util log config]) (:require [clojure.set :as set]) (:import [backtype.storm.scheduler IScheduler Topologies Cluster Topolo

JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器BasicBoltExecutor 实现的. 下面我们看一下BasicBoltExecutor的源码: /** * BasicBoltExecutor实现了IRichBolt接口 * 在该类中持有一个IBasicBolt成员变量用于调用转发 * 说明: * 该类是基于装饰模式实现的. */ publ

JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式

本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式. 首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明了以下3个抽象方法用来约束ISpoutOutputCollector的实现类.接口定义与方法说明如下: /** * ISpoutOutputCollector:Spout输出收集器接口 */ public interface ISpoutOutputCollector { /** * 改方法用来向外

Duilib源码分析(三)XML解析器—CMarkup

上一节介绍了控件构造器CDialogBuilder,接下来将分析其XML解析器CMarkup: CMarkup:xml解析器,目前内置支持三种编码格式:UTF8.UNICODE.ASNI:CMarkupNode:xml节点类 先介绍CMarkup: XMLELEMENT: xml节点元素类型定义,iStart,节点元素在xml文件中的起始位置:iChild,节点元素子节点:iNext,节点元素的下一个节点(兄弟节点):iParent,节点元素的父节点:iData, 节点元素的数据. CMarku

Storm源码分析--Nimbus-data

nimbus-datastorm-core/backtype/storm/nimbus.clj (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf :inimbus inimbus ; INimbus实现类, standalone-nimbus的返回值 :submitted-count (atom 0) ; 已经提交的计算拓扑的数量, 初始值为原子值0

storm源码分析之任务分配--task assignment

在"storm源码分析之topology提交过程"一文最后,submitTopologyWithOpts函数调用了mk-assignments函数.该函数的主要功能就是进行topology的任务分配(task assignment).mk-assignments函数定义如下: ;; get existing assignment (just the executor->node+port map) -> default to {};; filter out ones whi

Cocos2d-x 源码分析 : Scheduler(定时器) 源码分析

源码版本 3.1r,转载请注明 我也终于不out了,开始看3.x的源码了,此时此刻的心情只能是wtf!!!!!!!!!!不过也终于告别CC时代了. cocos2d-x 源码分析目录 http://blog.csdn.net/u011225840/article/details/31743129 1.继承结构 没错,是两张图.(你没有老眼昏花..我脑子也没有秀逗..)Ref就是原来的CCObject,而Timer类是与Scheduler类密切相关的类,所以需要把他们放在一起说.Timer和Sche

Nouveau源码分析(三):NVIDIA设备初始化之nouveau_drm_probe

Nouveau源码分析(三) 向DRM注册了Nouveau驱动之后,内核中的PCI模块就会扫描所有没有对应驱动的设备,然后和nouveau_drm_pci_table对照. 对于匹配的设备,PCI模块就调用对应的probe函数,也就是nouveau_drm_probe. // /drivers/gpu/drm/nouveau/nouveau_drm.c 281 static int nouveau_drm_probe(struct pci_dev *pdev, 282 const struct

[Android]Fragment源码分析(三) 事务

Fragment管理中,不得不谈到的就是它的事务管理,它的事务管理写的非常的出彩.我们先引入一个简单常用的Fragment事务管理代码片段: FragmentTransaction ft = this.getSupportFragmentManager().beginTransaction(); ft.add(R.id.fragmentContainer, fragment, "tag"); ft.addToBackStack("<span style="fo