Storm系列(十四)架构分析之Executor-输入和输出处理

Executor的数据

mk-executor-data函数用于定义Executor中含有的数据。

Executor的输入处理

根据executor-id从Worker的:executor-receive-queue-map中获得Disruptor Queue 如下:

receive-queue ((:executor-receive-queue-map worker) executor-id)

说明:

Worker的接收线程从ZMQ收到数据后,线程会根据目标的Task Id找到对应的Executor,并将数据发送到该Executor所对应的接收DisruptorQueue中,对于DisruptorQueue中的消息Bolt会调用executor方法处理,而Spout类型的Executor则调用Ack或Fail处理。

mk-task-receive函数定义了一个函数来处理DisruptorQueue中的消息,通过调用disruptor/clojure-handler函数获取消息处理函数,该消息处理函数会在收到新消息时被调用。

函数原型:

defn mk-task-receive [executor-data tuple-action-fn]

方法说明:

  1. tuple-action-fn为Executor的消息处理函数(Spout与Bolt各不同)。
  2. 若存在消息来源task-id,则调用一次tuple-action-fn函数,若不存在消息来源taks-id,则在该Executor上的所有Task上调用tuple-action-fn函数。
  3. 在创建Spout或Bolt时,会调用mk-task-receiver函数并将结果存储于event-handler变量中:
    event-handler(mk-task-receiver executor-data tuple-action-fn)
    在Spout中以非阻塞方式接收数据:
    (disruptor/consume-batch receive-queue event-handler)
    在Bolt中以阻塞方式接收数据:
    (disruptor/consume-when-available receive-queue event-handler)

Executor的输出及发送

每个Executor都会有一个输出的Disruptor Queue对象,Executor在发送消息时首先会将消息发送到该队列,Executor会启动一个发送线程来处理该队列中的数据,该线程调用Worker中的mk-transfer-fn产生的函数对数据进行处理,或者把数据通过ZMQ发送(mk-transfer-tuples-handler)到其它Worker,或者直接发送到与该Worker上的其它Executor相对应的接收Disruptor Queue中。

Start-batch-transfer->worker-handler!函数调用disruptor/consume-loop*函数来启动用于发送数据的队列线程。

函数原型:

defn start-batch-transfer->worker-handler! [worker executor-data]

