storm并发机制,通信机制,任务提交

一、storm的并发

  (1)Workers(JVMs):在一个物理节点上可以运行一个或多个独立的JVM进程。一个Topology可以包含一个或多个worker(并行的跑在不同的物理机上),所以worker process就是执行一个topology的子集, 并且worker只能对应于一个topology

  (2)Executors(threads):在一个workerJVM进程中运行着多个Java线程。一个executor线程可以执行一个或多个tasks。但一般默认每个executor只执行一个task。一个worker可以包含一个或多个executor,每个component(spout/bolt)至少对应于一个executor,所以可以说executor执行一个compenent(spout/bolt)的子集,同时一个executor只能对应于一个component(spout/bolt)。 
  (3)Tasks(bolt/spout instances):Task就是具体的处理逻辑对象,每一个Spout和Bolt会被当作很多task在整个集群里面执行。每一个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。

  (4)对于并发度的配置, 在storm里面可以在多个地方进行配置,在defaults.yaml,storm.yaml,topology-specific configuration,internal component-specific configuration,external component-specific configuration 中均可以对并发度进行配置
  (5)worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大亍machines的数目 
executor的数目, component的并发线程数,只能在代码中配置(通过setBolt和setSpout的参数)
  (6)tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置Topology的worker数通过config设置,即执行该topology的worker(java)进程数,可以通过storm rebalance命令任意调整。

二、storm的通信机制

  worker进程间消息传递机制,消息的接收和处理的大概流程如下图:

  

  (1)对于worker进程来说,为了管理流入和传出的消息,每个worker进程有一个独立的接收线程[一个worker进程运行一个专用的接收线程来负责将外部发送过来的消息移动到对应的executor线程的incoming-queue中](对配置的TCP端口supervisor.slots.ports进行监听);

  (2)对应Worker接收线程,每个worker存在一个独立的发送线程[transfer-queue的大小由参数topology.transfer.buffer.size来设置。transfer-queue的每个元素实际上代表一个tuple的集合],它负责从worker的transfer-queue中读取消息,并通过网络发送给其他worker

  (3)每个executor有自己的incoming-queue[executor的incoming-queue的大小用户可以自定义配置。]和outgoing-queue[executor的outgoing-queue的大小用户可以自定义配置]。Worker接收线程将收到的消息通过task编号传递给对应的executor(一个或多个)的incoming-queues;

  (4)每个executor有单独的线程分别来处理spout/bolt的业务逻辑,业务逻辑输出的中间数据会存放在outgoing-queue中,当executor的outgoing-queue中的tuple达到一定的阀值,executor的发送线程将批量获取outgoing-queue中的tuple,并发送到transfer-queue中。

  (5)每个worker进程控制一个或多个executor线程,用户可在代码中进行配置。其实就是我们在代码中设置的并发度个数。

三、storm的任务提交

  storm的任务提交机制大概如下图:

  

  (1)客户端提交topology给nimbus

  (2)提交的的jar包会上传到nimbus服务器的nimbus/inbox目录下,submitTopology方法会负责对这个topology进行处理,处理的过程包括对topology和storm的各种校验,首先检查storm的状态是否active,是否有同名的topology已经在storm中运行了(因为spout和bolt会指定id,storm会检查是否有使用了相同id的spout和bolt,注:任何一个id都不能以"_"开头,这种命名方式是系统保留的),之后nimbus还会建立topology的本地目录,nimbus/stormdist/topology-uuid,该目录包含三个文件,stormjar.jar(包含这个topology所有代码的jar包,从nimbus/inbox中移动过来),stormcode.ser(topology对象的序列化),stormconf.ser(运行这个topology的配置)

  (3)完成上述检查和建立目录的工作后,nimbus根据topology定义中的parallelism hint参数,来给spout/bolt设定task数目,并且分配对应的task id,最后把分配好的task信息写入到zookeeper的/task目录下

  (4)nimbus会在zookeeper上创建taskbeats目录,要求每个task每隔一段时间就要发个心跳信息

  (5)nimbus将分配好的任务,写入zookeeper,同时将topology的信息写入zookeeper/storms目录

  (6)supervisor定期扫描zookeeper上的storms目录,下载新的任务,根据nimbus指定的任务启动worker工作,同时supervisor还会定期删除不再运行的topology

  (7)worker根据分配到的任务,根据task id分辨出哪些spout和bolt,并创建网络连接用来做消息的发送。

原文地址:https://www.cnblogs.com/jiyukai/p/9472301.html

时间: 2024-10-09 04:36:34

storm并发机制,通信机制,任务提交的相关文章

消息通信机制NSNotificationCenter -备

