storm基础框架分析

背景

前期收到的问题:

1、在Topology中我们可以指定spout、bolt的并行度,在提交Topology时Storm如何将spout、bolt自动发布到每个服务器并且控制服务的CPU、磁盘等资源的?

2、Storm处理消息时会根据Topology生成一棵消息树,Storm如何跟踪每个消息、如何保证消息不丢失以及如何实现重发消息机制?

上篇:storm是如何保证at least once语义的

回答了第2个问题。

本篇来建立一个基本的背景,来大概看下构成storm流式计算能力的一些基础框架,并部分回答第一个问题。

worker、executor、task的关系

  1. worker是一个进程.
  2. executor是一个线程,是运行tasks的物理容器.
  3. task是对spout/bolt/acker等任务的逻辑抽象.

supervisor会定时从zookeeper获取拓补信息topologies、任务分配信息assignments及各类心跳信息,以此为依据进行任务分配。

在supervisor同步时,会根据新的任务分配情况来启动新的worker或者关闭旧的worker并进行负载均衡。

worker通过定期的更新connections信息,来获知其应该通讯的其它worker。

worker启动时,会根据其分配到的任务启动一个或多个executor线程。这些线程仅会处理唯一的topology。

如果有新的tolopogy被提交到集群,nimbus会重新分配任务,这个后面会说到。

executor线程负责处理多个spouts或者多个bolts的逻辑,这些spouts或者bolts,也称为tasks。

具体有多少个worker,多少个executor,每个executor负责多少个task,是由配置和指定的parallelism-hint共同决定的,但这个值并不一定等于实际运行中的数目。

如果计算出的总的executors超过了nimbus的限制,此topology将不会得到执行。

并行度的作用:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 计算所有tolopogy的topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-topology->executors [nimbus storm-ids]
  "compute a topology-id -> executors map"
  (into {} (for [tid storm-ids]
             {tid (set (compute-executors nimbus tid))})))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 计算topology-id到executors的映射
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn- compute-executors [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
        component->executors (:component->executors storm-base)
        storm-conf (read-storm-conf conf storm-id)
        topology (read-storm-topology conf storm-id)
        task->component (storm-task-info topology storm-conf)]
    (->> (storm-task-info topology storm-conf)
         reverse-map
         (map-val sort)
         (join-maps component->executors)
         (map-val (partial apply partition-fixed))
         (mapcat second)
         (map to-executor-id)
         )))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; 计算topology的task-info
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       all-components
    ;; 获取每个组件的并行数
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       (sort-by first)
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
       (into {})
       ))

上述代码会在nimbus进行任务分配时调用:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; nimbus进行任务分配
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
mk-assignments
->compute-new-topology->executor->node+port
->compute-topology->executors
-> ...

线程模型及消息系统

基本关系如下所示:

  1. worker启动时,除了启动多个executor线程,还会启动多个工作线程来负责消息传递。
  2. worker会订阅到transfer-queue来消费消息,同时也会发布消息到transfer-queue,比如需要进行远程发布时(某个bolt在另一个进程或者节点上)。
  3. executor会发布消息到executor-send-queue比如emit tuple,同时会从executor-receive-queue消费消息,比如执行ack或者fail。
  4. batch-transfer-worker-handler线程订阅到executor-send-queue消费消息,并将消息发布到transfer-queue供worker消费。
  5. transfer-thread会订阅到transfer-queue消费消息,并负责将消息通过socket发送到远程节点的端口上。
  6. worker通过receive-thread线程来收取远程消息,并将消息以本地方式发布到消息中指定的executor对应的executor-receive-queue。executor按第3点来消费消息。
  7. 以上所有的消息队列都是Disruptor Queue,非常高效的线程间通讯框架。

所谓本地发布,是指在worker进程内及executor线程间进行消息发布。

所谓远程发布,是指在worker进程间、不同的机器间进行消息发布。

任务调度及负载均衡

