Storm中Task数的设置与计算(1.0.1版本)

==思考问题1==

向集群提交一个拓扑的时候,Storm是如何计算Task数以及Executor数的?

==思考问题2:==

构建拓扑的时候,有3个地方会影响task数,这3个地方之间有什么关系?

builder.setSpout("spout", new RandomSentenceSpout(), 5); //parallelism-hint
builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTask(1);
builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(1).setMaxTaskParallelism(1);

==关于Task数的计算==

storm的拓扑分配的是在nimbus.clj中完成的。

代码路径:org/apache/storm/daemon/nimbus.clj

函数名称:mk-assignments

在整个处理过程中,有一个函数非常重要,看了之后上面的3个关系多少会清晰很多。

这个代码是用clojure语言编写的,没有用过的人多少看起来会费劲,如果用普通的java来写,大概思路是这个样子的

num-tasks = (TOPOLOGY-TASKS == null ? TOPOLOGY-TASKS : parallelism-hint)
max-parallelism = TOPOLOGY-MAX-TASK-PARALLELISM    

if (max-parallelism != null) {
    if (max-parallelism < num-tasks) {
        task数= max-parallelism
    } else {
        task数= num-tasks
    }
} else {
    task数= num-tasks
}

如果将3个参数进行排列组合之后,获得结果如下:

==关于executor数的来源==

下图是一个从其他博客剪切过来的图片,可以看出:executor数是从我们的拓扑中取得的。

启动的时候,会将所有component的parallelism-hint累加,形成我们大家熟知的“并行度”。

可但是,但可是,我在查看setNumTasks函数的源码注释的时候,可以看出来一个信息

红线部分的直译(英文不好,翻译的可能不准确):

在整个拓扑结构的生命周期内,Spout/Bolt的任务数总是相同的,但是Spout/Bolt的执行器executors(线程)的数量可以随着时间而变化。

那么,问题来了,什么时候executor的数量会变化呢?我能想到的至少有一种场景吧。

使用storm rebalance命令,来动态调整拓扑“并行度”的时候,executor的数量是一定会变化的。

rebalance命令的参数如下所示:

==简单总结==

1、有3个地方可以影响Task数,根据3个参数的结果决定Task数。

2、executor数 = 所有组件的parallelism-hint总数。

3、task数在生命周期内不变,executor数可能改变。

-------------

参考博客:

https://www.cnblogs.com/ierbar0604/p/4386480.html

http://lib.csdn.net/article/60/42875

原文地址:https://www.cnblogs.com/quchunhui/p/8271349.html

时间: 2024-11-10 18:19:57

Storm中Task数的设置与计算(1.0.1版本)的相关文章

Strom Topology执行分析:worker数,Bolt实例数,executor数,task数

在创建Storm的Topology时,我们通常使用如下代码:builder.setBolt("cpp", new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name); Config conf = new Config(); conf.setNumWorkers(3); 参数1:bolt名称 "cpp"参数2:bolt类型 CppBolt参数3:bolt的并行数,parallelismNum,即运行topology时

Strom Topology执行分析:worker数,Bolt实例数,executor数,task数(转)

来自:http://blog.csdn.net/jmppok/article/details/17244599 在创建Storm的Topology时,我们通常使用如下代码: builder.setBolt("cpp", new CppBolt(), 3).setNumTasks(5).noneGrouping(pre_name); Config conf = new Config(); conf.setNumWorkers(3); 参数1:bolt名称 "cpp"

Storm中Spout使用注意事项小结

Storm中Spout用于读取并向计算拓扑中发送数据源,最近在调试一个topology时遇到了系统qps低,处理速度达不到要求的问题,经过排查后发现是由于对Spout的使用模式不当导致的多线程同步等待.这里罗列几点个人觉得编写Spout代码时需要特别注意的地方: 1. 最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据:另外的一个或多个线程负责从数据源(如各种消息中间件.db等)读取数据并放入queue中. 2. 如果不关心数据是否丢

storm源码之理解Storm中Worker、Executor、Task关系【转】

[原]storm源码之理解Storm中Worker.Executor.Task关系 Storm在集群上运行一个Topology时,主要通过以下3个实体来完成Topology的执行工作:1. Worker(进程)2. Executor(线程)3. Task 下图简要描述了这3者之间的关系:                                                    1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服

Hadoop中map数的计算

转载▼ Hadoop中在计算一个JOB需要的map数之前首先要计算分片的大小.计算分片大小的公式是: goalSize = totalSize / mapred.map.tasks minSize = max {mapred.min.split.size, minSplitSize} splitSize = max (minSize, min(goalSize, dfs.block.size)) totalSize是一个JOB的所有map总的输入大小,即Map input bytes.参数map

storm中的一些概念

1.topology 一个topolgy是spouts和bolts组成的图,通过stream groupings将图中的spout和bolts连接起来:如图所示: 一个topology会一直运行知道你手动kill掉,Storm自动重新分配执行失败的任务,并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话).如果一些机器意外停机它上面的所有任务会被转移到其他机器上: 运行一个toplogy很简单,首先,把你所有的代码以及所依赖的jar打进一个jar中.然后运行类似下面的命令: stor

Storm中的可靠性

我们知道Storm有一个很重要的特性,那就是Storm API能够保证它的一个Tuple能够被完全处理,这一点尤为重要,其实storm中的可靠性是由spout和bolt组件共同完成的,下面就从spout和bolt两个方便给大家介绍一下storm中的可靠性,最后会给出一个实现了可靠性的例子. 1.Spout的可靠性保证 在Storm中,消息处理可靠性从Spout开始的.storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fa

storm中的基本概念

Storm是一个流计算框架,处理的数据是实时消息队列中的,所以需要我们写好一个topology逻辑放在那,接收进来的数据来处理,所以是通过移动数据平均分配到机器资源来获得高效率. Storm的优点是全内存计算,因为内存寻址速度是硬盘的百万倍以上,所以Storm的速度相比较Hadoop非常快(瓶颈是内存,cpu).其缺点就是不够灵活:必须要先写好topology结构来等数据进来分析. Storm 关注的是数据多次处理一次写入,而 Hadoop 关注的是数据一次写入,多次查询使用.Storm系统运行

0x3f3f3f3f...编程中无穷大常量的设置技巧

转自 http://aikilis.tk/ 如果问题中各数据的范围明确,那么无穷大的设定不是问题,在不明确的情况下,很多程序员都取0x7fffffff作为无穷大,因为这是32-bit int的最大值.如果这个无穷大只用于一般的比较(比如求最小值时min变量的初值),那么0x7fffffff确实是一个完美的选择,但是在更多的情况下,0x7fffffff并不是一个好的选择. 很多时候我们并不只是单纯拿无穷大来作比较,而是会运算后再做比较,例如在大部分最短路径算法中都会使用的松弛操作:if (d[u]