JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler

EvenScheduler同DefaultScheduler一样,同样实现了IScheduler接口, 
由下面代码可以看出:

(ns backtype.storm.scheduler.EvenScheduler
  (:use [backtype.storm util log config])
  (:require [clojure.set :as set])
  (:import [backtype.storm.scheduler IScheduler Topologies
            Cluster TopologyDetails WorkerSlot ExecutorDetails])
  (:gen-class
    :implements [backtype.storm.scheduler.IScheduler]))

EvenScheduler是一个对资源进行均匀分配的调度器:
(defn -prepare [this conf]
  )

(defn -schedule [this ^Topologies topologies ^Cluster cluster]
  (schedule-topologies-evenly topologies cluster))

它是通过调用schedule-topologies-evenly方法来完成任务分配的. 
schedule-topologies-evenly方法的具体定义如下:

(defn schedule-topologies-evenly [^Topologies topologies ^Cluster cluster]
  ;;通过调用cluster对象的needsSchedulingTopologies方法来获取所有需要进行任务调度的Topology集合,
  ;;needsSchedulingTopologies方法具体定义如fn1所示.
  ;;判断Topology是否需要进行任务调度的依据在fn2中有说明.
  (let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
    (doseq [^TopologyDetails topology needs-scheduling-topologies
        ;;对需要进行任务调度的Topology中的每一个,首先获取它的topology-id,
        :let [topology-id (.getId topology)
          ;;调用schedule-topology方法获取计算得到的<executor,node+port>类型集合new-assignment
          ;;schedule-topology方法具体定义如fn3所示.
          new-assignment (schedule-topology topology cluster)
          ;;将new-assignment的键和值颠倒获取<node+port,executors>集合.
          node+port->executors (reverse-map new-assignment)]]
       ;;对于前面获取的<node+port,executors>集合中的每一项进行以下操作.
      (doseq [[node+port executors] node+port->executors
  		  ;;用node和port信息构造WorkerSlot对象,并将其作为slot
          :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
            ;;下面两行代码:对于executors集合中的每一项,构造ExecutorDetail对象,
            ;;并返回一个ExecutorDetails集合作为executors
            executors (for [[start-task end-task] executors]
               (ExecutorDetails. start-task end-task))]]
         ;;调用cluster的assign方法将计算出来的slot分配给与该Topology相对应的executors
        (.assign cluster slot topology-id executors)))))

fn1:

/**
 * 获取所有需要调度的Topology,并以集合的形式返回
 */
public List<TopologyDetails> needsSchedulingTopologies(Topologies topologies) {
    List<TopologyDetails> ret = new ArrayList<TopologyDetails>();
    for (TopologyDetails topology : topologies.getTopologies()) {
        if (needsScheduling(topology)) {
            ret.add(topology);
        }
    }
    return ret;
}

fn2:

/**
 * 判断Topology是否需要进行任务调度的依据有两个:
 * 1.Topology设置的NumWorkers数目是否大于已经分配给Topology的Worker数目
 * 2.该Topology尚未分配的Executor的数目是否大于0
 */
public boolean needsScheduling(TopologyDetails topology) {
    int desiredNumWorkers = topology.getNumWorkers();
    int assignedNumWorkers = this.getAssignedNumWorkers(topology);

    if (desiredNumWorkers > assignedNumWorkers) {
        return true;
    }

    return this.getUnassignedExecutors(topology).size() > 0;
}

fn3:

;;该方法会根据集群当前的可用资源对Topology进行任务分配
(defn- schedule-topology [^TopologyDetails topology ^Cluster cluster]
  ;;获取topology-id
  (let [topology-id (.getId topology)
        ;;调用cluster的getAvailableSlots方法获取集群当前可用的slot资源,
        ;;将其转换为<node,port>集合并赋值给available-slots
        ;;getAvailableSlots主要负责计算当前集群中还没有使用的Supervisor端口
        available-slots (->> (.getAvailableSlots cluster)
              (map #(vector (.getNodeId %) (.getPort %))))
        ;;调用getExecutors获取Topology的所有Executor信息,
        ;;将其转换为<start-task-id,end-task-id>集合,
        ;;然后赋值给all-executors并返回
        all-executors (->> topology
              .getExecutors
              (map #(vector (.getStartTask %) (.getEndTask %)))
              set)
        ;;调用get-alive-assigned-node+port->executors方法(具体定义如fn3_1)
        ;;计算当前该Topology已经分得的资源情况,
        ;;最后返回一个<node+port,executors>集合并将其赋值给变量alive-assigned
        ;;参数为cluster信息和topology-id
        alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
        ;;计算当前Topology可以使用的slot数目,并将其赋予total-slots-to-use,
        ;;该值的具体内容为下面两个值的最小值:
        ;;1.Topology中设置的Worker数目
        ;;2.当前available-slots加上alive-assigned数目
        total-slots-to-use (min (.getNumWorkers topology)
               (+ (count available-slots) (count alive-assigned)))
        ;;对available-slots进行排序,计算需要分配的slot数目(total-slots-to-use减去alive-assigned)
        ;;最后从排序后的available-slots集合中按顺序去除这些slot并赋值给reassign-slots
        reassign-slots (take (- total-slots-to-use (count alive-assigned))
             (sort-slots available-slots))
        ;;通过比较all-executors跟已经分配的Executor集合间的差异,获取需要进行分配的Executor集合
        reassign-executors (sort (set/difference all-executors (set (apply concat (vals alive-assigned)))))
        ;;将上述计算得到的reassign-executors与reassign-slots进行关联,转换为<executor,slot>映射集合,
        ;;并赋值给reassignment,此时有两种情况:
        ;;1.reassign-executors数目少于reassign-slots数目:意味着当前集群中的可用资源比较多,
        ;;eg.reassign-executors为(e1,e2,e3),reassign-slots为(s1,s2,s3,s4,s5),
        ;;那么匹配结果为{[e1,s1],[e2,s2],[e3,s3]}
        ;;2.reassign-executors数目多于reassign-slots数目:意味着当前集群的可用资源非常有限,
        ;;eg.reassign-executors为(e1,e2,e3,e4,e5,e6),reassign-slots为(s1,s2),
        ;;此时会有多个Executor被分配到同一个slot上,返回的结果可能是:
        ;;{[e1,s1],[e2,s1],[e3,s2],[e4,s1],[e5,s2],[e6,s2]}
        reassignment (into {}
           (map vector
                reassign-executors
                ;; for some reason it goes into infinite loop without limiting the repeat-seq
                (repeat-seq (count reassign-executors) reassign-slots)))]
    ;;判断reassignment是否为空,若不为空则打印内容为可用的slot信息的日志
    (when-not (empty? reassignment)
      (log-message "Available slots: " (pr-str available-slots))
      )
    ;;返回计算得到类型为<executor,[node,port]>的集合reassignment,
    reassignment))

fn3_1:

;;该方法用于获取Topology当前已经分配得到的资源
(defn get-alive-assigned-node+port->executors [cluster topology-id]
  ;;调用cluster的getAssignmentById获取该Topology当前的assignment
  (let [existing-assignment (.getAssignmentById cluster topology-id)
        ;;判断当前的assignment是否为空,若不为空,则获取其中的<executor,slot>信息
        executor->slot (if existing-assignment
                         (.getExecutorToSlot existing-assignment)
                         {})
        ;;将前面获取到的<executor,slot>转换为<executor,[node+port]>集合
        executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot
	           :let [executor [(.getStartTask executor) (.getEndTask executor)]
	                 node+port [(.getNodeId slot) (.getPort slot)]]]
	       {executor node+port}))
	    ;;将前面的<executor,[node+port]>集合转换为<[node+port],executors>集合
        alive-assigned (reverse-map executor->node+port)]
    ;;返回得到的<[node+port],executors>集合
    alive-assigned))

注:学习李明老师等Storm源码分析和陈敏敏老师等Storm技术内幕与大数据实践的笔记整理。 
欢迎关注下面二维码进行技术交流: 

时间: 2024-10-01 15:23:13

JStorm与Storm源码分析(四)--均衡调度器,EvenScheduler的相关文章

JStorm与Storm源码分析(七)--BasicBoltExecutor与装饰模式

在Storm中IBasicBolt的主要作用是为用户提供一种更为简单的Bolt编写方式,更为简单体现在Storm框架本身帮你处理了所发出消息的Ack.Fail和Anchor操作,而这部分操作是由执行器BasicBoltExecutor 实现的. 下面我们看一下BasicBoltExecutor的源码: /** * BasicBoltExecutor实现了IRichBolt接口 * 在该类中持有一个IBasicBolt成员变量用于调用转发 * 说明: * 该类是基于装饰模式实现的. */ publ