任务调度的主要角色

  1. nimbus将可以工作的worker称为worker-slot.
  2. nimbus是整个集群的控管核心,总体负责了topology的提交、运行状态监控、负载均衡及任务重新分配,等等工作。

    nimbus分配的任务包含了topology代码所在的路径(在nimbus本地)、tasks、executors及workers信息。

    worker由node + port唯一确定。

  3. supervisor负责实际的同步worker的操作。一个supervisor称为一个node。所谓同步worker,是指响应nimbus的任务调度和分配,进行worker的建立、调度与销毁。

    其通过将topology的代码从nimbus下载到本地以进行任务调度。

  4. 任务分配信息中包含task到worker的映射信息task -> node + host,所以worker节点可据此信息判断跟哪些远程机器通讯。

集群的状态机:

集群状态管理

集群的状态是通过一个storm-cluster-state的对象来描述的。

其提供了许多功能接口,比如:

  1. zookeeper相关的基本操作,如create-node、set-data、remove-node、get-children等.
  2. 心跳接口,如supervisor-heartbeat!、worker-heatbeat!等.
  3. 心跳信息,如executors-beats等.
  4. 启动、更新、停止storm,如update-storm!等.

如下图所示:

任务调度的依据

  1. zookeeper是整个集群状态同步、协调的核心组件。
  2. supervisor、worker、executor等组件会定期向zookeeper写心跳信息。
  3. 当topology出现错误、或者有新的topology提交到集群时,topologies信息会同步到zookeeper。
  4. nimbus会定期监视zookeeper上的任务分配信息assignments,并将重新分配的计划同步到zookeeper。

所以,nimbus会根据心跳、topologies信息及已分配的任务信息为依据,来重新分配任务,如下图所示:

任务调度的时机

  1. 如上文的状态机图所示,rebalance和do-reblalance(比如来自web调用)会触发mk-assignments即任务(重新)分配。
  2. 同时,nimbus进程启动后,会周期性地进行mk-assignments调用,以进行负载均衡和任务分配。
  3. 客户端通过storm jar … topology 方式提交topology,会通过thrift接口调用nimbus的提交功能,此时会启动storm,并触发mk-assignments调用。

topology提交过程

一个topology的提交过程:

  1. 非本地模式下,客户端通过thrift调用nimbus接口,来上传代码到nimbus并触发提交操作.
  2. nimbus进行任务分配,并将信息同步到zookeeper.
  3. supervisor定期获取任务分配信息,如果topology代码缺失,会从nimbus下载代码,并根据任务分配信息,同步worker.
  4. worker根据分配的tasks信息,启动多个executor线程,同时实例化spout、bolt、acker等组件,此时,等待所有connections(worker和其它机器通讯的网络连接)启动完毕,此storm-cluster即进入工作状态。
  5. 除非显示调用kill topology,否则spout、bolt等组件会一直运行。

主要过程如下图所示:

结语

以上,基本阐述了storm的基础框架,但未涉及trident机制,也基本回答了问题1。

终。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-05 12:25:56

storm基础框架分析的相关文章

Java并发基础框架AbstractQueuedSynchronizer初探(ReentrantLock的实现分析)

AbstractQueuedSynchronizer是实现Java并发类库的一个基础框架,Java中的各种锁(RenentrantLock, ReentrantReadWriteLock)以及同步工具类(Semaphore, CountDownLatch)等很多都是基于AbstractQueuedSynchronizer实现的.AbstractQueuedSynchronizer 一般简称AQS,Abstract表示他是一个抽象类,Queued表示他是基于先进先出 FIFO 等待队列实现的,Sy

Google Test测试框架分析

Google Test测试框架分析 一.简介 Google Test是由Google主导的一个开源的C++自动化测试框架,简称GTest.GTest基于xUnit单元测试体系,和CppUint类似,可以看作是JUnit.PyUnit等对C++的移植. 下图是GTest测试框架的测试过程,表示的是GTest的两种测试方式. 下面将使用一个极其简单的例子表示xUnit测试的主要过程.如对Hummer的CTXString类的成员方法GetLength进行测试.详见下面GTest代码和注释说明. //

机器视觉项目基础框架