消息通信机制NSNotificationCenter的学习.最近写程序需要用到这类,研究了下,现把成果和 NSNotificationCenter是专门供程序中不同类间的消息通信而设置的,使用起来极为方便, 长话短说. 设置通知,就是说要在什么地方(哪个类)接受通知,一般在初始化中做. [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(test:) name:@"test" object:

LMT NEW PBS作业排队系统的队列通信机制

LMT NEW PBS作业排队系统的队列通信机制 LMT NEW PBS作业排队运算系统提供了一种队列通信机制,允许消息按某种排队规则存储到持续介质或非持续介质(如内存)中,然后再转发给其它处理进程.这种存储转发机制可以保证在两个通信实体之间传递的消息不丢失.不重传,从而保证交易的完整性. LMT NEW PBS的队列通信 LMT NEW PBS的队列通信用到了两个服务器:消息队列服务器和消息转发服务器.消息队列服务器用于对消息进行出队入队管理,消息转发服务器用于将消息从队列中取出,转发给服务器

linux之通信机制

通信是一个比较重要的概念.只要存在多于一个执行单元(并发),就有可能存在通信. linux上的并发主要分为多进程(任务)和多线程.linux也提供了多个通信机制来支持不同进程或者不同线程之间的信息传递. 通信方式主要包括管道,套接字,消息队列,共享内存,信号量,互斥量,信号(如kill -0检测进程是否正常)等. 管道: 1. CGI技术.deamon进程接收一个请求后,创建子进程执行CGI程序,通过无名管道跟子进程进行通信,父进程负责写fd[0],子进程负责写fd[1].然后deamon程序通

LMT NODE PBS作业排队系统的队列通信机制

LMT NODE PBS作业排队系统的队列通信机制 LMT NODE PBS作业排队运算系统提供了一种队列通信机制,允许消息按某种排队规则存储到持续介质或非持续介质(如内存)中,然后再转发给其它处理进程.这种存储转发机制可以保证在两个通信实体之间传递的消息不丢失.不重传,从而保证交易的完整性. LMT NODE PBS的队列通信 LMT NODE PBS的队列通信用到了两个服务器:消息队列服务器和消息转发服务器.消息队列服务器用于对消息进行出队入队管理,消息转发服务器用于将消息从队列中取出,转发

Storm ack和fail机制再论

之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下   首先开启storm tracker机制的前提是, 1. 在spout emit tuple的时候,要加上第3个参数messageid 2. 在配置中acker数目至少为1 3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路   流程, 1. 当tuple具有messageid时,spout会把该tuple加到pending list里面    并发消息给acker,通知acker开

python—day29 守护进程、互斥锁、模拟抢票、IPC通信机制、生产者消费者模型

1.守护进程: 什么是守护进程,假如你是皇帝,每日每夜守护你的就是太监,守护进程就相当于太监,当皇帝驾崩后太监也需要陪葬,所以守护进程当父进程销毁时就一起销毁: 1 from multiprocessing import Process 2 3 import time 4 5 def task(name): 6 7 time.sleep(0.5) 8 print('%s' %name) 9 10 11 if __name__ == '__main__': 12 p = Process(targe

Android中的常见通信机制和Linux中的通信机制

Handler Handler是Android系统中的一种消息传递机制,起作用是应对多线程场景.将A进程的消息传递给B线程,实现异步消息处理.很多情况是将工作线程中需要更新UI的操作消息传递给UI主线程,而实现更新UI操作. 因为工作线程和主线程是共享地址空间,即Handler实例对象mHandler位于线程间共享的内存堆上,工作线程和主线程直接使用该对象,只需要注意多线程的同步问题.工作系统通过mHandler向其成员变量MessageQueue中添加Message,而主线程一直处于loop中

TensorFlow中的通信机制——Rendezvous(一)本地传输

背景 [作者:DeepLearningStack,阿里巴巴算法工程师,开源TensorFlow Contributor] 在TensorFlow源码中我们经常能看到一个奇怪的词--Rendezvous.如果从仔细统计该单词出现的频率和模块,你会发现无论在单机还是分布式,无论在core目录还是contrib目录都存在它的身影,所涉及的模块非常多.Rendezvous是一个法语单词,发音也比较特殊,一般直译为"约会.相会.会和",而在TensorFlow中,Rendezvous是用来完成消

PHP-FPM 与 Nginx 的通信机制总结

PHP-FPM 介绍 CGI 协议与 FastCGI 协议 每种动态语言( PHP,Python 等)的代码文件需要通过对应的解析器才能被服务器识别,而 CGI 协议就是用来使解释器与服务器可以互相通信.PHP 文件在服务器上的解析需要用到 PHP 解释器,再加上对应的 CGI 协议,从而使服务器可以解析到 PHP 文件. 由于 CGI 的机制是每处理一个请求需要 fork 一个 CGI 进程,请求结束再kill掉这个进程,在实际应用上比较浪费资源,于是就出现了CGI 的改良版本 FastCGI