方法说明:

  1. 获取worker中的消息发送函数mk-transfer-fn赋值给变量worker-transfer-fn。
  2. 启动Disruptor Queue的消费者线程(disruptor/consume-loop*),batch-transfer-queue为Executor定义的Disruptor Queue对象,disruptor/handler构建EventHandler对象。
  3. Executor的report-error-and-die函数,对错误进行记录并退出进程。
  4. 在创建Executor的过程中,会启动system-threads(mk-executor函数):system-threads[(start-batch-transfer->worker-handler! worker   executor-data)].
  5. Executor在创建其数据时会创建发送队列,发送队列的创建过程:
    Batch-transfer->worker (disruptor/disruptor-queue
    (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
    :claim-strategy :single-threaded
    :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
时间: 2024-11-08 03:35:12

Storm系列(十四)架构分析之Executor-输入和输出处理的相关文章

Storm系列(十五)架构分析之Executor-Spout

Spout实现mk-threads接口用于创建与Executor对应的消息循环主函数. defmulti mk-threads executor-selector Mk-threads函数的主消息循环通过async-loop方法实现,若传入的函数为工厂方法,则在第一次调用该方法时进行初始化,并返回用于消息循环的函数. Spout输入处理函数 spout的输入处理函数采用非阻塞的方式从接收队列中获取消息: (disruptor/consume-batch receive-queue event-h

Storm系列(十六)架构分析之Executor-Bolt

准备消息循环的数据 函数原型: 1  let[executor-sampler (mk-stats-sampler (:storm-conf executor-data))] 主要功能: 定义tuple-action-fn函数,该函数会根据TaskId获得对应的Bolt对象并调用其executor方法. Bolt输入处理函数 函数原型: 1  tuple-action-fn (fn [task-id ^TupleImpl tuple]) 主要功能: 获得Bolt对应的bolt-obj,调用exe

S3C2416裸机开发系列十四_GCC下UCGUI的移植(2)

S3C2416裸机开发系列十四 GCC下UCGUI的移植(2) 象棋小子    1048272975 现在主要讲解一下在GCC移植UCGUI,Makefile工程如何加入目录,加入源码,c标准库,编译选项的设置. 笔者的Makefile模板提取自uboot,工程中加入目录,加入源码都是很简单的,详细的介绍请参考前面章节" GCC启动代码工程应用实例".下面主要介绍UCGUI目录下很多的源码文件Makefile的编写,一种可行的方式就是把GUI目录上所有的c文件,不管有无用到,均加入工程

S3C2416裸机开发系列十四_GCC下UCGUI的移植(1)

S3C2416裸机开发系列十四 GCC下UCGUI的移植(1) 象棋小子    1048272975 GUI(图形用户界面)极大地方便了非专业用户的使用,用户无需记忆大量的命令,取而代之的是可以通过窗口.菜单.按键等方式进行操作.在某些场合,设计一款人机界面丰富友好的嵌入式产品能赢得更多的用户.笔者此处就s3c2416基于UCGUI图形用户界面的使用作一个简单的介绍. 1. 代码准备 UCGUI 3.98源码,这个版本的UCGUI是开放源码的最高版本,之后版本只提供库文件,不再开源.笔者以UCG

Storm 系列(四)—— Storm 集群环境搭建

一.集群规划 这里搭建一个 3 节点的 Storm 集群:三台主机上均部署 Supervisor 和 LogViewer 服务.同时为了保证高可用,除了在 hadoop001 上部署主 Nimbus 服务外,还在 hadoop002 上部署备用的 Nimbus 服务.Nimbus 服务由 Zookeeper 集群进行协调管理,如果主 Nimbus 不可用,则备用 Nimbus 会成为新的主 Nimbus. 二.前置条件 Storm 运行依赖于 Java 7+ 和 Python 2.6.6 +,所

Hulu机器学习问题与解答系列 | 十四:如何对高斯分布进行采样

欢迎回到"采样"系列~ 今天的内容是 [如何对高斯分布进行采样] 场景描述 高斯分布,又称正态分布,是一个在数学.物理及工程领域都非常重要的概率分布.在实际应用中,我们经常需要对高斯分布进行采样.虽然在很多编程语言中,直接调用一个函数就可以生成高斯分布随机数,但了解其中的具体算法能够加深我们对相关概率统计知识的理解:此外,高斯分布的采样方法有多种,通过展示不同的采样方法在高斯分布上的具体操作以及性能对比,我们会对这些采样方法有更直观的印象. 问题描述 如果让你来实现一个高斯分布随机数生

JBoss 系列十四:JBoss7/WildFly如何加载外部的文件或properties文件

http://www.tuicool.com/articles/M7ZR3y 原文 http://blog.csdn.net/kylinsoong/article/details/12623997 主题 WildFlyXMLJBoss AS 内容概述 JBoss7/WildFly设底层框架一大亮点的Module class loading,及每一个相对独立的模块看作一个Module,每个Module都有自己的class loader,Module之间可以相互依赖,如下图: 当JBoss7安装完成

Storm系列(十九)普通事务ITransactionalSpout及示例

普通事务API详解 1  public interface ITransactionalSpout<T> extends IComponent { 2      public interface Coordinator<X> { 3          // 事务初始化 4          X initializeTransaction(BigInteger txid, X prevMetadata); 5          // 启动事务,返回true表示开始 6        

Hadoop运维记录系列(十四)

周末去了趟外地,受托给某省移动公司做了一下Hadoop集群故障分析和性能调优,把一些问题点记录下来. 该系统用于运营商的信令数据,大约每天1T多数据量,20台Hadoop服务器,赞叹一下运营商乃真土豪,256G内存,32核CPU,却挂了6块2T硬盘.还有10台左右的服务器是64G内存,32核CPU,4~6块硬盘,据用户反馈,跑数据很慢,而且会有失败,重跑一下就好了. 软件环境是RedHat 6.2,CDH Hadoop 4.2.1. 总容量260多TB,已使用200多T. 首先,这硬件配置属于倒