一.背景 ?              虽然OPENCV是可以在多平台下面运行,并且通过封包(DLL)的形式,可以被多种程序所调用,但是在windows平台下面,OPENCV和MFC程序一起使用还是最常见,也是功能最强大的.这里搭建基础的MFC+OPENCV框架,为在此之上进行机器视觉设计奠定基础.在实现的过程中,有许多选择是由于自己的偏好和习惯,请辩证分析. 二.MFC部分具体设计实现 1)创建MFC对话框程序 2)添加并且设计menu,挂在主窗体上 3)添加tab并且编写内容,创建对应成员变

Android Bitmap 开源图片框架分析(精华三)

主要介绍这三个框架,都挺有名的,其他的框架估计也差不多了 Android-Universal-Image-Loaderhttps://github.com/nostra13/Android-Universal-Image-Loader ImageLoaderhttps://github.com/novoda/ImageLoader Volley(综合框架,包含图片部分)https://github.com/mcxiaoke/android-volley 扯淡时间,可以跳过这段这些开源框架的源码还

Android Bitmap 开源图片框架分析(精华四)

disk缓存主要难点在于内存缓存,disk缓存其实比较简单,就是图片加载完成后把图片文件存到本地方便下次使用 同样,先贴一下官方主页的介绍(主页地址见文章最开始处)和内存缓存差不多,根据算法不同提供了几种类别,可以自行通过ImageLoaderConfiguration.discCache(..)设置<ignore_js_op> 硬盘缓存,保存是以文件的形式框架提供了4种类型,具体算法规则不同,看名字我们大概也能知道对应意思 UnlimitedDiscCache                

【转载】微服务,我们需要哪些基础框架?

微服务(MicroServices)架构是当前互联网业界的一个技术热点,圈里有不少同行朋友当前有计划在各自公司开展微服务化体系建设,他们都有相同的疑问:一个微服务架构有哪些技术关注点(technical concerns)?需要哪些基础框架或组件来支持微服务架构?这些框架或组件该如何选型?笔者之前在两家大型互联网公司参与和主导过大型服务化体系和框架建设,同时在这块也投入了很多时间去学习和研究,有一些经验和学习心得,可以和大家一起分享. 服务注册.发现.负载均衡和健康检查 和单块(Monolith

微服务架构的基础框架选择:Spring Cloud还是Dubbo?

本文转自:http://mt.sohu.com/20160803/n462486707.shtml 最近一段时间不论互联网还是传统行业,凡是涉及信息技术范畴的圈子几乎都在讨论 微服务架构 .近期也看到各大技术社区开始组织一些沙龙和论坛来分享Spring Cloud的相关实施经验,这对于最近正在整理Spring Cloud相关套件内容与实例应用的我而言,还是有不少激励的. 目前,Spring Cloud在国内的知名度并不高,在前阵子的求职过程中,与一些互联网公司的架构师.技术VP或者CTO在交流时

微信公众平台开发教程(三) 基础框架搭建

微信公众平台开发教程(三) 基础框架搭建 上一章,我们已经初步讲解了微信公众账号开发的基本原理,今天我们来探索设计实现. 首先我们设计了模块层次图,当然图中只是给出一种实现方式,不局限于此.具体见下图. 主要功能介绍如下: 1)请求接口层.处理HTTP请求,及响应 2)分发层.由接口层传入请求,然后具体分析请求类型,分发至不同的处理器 3)业务逻辑层.这里是我们的具体业务逻辑了,根据请求,实现具体的业务逻辑. 4)数据层.我们在实现某个应用时可能需要访问数据,可以是数据库或者是文件.如果是简单应

TI BLE协议栈软件框架分析

看源代码的时候,一般都是从整个代码的入口处开始,TI  BLE 协议栈源码也不例外.它的入口main()函数就是整个程序的入口,由系统上电时自动调用. 它主要做了以下几件事情: (一)底层硬件初始化配置 (二)创建任务并初始化任务配置 (三)检测并执行有效的任务事件 Main() 函数源码如下: 一:底层硬件初始化设置 75行,设置系统时钟,使能内存缓冲功能. 78行,关中断,刚启动时,系统运行不稳定,一般会首先关中断. 81行,硬件相关的I/O 口配置. 84行,初始化mcu 内部的flash