Storm系列(十六)架构分析之Executor-Bolt

准备消息循环的数据

函数原型:

let[executor-sampler (mk-stats-sampler (:storm-conf executor-data))]

主要功能:

定义tuple-action-fn函数,该函数会根据TaskId获得对应的Bolt对象并调用其executor方法.

Bolt输入处理函数

函数原型:

tuple-action-fn (fn [task-id ^TupleImpl tuple])

主要功能:

获得Bolt对应的bolt-obj,调用executor回调方法。

Bolt的消息发送函数

函数原型:

bolt-emit (fn [stream anchors values task])

主要功能:

  1. 调用tasks-fn获取消息接收端的TaskId集合。
  2. 调用transfer-fn函数发送消息,该函数与Spout中实现类似(唯一的区别不使用overflow-buffer缓存).

Bolt对象的初始化

调用Bolt的prepare函数
初始化过程:

  1. 获取Bolt对象并定义相关方法。bolt-emit方法用于向Executor的消息发送队列中发送消息。
  2. 调用Bolt对象的prepare方法,同时实例化Bolt对象的OutputCollector对象作为prepare方法的传入参数,OutputCollector的emit方法将调用bolt-emit函数来发送消息,ack及fail方法则用来对消息进行跟踪。
  3. 调用mk-task-receiver函数来获得接收队列的处理函数(tuple-action-fn)。

消息循环

调用阻塞方式的 consume-batch-when-available函数对接收队列中的消息进行处理。

(fn [] (distruptor/comsume-batch-when-available receive-queuu event-handler))

 

创建Executor

创建Executor函数mk-executor

函数原型:

(defn mk-executor [worker executor-id])

方法说明:

  1. 调用mk-executor-data创建Executor的数据。
  2. 调用mk-task创建Executor中的每个Task对应的数据。
  3. 调用start-batch-transfer->worker-handler!方法启动Executor的数据发送线程。
  4. 调用mk-threads方法获得Executor的主循环线程,并通过with-error-reaction宏对mk-threads进行包装。当异常发生时调用report-error-and-die方法记录错误并退出。
  5. 实例化RunningExecutor对象用来操作executor.
  6. 实例化 Shutdownable用于退出Executor并清理相关资源,具体操作包括:
    a)结束DisruptorQueue的消息循环。
    b)结束Executor中的启动线程
    c)清理用户钩子的数据
    d)断开与Zookeeper的连接
    e)依次调用Executor中Spout或Bolt的close方法.

获取分组函数

函数原型:

(defn outbound-components [^WorkerTopologyContext worker-context component-id])

功能描述:

获取从组件到某一个流的分组函数,task-fn函数通过调用该分组函数可获得消息的目标Task集合。

函数说明:

  1. 调用WorkerTopologyContext对象的getTargets方法得到一个哈希表,该哈希表的键为当前组件所对应的流,值为一个哈希表,用于记录目标组件以何种方式从该流接收数据。
  2. 调用outbound-groupings函数获得分组函数。

outbound-groupings
函数的定义

(defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping])

方法说明:

  1. 对目标组件进行过滤,若组件对应的TaskId集合为空,怎被过滤掉。
  2. 用map函数对组件及其分组方式进行处理,调用mk-grouper函数来产生分组函数,并最终返回一个保存有从组件到分组函数的映射关系的哈希表.

mk-grouper

返回一个函数,该函数返回一个TaskId集合,代表消息发送的目的Task集合

函数原型:

(defn mk-grouper [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks])

函数说明:

  1. 获取与目标组件对应的Task的数目及排列后的列表,它们将作为计算目标Task的函数输入。某些分组方式只需要目标组件的Task数目,如:ShuffleGrouping操作.
  2. 针对Thrift类型,不同分组方式分别构建分组函数。

 

触发系统Ticks

setup-ticks!函数定期向Executor的接收消息队列发送Tick消息.Executor在收到Tick消息之后,就会执行发送队列的超时操作。setup-ticks!主要用于对Spout节点发送出去的消息进行操作操作。

函数原型:

(defn setup-ticks [worker executor-data])

函数说明:

  1. 配置项TOPOLOGY-TICK-TUPLE-FREQ-SECS用来控制向__system流及__tick流发送消息的频率,tick-time-secs用来保存该频率值,receive-queue为Executor对应的接收Disruptor Queue,context为WorkerTopologyContext对象,Tick消息只发送到本地Worker,并不能被其它Worker的Executor收到。
  2. 判断tick-time-secs是否设置,若已进行设置则开始设置系统的Tick消息.
  3. 若该节点为Spout节点且未设置消息超时,则打印消息退出。参数TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS用于调试模式,由于超时的消息会给系统调试带来额外的复杂性,因此可在调试过程中暂时关闭消息的超时操作。当Spout收到Tick消息时,可对缓存在pending对象中的数据进行超时操作。
  4. 利用Worker定义的用户计时器tick-time-secs为间隔设置计时器,并定义计时器回调函数,向receive-queue中发送一条消息,该消息对应的TaskId为nil,表示该Executor中所有的Task都会收到该消息,消息的内容为tick-time-secs,-1表示系统TaskId,最后一项表示该消息会被发送到SYSTEM-TICK_STREAM.
