Storm系列(十三)架构分析之Worker-维护ZMQ连接

Worker根据Topology的定义及分配到自身的任务情况,计算出发出的消息被那些Task接收,由于Worker上分配的任务可能被调整,因此Worker需要定时的更新这些连接信息。

ZMQ连接信息更新

函数:mk-refresh-connections

方法原型:

(defn mk-refresh-connection [worker])

方法说明:

  1. 调用worker-outbound-tasks返回从worker参数接收数据的TaskId集合(outbound-tasks).
  2. 定义this函数及相应的回调函数,并将自己注册到定时器:refresh-connections-timer中。
  3. 回调函数带有callback参数用于监听Zookeeper对应节点变化并同步。
  4. 调用storm-cluster-state的assignment函数获取与storm-id对应的Topology的任务分配。
  5. 调用:executor->node+port存储Executor到node+port的映射关系,再调用to-task->node+port根据Executor中的TaskId集合构建TaskId到node+port的映射关系,利用select-keys函数和outbound-tasks集合进行过滤,得到从该Worker接收消息的TaskId到node+port的映射关系,最后调用map-val和endpoint->string函数获取node+port的字符串表示,最终结果为一个TaskId到node+port的哈希表(my-assignment).
  6. 对my-assingmnet进行过滤,移除所有存在该worker的TaskId结果保存到needed-assignment。
  7. 从needed-assignment中获取目标Worker集合(node_port)存储到needed-connections,从needed-assignment中获取目标节点上的TaskId集合存储到needed-tasks中。
  8. 获取worker节点中缓存的从node+port到ZMQ Socket的哈希表的所有键列表,存储到current-connections中。
  9. 判断哪些连接需要新建,哪些连接可以关闭,分别保存到new-connections和remove-connections中。
  10. 调用msg/connect方法根据new-connections中的node+poer创建新的连接,并更新到cached-node+port。
  11. 将cached-task->node+port更新为my-assignment。
  12. 调用需要删除的Socket的close方法,将这些Socket从:cached-node+port中移除。

应用过程

:refresh-connections-timer(mk-halting-timer)
refresh-connections (mk-refresh-connections worker)
_(refresh-connections nil)
(schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REGRESH-POLL-SECS) refresh-connections).

代码说明:

  1. 调用mk-halting-timer创建计时器,该代码在创建Worker数据时调用。
  2. 创建一个用于更新连接的函数,然后立即执行refresh-connections函数更新ZMQ,然后不断执行该函数,执行间隔为TASK-REFRESH-SECS默认为10秒,在mk-worker函数中调用.

从Zookeeper中获取Topology活跃情况

refresh-storm-active函数获取Topology的状态信息.

方法原型:

(defn refresh-storm-active ([worker])…)

方法说明:

  1. 提供匿名函数做为callback参数,在相应Zookeeper节点变化时回调。
  2. 通过:refresh-active-timer计时器完成定期调用,默认为10秒。
    (shcedule-recurring(:refresh-active-timer worker) 0 (conf TASK-FEFRESH-POLL_SECS) (partial refresh-active worker)).
  3. 调用:storm-cluster-state的storm-base方法获取Topology的基础信息。
  4. 判断该Topology是否处于活跃状态(:active),并将判断结果存储于:storm-active-atom变量中。

Worker中接收函数

Mk-transfer-local-fn函数用于产生并发送消息到Executor的接收队列,同一Worker内部的Executor之间通过该函数传递消息。

方法原型:

(defn mk-transfer-local-fn [worker])

方法说明:

  1. 调用:short-executor-receive-queue-map返回Executor中第一个Task的TaskId到该Executor对应的接收队列的映射关系,保存到short-executor-queue-map变量。
  2. 调用task->short-executor返回从该Worker中的TaskId到Executor中第一个Task的TaskId的映射关系。
  3. 定义返回的匿名函数,该函数输入为ZMQ收到的一组消息tuple-batch,按照与消息TaskId对应的Executor中第一个Task的TaskId对消息进行分组,变量grouped对应的键为Executor中第一个Task的TaskId,值为属于该Executor的一组消息。
  4. 通过short-executor-receive-queue-map得到TaskId与Executor相对应的接收消息队列q。
  5. 调用disruptor/publish方法将收到的消息发送到队列q.

Woker中的发送函数

Mk-transfer-fn用于Executor的数据发送,分别有以下两种情况:

  1. 目标TaskId与发送TaskId属于同一个Worker,此时不需要跨进程传输消息,可将消息通过mk-tansfer-local-fn发送至接收端Executor的接收队列。
  2. 消息的目标TaskId跟发送TaskId属于不同的Worker中,此时则将消息序列化(KryoTupleSerializer)后发送至Worker的发送队列,由Worker(mk-tranfer-tuples-handler)负责将队列的消息通过ZMQ发送出去.

方法原型:

defn mk-transfer [worker]

该方法返回一个参数为系列化器serializer和一组消息的函数。

不同Worker间的通信

Worker中有一个额外的线程对transfer-queue(worker对应的消息发送队列)进行监听,Mk-transfer-tuples-handler用于创建对应的消息处理器。

方法原型:

