Storm task-slot分配

Assignment篇

回到storm-task-info,

(defn storm-task-info

"Returns map from task -> component id"

[^StormTopology user-topology storm-conf]

;; 添加acker,metrics bolt

(->> (system-topology! storm-conf user-topology)

;; 获得拓扑定义的所有组件

all-components

;;返回 componentid >task num

(map-val (comp #(get % TOPOLOGY-TASKS) component-conf))

;;sort component-id

(sort-by first)

;;将组件按照task的数量生成对应数量的componentid

(mapcat (fn [[c num-tasks]] (repeat num-tasks c)))

;;给组件编号

(map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))

;; 返回类似于{1 "counter",2 "counter",3 "reducer"}

(into {})

))

得到所有topology的组件后,以{spt0 [executor 2 task 3] blt0 [executor4 task4] 为例,经过storm-task-info输出后变成:

{1 "blt0" 2 "blt0" 3 "blt0" 4 "blt0" 5 "spt0" 6 "spt0" 7 "spt0"}

这些task如何在executor上分配呢?

(defn- compute-executors [nimbus storm-id]

(let [conf (:conf nimbus)

;;读取topology的一些基本信息 StormBase对象

storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)

;;组件对应的executors数量 Map结构 {"compution" 140, "sender" 16, "reducer" 20, "spout" 1}

component->executors (:component->executors storm-base)

;;加载配置文件

storm-conf (read-storm-conf conf storm-id)

;;topology = StormTopology.

topology (read-storm-topology conf storm-id)

;;storm-task-info 在backtype/storm/daemon/common.clj 中定义

;;{1 "counter",2 "counter",3 "reducer"}

task->component (storm-task-info topology storm-conf)]

;; storm-task-info 返回 {task-index, task-component-id}的结果

(->> (storm-task-info topology storm-conf)

;; reverse-map 在util.clj上定义,将value相同的key形成一个seq

reverse-map

;调用map-val方法 ,该方法在util.clj上定义 对task-index进行排序

(map-val sort) ;

;; join-maps 在utils定义

;; 也就是将 task 在component->executors 上对应的 executors数量取出来

;; 建立task -> (executors, task-index)的关系

(join-maps component->executors)

;;将task分配在executors上

(map-val (partial apply partition-fixed))

;; 获取分配好的task与executor

(mapcat second)

;;返回[start-task, end-task]

(map to-executor-id)

)))

过程描述:

  1. 将topology组件按照component-id排序后,按照task的数量生成相应的component-id的数量,并进行编号:

{spt0 [executor 2 task 3] blt0 [executor4 task4] ->

{1 "blt0" 2 "blt0" 3 "blt0" 4 "blt0" 5 "spt0" 6 "spt0" 7 "spt0"}

  1. 将上一步得到的结果按照component-id聚合 task index

    {1 "blt0" 2 "blt0" 3 "blt0" 4 "blt0" 5 "spt0" 6 "spt0" 7 "spt0"}->

    得到类似类似于{"blt0" [1 2 3 4] "spt0" [5 6 7]}

  2. 建立task executor task index的对应关系。

    {"blt0" [1 2 3 4] "spt0" [5 6 7]} +{"blt0" 4 "spt0" 2} ->

    {"blt0" (4 [1 2 3 4]) "spt0" (2 [5 6 7])}

  1. 将task分配在相应的executors上。

在Utils.java代码上,有具体的分配细节:

public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {

int base = sum / numPieces;

int numInc = sum % numPieces;

int numBases = numPieces - numInc;

TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();

ret.put(base, numBases);

if(numInc!=0) {

ret.put(base+1, numInc);

}

return ret;

}

通熟一点来说,这个就相当于将sum分配numPieces份,并且做到最大公平。

也就是numPieces分配一圈后,每份一个,继续从头分配,直到所有的sum分配完开始。

(defn partition-fixed

[max-num-chunks aseq]

(if (zero? max-num-chunks)

[]

;;获取task-executor分配细节

(let [chunks (->> (integer-divided (count aseq) max-num-chunks)

;;Returns a new map of the same (hashed/sorted) type, that does not contain a mapping for key(s)

;; 将integer-divided去后 key为0的 键值对 去除

;; 过滤无效分配

(#(dissoc % 0));

;加入 当 任务数为5 并发数设置为3 那么输入为{1 1 2 2} ,输出为([2 2] [1 1]) 按照key的降序排列

(sort-by (comp - first))

;;输入 ([2 2] [1 1]) 变成 (2 2 1)即表示每份的任务数量

(mapcat (fn [[size amt]] (repeat amt size)))

)]

;; chunks = (2 2 1)

;; 将任务分配到 task上,假设我们的taskindex = 3 4 5 6 7 即aseq = (3 4 5 6 7)

(loop [result []

[chunk & rest-chunks] chunks

data aseq]

(if (nil? chunk)

result

;; 将data按照 第chunk个数分割 成两个数组 取出前

;; chunk个赋值给c

(let [[c rest-data] (split-at chunk data)]

;;合并, 将c合并到result中

(recur (conj result c)

;; chunks = rest-chunks

;;aseq = rest-data

rest-chunks

rest-data)))))))

经过该步骤:

{"blt0" (4 [1 2 3 4]) "spt0" (2 [5 6 7])}

->{("blt0" [(1) (2) (3) (4)]) ("spt0" [(5 6) (7)])}

  1. 取出分配结果,生成executor分配细节

    {("blt0" [(1) (2) (3) (4)]) ("spt0" [(5 6) (7)])} –>

    {[1 1] [2 2] [3 3] [4 4] [5 6] [7 7]}

继续返回上一层:compute-executor->component

;;返回 task index对应的component

task->component (storm-task-info topology storm-conf) executor->component (into {} (for [executor executors

;;取出executor的start-task

:let [start-task (first executor)

component

;;根据task-index取出对应的component

(task->component start-task)]]

;; 返回executor对应的component

{executor component}))

在这一步具体完成的事情是将每个executor对应的component绑定起来

{[1 1] "blt0" [2 2] "blt0" [3 3] [4 4] [5 6] "spt0" [7 7] "spt0"}

回到nimbus.clj/read-topology-details这个方法,方法返回:

;; 创建一个topologyDetails对象

(TopologyDetails. storm-id

topology-conf

topology

(:num-workers storm-base)

executor->component

))

在这个对象中,包含了topology,task->executor->component的分配细节,topology的worker数量以及topology的配置信息等。

回到nimbus.clj上的mk-assignments方法,在这个方法中

;;得到所有topology的分配细节

topologies (into {} (for [tid topology-ids]

{tid (read-topology-details nimbus tid)}))

;; topologies = new Topologies(topologies)

topologies (Topologies. topologies)

;; read all the assignments

assigned-topology-ids (.assignments storm-cluster-state nil)

;; 返回所有topology对应的任务明细

existing-assignments (into {} (for [tid assigned-topology-ids]

(when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))

{tid (.assignment-info storm-cluster-state tid nil)})))