JStorm与Storm源码分析(三)--Scheduler,调度器

Scheduler作为Storm的调度器,负责为Topology分配可用资源. Storm提供了IScheduler接口,用户可以通过实现该接口来自定义Scheduler. 其定义如下: public interface IScheduler { //接收当前Nimbus的Storm配置作为参数,进行一些初始化工作 void prepare(Map conf); /** * 真正进行任务分配的方法,在Nimbus进行任务分配的时候会调用该方法. * 参数为topologies.cluster:前

JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式

本文主要是解析SpoutOutputCollector源码,顺便分析该类中所涉及的设计模式–代理模式. 首先介绍一下Spout输出收集器接口–ISpoutOutputCollector,该接口主要声明了以下3个抽象方法用来约束ISpoutOutputCollector的实现类.接口定义与方法说明如下: /** * ISpoutOutputCollector:Spout输出收集器接口 */ public interface ISpoutOutputCollector { /** * 改方法用来向外

Duilib源码分析(四)绘制管理器—CPaintManagerUI

接下来,分析uilib.h中的UIManager.h,在正式分析CPaintManagerUI前先了解前面的一些宏.结构: 枚举类型EVENTTYPE_UI:定义了UIManager.h中事件通告类型TEventUI结构中的各Type值,从UIEVENT__FIRST至UIEVENT__LAST分别定义了  键盘按键事件 (UIEVENT__KEYBEGIN~UIEVENT__KEYEND).鼠标事件(UIEVENT__MOUSEBEGIN~UIEVENT__MOUSEEND).以及其他的几个事

Duilib源码分析(四)绘制管理器—CPaintManagerUI—(前期准备二)

接下来,我们继续分析UIlib.h文件中余下的文件,当然部分文件可能顺序错开分析,这样便于从简单到复杂的整个过程的里面,而避免一开始就出现各种不理解的地方. UIManager.h:UI管理器,暂时放在后面介绍: UIBase.h:UI窗口相关,包括常用的窗口风格.窗口类风格的宏定义,调试相关,以及基本窗口类: 首先宏定义了几个常用的窗口风格.窗口扩展风格和窗口类风格,以UI_WNDSTYLE_XXX和UI_CLASSSTYLE__XXX开头的: ASSERT采用的是CRT的_ASSERTE.D

Duilib源码分析(四)绘制管理器—CPaintManagerUI—(前期准备三)

接下来,我们将继续分析UIlib.h文件中其他的文件, UIContainer.h, UIRender.h, WinImplBase.h, UIManager.h,以及其他布局.控件等: 1. UIRender.h:UI渲染器,其中cpp文件中,定义的ZIP压缩相关的数据结构,以及宏操作,与XUnzip.cpp中一样的(个人认为可以提取出来作为共用的一部分),此外还有stbi_load_from_memory. stbi_image_free,涉及到图片加载操作,具体详细细节可参考stb_ima

Storm源码分析--Nimbus-data

nimbus-datastorm-core/backtype/storm/nimbus.clj (defn nimbus-data [conf inimbus] (let [forced-scheduler (.getForcedScheduler inimbus)] {:conf conf :inimbus inimbus ; INimbus实现类, standalone-nimbus的返回值 :submitted-count (atom 0) ; 已经提交的计算拓扑的数量, 初始值为原子值0

baksmali和smali源码分析(四)

baksmali 首先执行的第一个main 函数     public static void main(String[] args) throws IOException {         Locale locale = new Locale("en", "US");         Locale.setDefault(locale);         CommandLineParser parser = new PosixParser();         C

Nouveau源码分析(四):NVIDIA设备初始化之nouveau_drm_load (1)

Nouveau源码分析(四) probe函数成功返回之后,DRM模块就会调用struct drm_driver的load函数,对应nouveau的nouveau_drm_load. 这个函数虽然看起来不是特别长,但每一个调用的函数展开后就会变得非常长了! // /drivers/gpu/drm/nouveau/nouveau_drm.c 364 static int 365 nouveau_drm_load(struct drm_device *dev, unsigned long flags)