defn mk-transfer-tuples-handler [worker]
  1. 调用cached-node+port获取Worker中与目标node+port相对应的ZMQ Socket连接,保存到node+port->socket.
  2. 调用worker-data的cached-task->node+port获取TaskId到node+port的映射,保存到task->node+port.
  3. 定义一个clojure-handler,对应的函数定义为fn [packets _ batch-end?],第一个参数为一组消息packets,第二个忽略,第三个为为结束标记。
  4. 调用msg/seng函数将消息发送出去。

 

发送监听线程的启动:

transfer-tuples (mk-transfer-tuples-handler worker)
transfer-thread (disruptor /consume-loop* (:transfer-queue worker) transfer-tuples)
时间: 2024-09-28 10:10:37

Storm系列(十三)架构分析之Worker-维护ZMQ连接的相关文章

Storm系列(十一)架构分析之Supervisor-管理Worker进程的事件线程

处理流程:   方法原型: (defn sync-processes [supervisor]) 函数说明: Supervisor是一个supervisor-data对象. 从local-state中获取LS_LOCAL_ASSIGNMENTS集合<port,Assignment>集合,保存到assigned-executors. 调用read-allocated-workers获取当前已经分配的Worker信息,<worker-id,<worker state,worker he

Qualcomm Android display架构分析

Android display架构分析(一) http://blog.csdn.net/BonderWu/archive/2010/08/12/5805961.aspx http://hi.baidu.com/leowenj/blog/item/429c2dd6ac1480c851da4b95.html 高通7系列硬件架构分析 如上图,高通7系列 Display的硬件部分主要由下面几个部分组成: A.MDP 高通MSM7200A内部模块,主要负责显示数据的转换和部分图像处理功能理,如YUV转RG

Cocos2d-x 3.0final 终结者系列教程04-引擎架构分析

从前有个跟我学Android的学生,老是问我: 沈老师,为什么Android中的窗口叫Activity,为什么要在onCreate方法中写setContentView(R.layout.main)? 我说: 你能不能按照我教你的实现一个窗口 第一步在AndroidManifest.xml中添加一个Activity标签 第二步写一个类继承Activity并覆盖onCreate方法 他说: 我想知道为什么要实现Activity和onCreate方法,之后我才能完成这个练习. 我说: 你实现了Acti

Storm 系列(三)Storm 集群部署和配置

Storm 系列(三)Storm 集群部署和配置 本章中主要介绍了 Storm 的部署过程以及相关的配置信息.通过本章内容,帮助读者从零开始搭建一个 Storm 集群.相关的过程和主要的配置选项是 Storm 的运维人员需要重点关注的,对部署和配置选项不感兴趣的读者,可以跳过本章. 在开始 Storm 之旅前,我们先看一下 Storm 部署和配置的相关信息,并提交一个 Topology,了解 Storm 的基本原理.Storm 的部署模式包括单机和集群环境,同时在向 Storm 环境中提交 To

tomcat架构分析 (connector NIO 实现)

出处:http://gearever.iteye.com 上一篇简单记录了缺省配置的connector的内部构造及消息流,同时此connector也是基于BIO的实现.除了BIO外,也可以通过配置快速部署NIO的connector.在server.xml中如下配置: Xml代码 <Connector port="80" URIEncoding="UTF-8" protocol="org.apache.coyote.http11.Http11NioPr

Storm系列二: Storm拓扑设计

Storm系列二: Storm拓扑设计 在本篇中,我们就来根据一个案例,看看如何去设计一个拓扑, 如何分解问题以适应Storm架构,同时对Storm拓扑内部的并行机制会有一个基本的了解. 本章代码都在: [email protected]:zyzdisciple/storm_study.git 项目下的 user_behavior包下. 问题案例 有这样一种场景,在前端存在会话,我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信

HRMS(人力资源管理系统)-从单机应用到SaaS应用-架构分析(功能性、非功能性、关键约束)-上篇

一.开篇 上一篇<HRMS(人力资源管理系统)-从单机应用到SaaS应用-系统介绍>我们已经详细的分析了HRMS系统具备的功能,并且从HRMS系统的概念.系统功能.HR行业管理现状及痛点.发展趋势及行业前景.行业内的服务提供商情况.HRMS系统的建设意义及价值等方面进行了系统化的分析梳理.我想大家已经对于HRMS系统的大体情况有了初步的了解,本篇将对HRMS系统的需求进行全方位的梳理(功能性需求.非功能性需求.系统约束等),这对于HRMS系统的架构设计来说是核心关键,是架构能否成功的前提.这也

Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障 在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑. 而本篇就是在上一篇的基础上再做出一定的调整. 在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存

[铁道部信息化管理]核心业务需求及逻辑架构分析

12306的已知信息.数据及问题 需求分析(一)-- 售票系统领域知识(区间票.订票.预留票) 需求分析(二)-- 涉众.用户体验 核心业务需求及逻辑架构分析 需求分析(三)-- 票仓 票仓设计(一)-- 预生成车票方案的优缺点 票仓设计(二)-- 区间二进制方案的优缺点 票仓设计(三)-- 平衡方案的优缺点 票务并发冲突处理原则设计(基于平衡方案) 缓存逻辑架构设计 数据库逻辑设计 灾难备份与恢复 快要太监了 :-( 由于各种个人原因, 铁道部的这个博文系列中止了很久.最近终于连自己都不好意思