;; make the new assignments for topologies

topology->executor->node+port

(compute-new-topology->executor->node+port

nimbus

existing-assignments

topologies

scratch-topology-id)

我们在来看看compute-new-topology->executor->node+port的实现:

在compute-new-topology->executor->node+port中有这样一句:

_ (update-all-heartbeats! nimbus existing-assignments topology->executors)

1.首先来看一下nimbus是如何更新缓存的。

  1. 首先根据当前存活的任务在zookeeper ;根据每个node+port 在心跳目录下取出每个进程上运行的executors的状态信息,返回{executor {:status :uptime :time-secs}}

    executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment))

  2. 然后更新nimbus进程中这个topology的每个executor的状态。

(into {}

(for [executor all-executors :let [curr (cache executor)]]

[executor

(update-executor-cache curr (get executor-beats executor) timeout)]

))))

  1. 将更新好的executor状态重新设置到nimbus进程缓存中。

本地进程中,缓存的executor的状态有三个:

{:is-timed-out, :nimbus-time, :executor-reported-time}

那么是如何判断一个executor超时的呢?

  1. 首先取出zookeeper中保存的executor的状态信息 :time-secs
  2. 然后取出本地缓存的该executor对应的:nimbus-time, :executor-reported-time 两个状态

(defn- update-executor-cache [curr hb timeout]

;; 取出在zookeeper workerheartbeat 刷新的时间

(let [reported-time (:time-secs hb)

{last-nimbus-time :nimbus-time

last-reported-time :executor-reported-time} curr

;;如果executor zookeeper上的心跳记录不为空 则report-time = zookeeper上记录的时间

reported-time (cond reported-time reported-time

;;如果本地保存的上次report-time不为空,则 report-time = 本地缓存时间

last-reported-time last-reported-time

;;都为nil时,返回0

:else 0)

;;如果本地保存的(last-nimbus-time =null || last-report-time !=report-time)则nimbus-time = 当前时间 否则为last-nimbus-time

nimbus-time (if (or (not last-nimbus-time)

(not= last-reported-time reported-time))

(current-time-secs)

last-nimbus-time

)]

;;true for >nimbus-time!=nil&&(当前时间 - nimbus-time)>=timeout

{:is-timed-out (and

nimbus-time

(>= (time-delta nimbus-time) timeout))

:nimbus-time nimbus-time

:executor-reported-time reported-time}))

