Assignment篇
回到storm-task-info,
(defn storm-task-info
"Returns map from task -> component id"
[^StormTopology user-topology storm-conf]
;; 添加acker,metrics bolt
(->> (system-topology! storm-conf user-topology)
;; 获得拓扑定义的所有组件
all-components
;;返回 componentid –>task num
(map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
;;sort component-id
(sort-by first)
;;将组件按照task的数量生成对应数量的componentid
(mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
;;给组件编号
(map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
;; 返回类似于{1 "counter",2 "counter",3 "reducer"}
(into {})
))
得到所有topology的组件后,以{spt0 [executor 2 task 3] blt0 [executor4 task4] 为例,经过storm-task-info输出后变成:
{1 "blt0" 2 "blt0" 3 "blt0" 4 "blt0" 5 "spt0" 6 "spt0" 7 "spt0"}
这些task如何在executor上分配呢?
(defn- compute-executors [nimbus storm-id]
(let [conf (:conf nimbus)
;;读取topology的一些基本信息 StormBase对象
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
;;组件对应的executors数量 Map结构 {"compution" 140, "sender" 16, "reducer" 20, "spout" 1}
component->executors (:component->executors storm-base)
;;加载配置文件
storm-conf (read-storm-conf conf storm-id)
;;topology = StormTopology.
topology (read-storm-topology conf storm-id)
;;storm-task-info 在backtype/storm/daemon/common.clj 中定义
;;{1 "counter",2 "counter",3 "reducer"}
task->component (storm-task-info topology storm-conf)]
;; storm-task-info 返回 {task-index, task-component-id}的结果
(->> (storm-task-info topology storm-conf)
;; reverse-map 在util.clj上定义,将value相同的key形成一个seq
reverse-map
;调用map-val方法 ,该方法在util.clj上定义 对task-index进行排序
(map-val sort) ;
;; join-maps 在utils定义
;; 也就是将 task 在component->executors 上对应的 executors数量取出来
;; 建立task -> (executors, task-index)的关系
(join-maps component->executors)
;;将task分配在executors上
(map-val (partial apply partition-fixed))
;; 获取分配好的task与executor
(mapcat second)
;;返回[start-task, end-task]
(map to-executor-id)
)))
过程描述:
- 将topology组件按照component-id排序后,按照task的数量生成相应的component-id的数量,并进行编号:
{spt0 [executor 2 task 3] blt0 [executor4 task4] ->
{1 "blt0" 2 "blt0" 3 "blt0" 4 "blt0" 5 "spt0" 6 "spt0" 7 "spt0"}
- 将上一步得到的结果按照component-id聚合 task index
{1 "blt0" 2 "blt0" 3 "blt0" 4 "blt0" 5 "spt0" 6 "spt0" 7 "spt0"}->
得到类似类似于{"blt0" [1 2 3 4] "spt0" [5 6 7]}
- 建立task executor task index的对应关系。
{"blt0" [1 2 3 4] "spt0" [5 6 7]} +{"blt0" 4 "spt0" 2} ->
{"blt0" (4 [1 2 3 4]) "spt0" (2 [5 6 7])}
- 将task分配在相应的executors上。
在Utils.java代码上,有具体的分配细节:
public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) {
int base = sum / numPieces;
int numInc = sum % numPieces;
int numBases = numPieces - numInc;
TreeMap<Integer, Integer> ret = new TreeMap<Integer, Integer>();
ret.put(base, numBases);
if(numInc!=0) {
ret.put(base+1, numInc);
}
return ret;
}
通熟一点来说,这个就相当于将sum分配numPieces份,并且做到最大公平。
也就是numPieces分配一圈后,每份一个,继续从头分配,直到所有的sum分配完开始。
(defn partition-fixed
[max-num-chunks aseq]
(if (zero? max-num-chunks)
[]
;;获取task-executor分配细节
(let [chunks (->> (integer-divided (count aseq) max-num-chunks)
;;Returns a new map of the same (hashed/sorted) type, that does not contain a mapping for key(s)
;; 将integer-divided去后 key为0的 键值对 去除
;; 过滤无效分配
(#(dissoc % 0));
;加入 当 任务数为5 并发数设置为3 那么输入为{1 1 2 2} ,输出为([2 2] [1 1]) 按照key的降序排列
(sort-by (comp - first))
;;输入 ([2 2] [1 1]) 变成 (2 2 1)即表示每份的任务数量
(mapcat (fn [[size amt]] (repeat amt size)))
)]
;; chunks = (2 2 1)
;; 将任务分配到 task上,假设我们的taskindex = 3 4 5 6 7 即aseq = (3 4 5 6 7)
(loop [result []
[chunk & rest-chunks] chunks
data aseq]
(if (nil? chunk)
result
;; 将data按照 第chunk个数分割 成两个数组 取出前
;; chunk个赋值给c
(let [[c rest-data] (split-at chunk data)]
;;合并, 将c合并到result中
(recur (conj result c)
;; chunks = rest-chunks
;;aseq = rest-data
rest-chunks
rest-data)))))))
经过该步骤:
{"blt0" (4 [1 2 3 4]) "spt0" (2 [5 6 7])}
->{("blt0" [(1) (2) (3) (4)]) ("spt0" [(5 6) (7)])}
- 取出分配结果,生成executor分配细节
{("blt0" [(1) (2) (3) (4)]) ("spt0" [(5 6) (7)])} –>
{[1 1] [2 2] [3 3] [4 4] [5 6] [7 7]}
继续返回上一层:compute-executor->component
;;返回 task index对应的component
task->component (storm-task-info topology storm-conf) executor->component (into {} (for [executor executors
;;取出executor的start-task
:let [start-task (first executor)
component
;;根据task-index取出对应的component
(task->component start-task)]]
;; 返回executor对应的component
{executor component}))
在这一步具体完成的事情是将每个executor对应的component绑定起来
{[1 1] "blt0" [2 2] "blt0" [3 3] [4 4] [5 6] "spt0" [7 7] "spt0"}
回到nimbus.clj/read-topology-details这个方法,方法返回:
;; 创建一个topologyDetails对象
(TopologyDetails. storm-id
topology-conf
topology
(:num-workers storm-base)
executor->component
))
在这个对象中,包含了topology,task->executor->component的分配细节,topology的worker数量以及topology的配置信息等。
回到nimbus.clj上的mk-assignments方法,在这个方法中
;;得到所有topology的分配细节
topologies (into {} (for [tid topology-ids]
{tid (read-topology-details nimbus tid)}))
;; topologies = new Topologies(topologies)
topologies (Topologies. topologies)
;; read all the assignments
assigned-topology-ids (.assignments storm-cluster-state nil)
;; 返回所有topology对应的任务明细
existing-assignments (into {} (for [tid assigned-topology-ids]
(when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
{tid (.assignment-info storm-cluster-state tid nil)})))
;; make the new assignments for topologies
topology->executor->node+port
(compute-new-topology->executor->node+port
nimbus
existing-assignments
topologies
scratch-topology-id)
我们在来看看compute-new-topology->executor->node+port的实现:
在compute-new-topology->executor->node+port中有这样一句:
_ (update-all-heartbeats! nimbus existing-assignments topology->executors)
1.首先来看一下nimbus是如何更新缓存的。
- 首先根据当前存活的任务在zookeeper ;根据每个node+port 在心跳目录下取出每个进程上运行的executors的状态信息,返回{executor {:status :uptime :time-secs}}
executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment))
- 然后更新nimbus进程中这个topology的每个executor的状态。
(into {}
(for [executor all-executors :let [curr (cache executor)]]
[executor
(update-executor-cache curr (get executor-beats executor) timeout)]
))))
- 将更新好的executor状态重新设置到nimbus进程缓存中。
本地进程中,缓存的executor的状态有三个:
{:is-timed-out, :nimbus-time, :executor-reported-time}
那么是如何判断一个executor超时的呢?
- 首先取出zookeeper中保存的executor的状态信息 :time-secs
- 然后取出本地缓存的该executor对应的:nimbus-time, :executor-reported-time 两个状态
(defn- update-executor-cache [curr hb timeout]
;; 取出在zookeeper workerheartbeat 刷新的时间
(let [reported-time (:time-secs hb)
{last-nimbus-time :nimbus-time
last-reported-time :executor-reported-time} curr
;;如果executor zookeeper上的心跳记录不为空 则report-time = zookeeper上记录的时间
reported-time (cond reported-time reported-time
;;如果本地保存的上次report-time不为空,则 report-time = 本地缓存时间
last-reported-time last-reported-time
;;都为nil时,返回0
:else 0)
;;如果本地保存的(last-nimbus-time =null || last-report-time !=report-time)则nimbus-time = 当前时间 否则为last-nimbus-time
nimbus-time (if (or (not last-nimbus-time)
(not= last-reported-time reported-time))
(current-time-secs)
last-nimbus-time
)]
;;true for –>nimbus-time!=nil&&(当前时间 - nimbus-time)>=timeout
{:is-timed-out (and
nimbus-time
(>= (time-delta nimbus-time) timeout))
:nimbus-time nimbus-time
:executor-reported-time reported-time}))
判断executor超时的逻辑是nimbus-time!=nil&&(当前时间 - nimbus-time)>=timeout
观察一些nimbus-time的赋值过程,可知当‘
(last-nimbus-time!=null&&last-reported-time=reported-time)时,此时的
nimbus-time = last-nimbus-time,也就是上次更新本地executor的时间,除此之外,nimbus-time=当前时间,这个显然 :is-time-out的值为false。
那么什么情况会发生这个呢?
假设我们更新过一次executor,也就是last-nimbus-time!=null, last-reported-time也是上次从zookeeper上取出的:time-secs, 当reported-time= last-reported-time时,也就是executor所在的进程没有刷新:time-secs这个值,那么此时nimbus-time = last-nimbus-time,如果该进程fail down,那么最终:is-time-out的值变成 true。
由此可知,worker必须timeout时间段内,去刷新一下Heartbeat,timeout为
(:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS
这样也算符合错误检测。
继续往下,
topology->alive-executors (compute-topology->alive-executors nimbus existing-assignments topology->executors scratch-topology-id)
找出每个topology存活的executor,只要topology的executor对应的心跳缓存:is-time-out为false或者executor还没有启动,那么这些都算是一个活动的executor:
(let [start-time (get executor-start-times executor)
;; 查看executor 是否超时
is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)]
(if (and start-time
(or
;;如果executor在NIMBUS-TASK-LAUNCH-SECS时间内还没开始启动
(< (time-delta start-time)
(conf NIMBUS-TASK-LAUNCH-SECS))
(not is-timed-out)
))
true
…
)
剔除每个topology超时的executors后,下一步将那些超时的executos占用的资源[主机+端口号]释放出来,以便再次进行分配。
返回{机器 [端口号,…]}
supervisor->dead-ports (compute-supervisor->dead-ports nimbus existing-assignments topology->executors topology->alive-executors)
下一步,获取存活的executors的分配情况,返回
{tid (SchedulerAssignmentImpl. tid executor->slot)})))
找出那些缺失去任务的topologies,判断条件:
;; 如果topology的分配情况为空
或者存活的executors和总executors不一致
或者 使用的进程数量<用户设置的进程数量
;;
经过一些列的判断,最终执行到这一句:
_ (.schedule (:scheduler nimbus) topologies cluster)
这个方法会调用DefaultSchedule.clj上的schedule方法。
具体的分配细节在EventScheduler当中,大致的细节如下:
- 求出topology需要的worker数量n,(总需求量 – 已分配的)
- 获取集群内所有可用的workerslot,如{:a [6701 6702] :b [6708 6714] :c [6799]}
- 将获取的workerslot进行排序,得到类似于这样的结果
([:a 6701] [:b 6708] [:c 6799] [:a 6702] [:b 6714])
- 从第3个得到的排序结果中取出前n个,比如n=2时,得到结果:
([:a 6701] [:b 6708])
- 获取需要分配的任务j,设|j| = 5;
- 将第3步骤获取的结果按照|j| 重复输出,得到结果:
([:a 6701] [:b 6708] [:a 6701] [:b 6708] [:a 6701] [:b 6708] [:a 6701] [:b 6708] [:a 6701] [:b 6708])
- 将在j中第i个任务分配到第i个上一步产生的workerslot队列当中。
- 按照workerslot 聚合分配的任务,{[:a 6701] [task] [:b 6708] [task]}
- 将分配信息写入到zookeeper中
具体的分配细节参考EventScheduler.clj源码。
至此,我们大致完成了nimbus分配任务的过程,
mk-assignments是一个定时任务,没隔NIMBUS-MONITOR-FREQ-SECS 便会去执行一次,因此可以及时检测worker异常以及重新分配任务。
/ *
* 比如我们有5个任务 3个executor 怎样分呢?
* 返回map {2 2 1 1} 也就是说有2个executor,其任务数为2, 1个executor的任务数为1
* 转化成l为 (2 2 1) task (3 4 5 6 7)
*
* for(i in l)
* 在task中取出i个,并将task置成剩余的数组
* 将取出的i个放到一个数组中
* 也就得到类似于这样的结果
* {[3 4] [5 6] [7]}
* 这样就完成了任务与executor的对应关系
* executor 与worker 又是怎样对应的呢?
* 1. 取出集群中可用的workerslot 如 {[:a 6703] [:a 6704] [:b 6705] [:b 6717] [:c 6713]}
* 2. 然后排序,依次取出每个supervisor的一个slot,形成一个集合j ([:a 6703] [:b 6705] [:c 6713] [:a 6704] [:b 6717])
* 3. 然后按照executor的数量 i,重复输出i次集合j,将输出结果放在另一个集合k中,于是我们得到类似于这样的集合k:
* ([:a 6703] [:b 6705] [:c 6713] [:a 6704] [:b 6717]... [:a 6703] [:b 6705] [:c 6713] [:a 6704] [:b 6717])
* 4. 对于x in executors,那么x对应的workerslot = x%|j|
* 存在这样的关系
* executors[x] = workerslots[x]
*/