时间: 2025-01-02 15:03:54

Storm系列(十六)架构分析之Executor-Bolt的相关文章

Storm系列(十五)架构分析之Executor-Spout

Spout实现mk-threads接口用于创建与Executor对应的消息循环主函数. defmulti mk-threads executor-selector Mk-threads函数的主消息循环通过async-loop方法实现,若传入的函数为工厂方法,则在第一次调用该方法时进行初始化,并返回用于消息循环的函数. Spout输入处理函数 spout的输入处理函数采用非阻塞的方式从接收队列中获取消息: (disruptor/consume-batch receive-queue event-h

Storm系列(十四)架构分析之Executor-输入和输出处理

Executor的数据 mk-executor-data函数用于定义Executor中含有的数据. Executor的输入处理 根据executor-id从Worker的:executor-receive-queue-map中获得Disruptor Queue 如下: 1  receive-queue ((:executor-receive-queue-map worker) executor-id) 说明: Worker的接收线程从ZMQ收到数据后,线程会根据目标的Task Id找到对应的Ex

Storm 系列(六)—— Storm 项目三种打包方式对比分析

一.简介 在将 Storm Topology 提交到服务器集群运行时,需要先将项目进行打包.本文主要对比分析各种打包方式,并将打包过程中需要注意的事项进行说明.主要打包方式有以下三种: 第一种:不加任何插件,直接使用 mvn package 打包: 第二种:使用 maven-assembly-plugin 插件进行打包: 第三种:使用 maven-shade-plugin 进行打包. 以下分别进行详细的说明. 二.mvn package 2.1 mvn package的局限 不在 POM 中配置

Cocos2d-x 3.x 图形学渲染系列十六

笔者介绍:姜雪伟,IT公司技术合伙人,IT高级讲师,CSDN社区专家,特邀编辑,畅销书作者,国家专利发明人;已出版书籍:<手把手教你架构3D游戏引擎>电子工业出版社和<Unity3D实战核心技术详解>电子工业出版社等. 每个引擎都有自己的处理Shader类,Cocos使用的是GLProgram类,之所以定义GLProgram类,是因为在引擎中需要有一个类管理模型的信息和矩阵信息声明.在GLProgram类中定义了模型顶点的属性,这些属性在加载模型时,用于解释模型文件内容时用于做属性

Hadoop运维记录系列(十六)

应了一个国内某电信运营商集群恢复的事,集群故障很严重,做了HA的集群Namenode挂掉了.具体过程不详,但是从受害者的只言片语中大概回顾一下历史的片段. Active的namenode元数据硬盘满了,满了,满了...上来第一句话就如雷贯耳. 运维人员发现硬盘满了以后执行了对active namenode的元数据日志执行了 echo "" > edit_xxxx-xxxx...第二句话如五雷轰顶. 然后发现standby没法切换,切换也没用,因为standby的元数据和日志是5月

Storm系列(十九)普通事务ITransactionalSpout及示例

普通事务API详解 1  public interface ITransactionalSpout<T> extends IComponent { 2      public interface Coordinator<X> { 3          // 事务初始化 4          X initializeTransaction(BigInteger txid, X prevMetadata); 5          // 启动事务,返回true表示开始 6        

Storm系列(十)聚流示例

功能:将多个数据源的数据汇集到一个处理单元进行集中分类处理: 入口类TestMain 1  public class TestMain { 2    3      public static void main(String[] args) { 4          TopologyBuilder builder = new TopologyBuilder(); 5          builder.setSpout("random1", new RandomWordSpout1(),

Java设计模式菜鸟系列(十六)原型模式建模与实现

转载请注明出处:http://blog.csdn.net/lhy_ycu/article/details/39997337 原型模式(Prototype):该模式的思想就是将一个对象作为原型,对其进行复制.克隆,产生一个和原对象类似的新对象.而这里的复制有两种:浅复制.深复制. 浅复制:将一个对象复制后,基本数据类型的变量都会重新创建,而引用类型,指向的还是原对象所指向的. 深复制:将一个对象复制后,不论是基本数据类型还有引用类型,都是重新创建的.简单来说,就是深复制进行了完全彻底的复制,而浅复

MySQL---数据库从入门走向大神系列(十六)-JavaWeb分页技术实例演示1

分页,是一种将所有数据分段展示给用户的技术.用户每次看到的不 是全部数据,而是其中的一部分,如果在其中没有找到自己想要的内容,用户可以通过指定页码或是点上/下一页的方式进行翻页. 本例演示静态分页,也就是先设置好每页显示10行,再根据总行数,来算出总页数,将所有页数的页号都显示出来. 相关算法(技术): 总行数(num): select count(1) from stud; 每页显示的行数(n): 固定值---已知的一个常量 页数: pageSize= num/n +( (num%n==0)?