判断executor超时的逻辑是nimbus-time!=nil&&(当前时间 - nimbus-time)>=timeout

观察一些nimbus-time的赋值过程,可知当‘

(last-nimbus-time!=null&&last-reported-time=reported-time)时,此时的

nimbus-time = last-nimbus-time,也就是上次更新本地executor的时间,除此之外,nimbus-time=当前时间,这个显然 :is-time-out的值为false。

那么什么情况会发生这个呢?

假设我们更新过一次executor,也就是last-nimbus-time!=null, last-reported-time也是上次从zookeeper上取出的:time-secs, 当reported-time= last-reported-time时,也就是executor所在的进程没有刷新:time-secs这个值,那么此时nimbus-time = last-nimbus-time,如果该进程fail down,那么最终:is-time-out的值变成 true。

由此可知,worker必须timeout时间段内,去刷新一下Heartbeat,timeout为

(:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS

这样也算符合错误检测。

继续往下,

topology->alive-executors (compute-topology->alive-executors nimbus existing-assignments topology->executors scratch-topology-id)

找出每个topology存活的executor,只要topology的executor对应的心跳缓存:is-time-out为false或者executor还没有启动,那么这些都算是一个活动的executor:

(let [start-time (get executor-start-times executor)

;; 查看executor 是否超时

is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)]

(if (and start-time

(or

;;如果executor在NIMBUS-TASK-LAUNCH-SECS时间内还没开始启动

(< (time-delta start-time)

(conf NIMBUS-TASK-LAUNCH-SECS))

(not is-timed-out)

))

true

)

剔除每个topology超时的executors后,下一步将那些超时的executos占用的资源[主机+端口号]释放出来,以便再次进行分配。

返回{机器 [端口号,…]}

supervisor->dead-ports (compute-supervisor->dead-ports nimbus existing-assignments topology->executors topology->alive-executors)

下一步,获取存活的executors的分配情况,返回

{tid (SchedulerAssignmentImpl. tid executor->slot)})))

找出那些缺失去任务的topologies,判断条件:

;; 如果topology的分配情况为空

或者存活的executors和总executors不一致

或者 使用的进程数量<用户设置的进程数量

;;

经过一些列的判断,最终执行到这一句:

_ (.schedule (:scheduler nimbus) topologies cluster)

这个方法会调用DefaultSchedule.clj上的schedule方法。

具体的分配细节在EventScheduler当中,大致的细节如下:

  1. 求出topology需要的worker数量n,(总需求量 – 已分配的)
  2. 获取集群内所有可用的workerslot,如{:a [6701 6702] :b [6708 6714] :c [6799]}
  3. 将获取的workerslot进行排序,得到类似于这样的结果

    ([:a 6701] [:b 6708] [:c 6799] [:a 6702] [:b 6714])

  4. 从第3个得到的排序结果中取出前n个,比如n=2时,得到结果:

    ([:a 6701] [:b 6708])

  5. 获取需要分配的任务j,设|j| = 5;
  6. 将第3步骤获取的结果按照|j| 重复输出,得到结果:

([:a 6701] [:b 6708] [:a 6701] [:b 6708] [:a 6701] [:b 6708] [:a 6701] [:b 6708] [:a 6701] [:b 6708])

  1. 将在j中第i个任务分配到第i个上一步产生的workerslot队列当中。
  2. 按照workerslot 聚合分配的任务,{[:a 6701] [task] [:b 6708] [task]}
  3. 将分配信息写入到zookeeper中

    具体的分配细节参考EventScheduler.clj源码。

至此,我们大致完成了nimbus分配任务的过程,

mk-assignments是一个定时任务,没隔NIMBUS-MONITOR-FREQ-SECS 便会去执行一次,因此可以及时检测worker异常以及重新分配任务。

/ *

* 比如我们有5个任务 3个executor 怎样分呢?

* 返回map {2 2 1 1} 也就是说有2个executor,其任务数为2, 1个executor的任务数为1

* 转化成l为 (2 2 1) task (3 4 5 6 7)

*

* for(i in l)

* 在task中取出i个,并将task置成剩余的数组

* 将取出的i个放到一个数组中

* 也就得到类似于这样的结果

* {[3 4] [5 6] [7]}

* 这样就完成了任务与executor的对应关系

* executor 与worker 又是怎样对应的呢?

* 1. 取出集群中可用的workerslot 如 {[:a 6703] [:a 6704] [:b 6705] [:b 6717] [:c 6713]}

* 2. 然后排序,依次取出每个supervisor的一个slot,形成一个集合j ([:a 6703] [:b 6705] [:c 6713] [:a 6704] [:b 6717])

* 3. 然后按照executor的数量 i,重复输出i次集合j,将输出结果放在另一个集合k中,于是我们得到类似于这样的集合k:

* ([:a 6703] [:b 6705] [:c 6713] [:a 6704] [:b 6717]... [:a 6703] [:b 6705] [:c 6713] [:a 6704] [:b 6717])

* 4. 对于x in executors,那么x对应的workerslot = x%|j|

* 存在这样的关系

* executors[x] = workerslots[x]

*/

时间: 2024-10-05 05:00:15

Storm task-slot分配的相关文章

Storm中Task数的设置与计算(1.0.1版本)

==思考问题1== 向集群提交一个拓扑的时候,Storm是如何计算Task数以及Executor数的? ==思考问题2:== 构建拓扑的时候,有3个地方会影响task数,这3个地方之间有什么关系? builder.setSpout("spout", new RandomSentenceSpout(), 5); //parallelism-hint builder.setSpout("spout", new RandomSentenceSpout(), 5).setN

大数据:Spark Core(二)Driver上的Task的生成、分配、调度

1. 什么是Task? 在前面的章节里描述过几个角色,Driver(Client),Master,Worker(Executor),Driver会提交Application到Master进行Worker上的Executor上的调度,显然这些都不是Task. Spark上的几个关系可以这样理解: Application: Application是Driver在构建SparkContent的上下文的时候创建的,就像申报员,现在要构建一个能完成任务的集群,需要申报的是这次需要多少个Executor(可

Android中Task任务栈的分配

首先我们来看下Task的定义,Google是这样定义Task的:a task is what the user experiences as an "application." It's a group of related activities, arranged in a stack. A task is a stack of activities, not a class or an element in the manifest file. 这意思就是说Task实际上是一个Ac

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl

storm 原理简介及单机版安装指南(转)

本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的.高容错的实时计算系统. Storm对于实时计算的的意义相当于Hadoop对于批处理的意义.Hadoop为我们提供了Map和Reduce原语,使我们对数据进行批处理变的非常的简单和优美.同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语. Storm适用的场景: 1.流数据处理:Storm可以用来用来处理源源不断的消息,并将处理之后的结果保存

Hadoop平台提供离线数据和Storm平台提供实时数据流

1.准备工作 2.一个Storm集群的基本组件 3.Topologies 4.Stream 5.数据模型(Data Model) 6.一个简单的Topology 7.流分组策略(Stream grouping) 8.使用别的语言来定义Bolt 9.可靠的消息处理 10.单机版安装指南 本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的.高容错的实时计算系统.Storm对于实时计算的的意义相当于Hadoop对于

mac10.9下eclipse的storm开发环境搭建

博文作者:迦壹 博客地址:http://idoall.org/home.php?mod=space&uid=1&do=blog&id=545 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! --------------------------------------- 目录: 一.什么是STORM? 二.搭建标题中的环境都需要哪些软件? 三.如何制作eclipse的storm开发环境 四.参考资料     一.什么是SOTRM? STORM

storm 原理简介及单机版安装指南

目录[-] 1.准备工作 2.一个Storm集群的基本组件 3.Topologies 4.Stream 5.数据模型(Data Model) 6.一个简单的Topology 7.流分组策略(Stream grouping) 8.使用别的语言来定义Bolt 9.可靠的消息处理 10.单机版安装指南 本文翻译自: https://github.com/nathanmarz/storm/wiki/Tutorial Storm是一个分布式的.高容错的实时计算系统.Storm对于实时计算的的意义相当于Ha

Storm调度

p.MsoNormal { margin: 0pt; margin-bottom: .0001pt; text-align: justify; font-family: Calibri; font-size: 10.5000pt } h1 { margin-top: 5.0000pt; margin-bottom: 5.0000pt; text-align: left; font-family: 宋体; font-weight: bold; font-size: 24.0000pt } span