Storm概念、原理详解及其应用(一)BaseStorm

本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出。写这篇文章,是想把一些官文和资料中基础、重点拿出来,能总结出便于大家理解的话语。与大多数“wordcount”代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:“知其然,并知其所以然”。

Storm是什么?为什么要用Storm?为什么不用Spark?

第一个问题,以下概念足以解释:

Storm是基于数据流的实时处理系统,提供了大吞吐量的实时计算能力。通过数据入口获取每条到来的数据,在一条数据到达系统的时候,立即会在内存中进行相应的计算;Storm适合要求实时性较高的数据分析场景。

第二问题:

很多场景下,我们希望系统能够实时的处理一条数据、甚至是事务。也就是说,在处理数据、事务的过程中,到达系统,并能马上得到结果。其次,在成万上亿条数据大量涌入系统时,也要求“实时”的到事务处理的结果。此时,单个节点已经是杯水车薪了,而Storm的关键一项是因为它支持分布式并行计算!如果说,你遇到了以上相似的场景,那Storm可以当仁不让的扛起实时处理的大旗!

第三个问题:

这个问题其实很难界定,因为Spark在RDD粒度上,可以满足实时计算的要求,当然,使用RDD还有其他优势;但总的来说,Storm 的实时性更强。其次,Storm的框架完全按照流式处理的思想构建,和项目场景结合性更强一些。(Spark 用的不是很多,欢迎吐槽。)

进入正题,

在看Storm之前,很多人都对Hadoop有一定了解,为了能更快入戏,我们以Hadoop为参照,以下是它使用yarn之前的架构,对照Storm Server框架理解。

Hadoop、Storm系统和组件接口对比表:

Storm框架:

上面这幅图是Stom框架图,和很多分布式系统一样,基于zk作为集群配置运行的元数据基础平台。

nimbus和supervisor是服务器端守护进程,守护进程的文章会在Storm概念、原理详解及其应用(二)Storm Cluster

以下是对启动一个应用所需要的集群上JVM进程线程的简单介绍,建议记忆后再继续阅读。

· Nodes (服务器):指配置在一个 Storm 集群中的服务器,会执行 topology 的一部分

运算。一个 Storm 集群可以包括一个或者多个工作 node 。

· Workers (JVM 虚拟机):指一个 node 上相互独立运行的 JVM 进程。每个 node 可

以配置运行一个或者多个 worker 。一个topology 会分配到一个或者多个 worker 上

运行。

· Executor (线程):指一个 worker 的jvm 进程中运行的 Java 线程。多个 task 可以

指派给同一个 executer 来执行。除非是明确指定, Storm 默认会给每个 executor 分

配一个 task。

· Task (bolt/spout 实例): task 是 spout 和bolt 的实例, 它们的 nextTuple() 和

execute() 方法会被executors 线程调用执行。

例如:



builder.setSpout(spoutName, spout, spoutParallelism).setNumTasks(2)

这里可以定义spoutParallelism = 2,即对应两个executor线程,tasks为两个实例。

(此处配置的原理,会在接下来会讲到worker和并发中解释。)

可以看出,虽然在这设置了多个task实例,但是并行度并没有提高(而executor在不同的worker上执行,存在并行),因为只有两个线程去运行这些实例,只有设置足够多的线程和实例才可以真正的提高并行度;在这设置多个实例主要是为了下面执行rebalance的时候用到。

为什么要用rebalance?

这里一直在启动、操作的是“线程”,真正的process需要在配置中设置worker数量,也就是说topology启动时已经决定了worker数量(即并行数量)。因为rebalance不需要修改代码,就可以动态修改topology的并行度,这样的话就必须提前配置好多个实例,在rebalance的时候主要是对之前设置多余的任务实例分配线程去执行。

在命令行动态修改并行度:

除了使用代码进行调整,还可以在shell命令行下对并行度进行调整。

storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2

表示 10秒之后对mytopology进行并行度调整。把spout调整为2个executor,把bolt调整为2个executor

注意:并行度主要就是调整executor的数量,但是调整之后的executor的数量必须小于等于task的数量,如果分配的executor的线程数比task数量多的话也只能分配和task数量相等的executor。

概念:

官方对于Storm下名词概念的解释如下:

1、Topologies

2、Streams

3、Spouts

4、Bolts

5、Stream groupings

6、Reliability

7、Tasks

8、Workers

1、Topologies(拓扑)

Topology是Storm中实时应用的一种封装。其功能 analogous to a MapReducejob,但唯一不同的是它是循环执行的——无数据流等待,有数据流执行,直到被kill progress。

一个Topology是spouts和bolts组成并被Stream groupings连接的一副流程图,相关概念如下:

Resources:

Topology结构:

2、Streams (流)

Stream在Storm中是一个核心的抽象概念。一个流是由无数个元组序列构成,这些元组并行、分布式的被创建和执行。在stream的许多元组中,Streams被定义为以Fields区域命名的一种模式。默认情况下,元组支持:integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. 你也可以定义自己的序列化器,使这种风格类型能够被自然的使用在元组中。

每一个Stream在声明的时候都会赋予一个id。单个Stream——spouts和bolts,可以使用OutputFieldsDeclarer 的convenience方法声明一个stream,而不用指定一个id。但是这种方法会给予一个默认的id——default,相关概念如下:

Resources:

  • Tuple: streams are composed of tuples:Tuple是一个interface,对应的实现类 TupleImpl。
  • OutputFieldsDeclarer: used to declare streams and their schemas
  • Serialization: Information about Storm‘s dynamic typing of tuples and declaring custom serializations

Ps:Storm中的tuple是接口,没有具体实现,但原话这么解释的:

Storm needs to know how to serialize all the values in a tuple. By default, Storm  * knows how to serialize the primitive types, strings, and byte arrays.

3、Spouts

在Topology中,每个Spout都是一个Streams源,通常情况下,Spouts会从外部源读取Tuple,并输入这些Tuple到Topology中。

Spouts既是可靠的又是不可靠的,因为,可靠的spout会在发送Tuple失败的情况下,重复发送;相反,不可靠的spout会忘记它发送过的Tuple,无论是否成功。

Spout代码过程:

Spouts能够发送多个流:使用OutputFieldsDeclarer(interface)的declareStream方法声明多个流,并且当使用SpoutOutputCollector(实现2,接口模式)的emit方法可以指定这个流去发送Tuple。

Spouts的主要方法之一是:nextTuple() 发送tuple,nextTuple可以发送一个新的Tuple到Topology,或者当没有新的Tuple被发送的时候,就简单的返回。对于任何spout的实现,nextTuple都不能阻塞,因为Storm调用的所有spout都是基于同一个线程!

其次是 ack 和 fail 方法,它们都会被调用,当Storm发现一个tuple被从spout发射后,要么成功地完成的通过topology,要么错误的完成。ack 和 fail 方法只有在可靠的spouts下才能被调用。spout可靠性,请搜本页下面内容,或移至代码。

Resources:

Ps:nextTuple()方法中会发送Tuple,至于那种对象能发送,请看上述。

Qu:1、在代码中如何让声明的留和发送tuple联系起来,因为声明流的名称并不是tuple对象名?

2、是Storm中Spout的nextTuple对应一个线程,还是多个Spout对应一个线程?

answer:在集群中,应该是每个node的JVM中启动一个线程跑spout

4、Bolts

在Topologies中所有的处理都会在bolts中被执行,它能够过滤tuple、函数操作、合并(连接join、聚合aggregation)、数据库读写等。Bolt可以做复杂的流传输,需要多步骤、多bolt的连接。

Bolt也可以发射出一个或多个流,它需要使用OutputFieldsDeclarer 类的 declareStream 方法声明多个流,并且需要指定这个流去使用OutputCollectorl类的emit方法去发射。

当你声明一个bolt的输入流时,你需要订阅一个指定的其他组件的流。每一个流的订阅都是一个个添加。InputDeclarer类可以声明一个流在默认的流id上。 declarer.shuffleGrouping("1") 说明在组件“1”上订阅了这个默认流,等价于declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。

Bolts的主要方法是execute 方法,它会吸收作为输入的一个新Tuple。Bolts使用 OutputCollector  对象发射新的Tuples。Bolts必须对每一个tuple调用OutputCollector 的ack 方法,以便于Storm知道什么时候元组们被处理完成(可以最终确定它的安全对于包装这个初始化spout tuples)。 共同处理一个输入元组的情况下,发射0或多个元组们基于元组,然后包装输入元组,Storm提供一个IBasicBolt接口的自动包装。

在Bolts异步处理的时候,完全可以启动新线程;同时OutputCollector是线程安全的,可以在任何时候被调用。

Resources:

Ps:bolt发送或接收的数据流都可以多对多的进行。

5、Stream groupings 流分组

定义一个拓扑部分是指定了每个bolt门闩的流都应该作为输入被接收。一个流分组定义为:在门闩的任务之中如何区分流。

在Storm中有8种流分组方式,通过实现CustomStreamGroupingj接口,你可以实现一种风格流分组方式:

Storm 定义了八种内置数据流分组的方式:

1、Shuffle grouping(随机分组):这种方式会随机分发 tuple 给bolt 的各个 task,每个bolt 实例接收到的相同数量的 tuple 。

2、Fields grouping(按字段分组):根据指定字段的值进行分组。比如说,一个数据流根据“ word”字段进行分组,所有具有相同“ word ”字段值的 tuple 会路由到同一个 bolt 的 task 中。

3、All grouping(全复制分组):将所有的 tuple 复制后分发给所有 bolt task。每个订阅数据流的 task 都会接收到 tuple 的拷贝。

4、Globle grouping(全局分组):这种分组方式将所有的 tuples 路由到唯一一个 task 上。Storm 按照最小的 task ID 来选取接收数据的 task 。注意,当使用全局分组方式时,设置 bolt 的 task 并发度是没有意义的(spout并发有意义),因为所有 tuple 都转发到同一个 task 上了。使用全局分组的时候需要注意,因为所有的 tuple 都转发到一个 JVM 实例上,可能会引起  Storm 集群中某个 JVM 或者服务器出现性能瓶颈或崩溃。

5、None grouping(不分组):在功能上和随机分组相同,是为将来预留的。

6、Direct grouping(指向型分组):数据源会调用 emitDirect() 方法来判断一个 tuple 应该由哪个 Storm 组件来接收。只能在声明了是指向型的数据流上使用。

7、Local or shuffle grouping (本地或随机分组):和随机分组类似,但是,会将 tuple 分发给同一个 worker 内的bolt task (如果 worker 内有接收数据的 bolt task )。其他情况下,采用随机分组的方式。取决于topology 的并发度,本地或随机分组可以减少网络传输,从而提高 topology 性能。

8、Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.

Resources:

  • TopologyBuilder: use this class to define topologies
  • InputDeclarer: this object is returned whenever setBolt is called onTopologyBuilder and is used for declaring a bolt‘s input streams and how those streams should be grouped

6、Reliability

Storm保证每一个spout tuple都将会在拓扑中完整的被处理。处理过程:它会追踪这个tuple tree被每一个spout tuple所触发,并且确定tuple tree已经成功完成。每个拓扑都有一个“信息超时”与之相关联。假如Storm未能检测到一个spout tuple已经超时完成,它将舍弃并重新执行这个tuple。

为了改善Storm的可靠性能力,你可以告诉Storm什么时候需要在元组树种创建一个新的边界,告诉Storm无论在什么时候都可以完成处理一个独立的tuple。Bolt们都使用了OutputCollector对象去发射tuple。“锚定”(实际上就是mark)的完成于这个emit方法,你可以声明一个元组使用了ack方法而被完成。

以上详细的解释了可靠消息处理。

7、Tasks

每个喷口spout或者门闩bolt都有许多任务在集群中执行。每一个任务对应一个执行线程,流分组定义了如何从一个任务集到另外一个任务集发送元组。你可以使用TopologyBuilder 类的setSpout和setBolt方法,为每一个spout或bolt是设置并行度和并发度。

Ps:Tasks可以理解为每个节点上的任务实例,运行在对应executor线程上。

8、Workers

拓扑执行要通过一个或多个worker进程。每一个worker进程都是一个物理的JVM和这个拓扑中执行了一个所有这个任务的子集。

例子:如果拓扑的联合并发数为300,分配了50个worker,因此每一个worker将会执行6个task(task将作为worker的线程)。Storm将会均匀的分配任务到所有worker上。

Resources:

Worker结构:

Topology的并发机制:

storm的Worker、Executor、Task默认配置都是1

1、增加worker(本地模式无效,只有一个JVM)

Config对象的setNumWorkers()方法:

Config config = new Config();

config.setNumWorkers(2):

2、配置executor 和 task

默认都为1,setXXX指定一个Worker中有几个线程,而后面的setNumXXX指定总共需要执行的tasks数量,因此,一个Thread--Executor中需要跑tasks/threads个任务。

topologyBuilder.setSpout(SENTENCE_SPOUT_ID, spout, 2);

// StormBaseSpout -> StormBaseBolt

topologyBuilder.setBolt(SPLIT_BOLT_ID, bolt).setNumTasks(2).shuffleGrouping(SENTENCE_SPOUT_ID);

// StormBaseBolt -> StormBaseBoltSecond

topologyBuilder.setBolt(COUNT_BOLT_ID, boltSecond, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

// StormBaseBoltSecond -> StormBaseBoltThird

topologyBuilder.setBolt(REPORT_BOLT_ID, boltThird).globalGrouping(COUNT_BOLT_ID);

storm的处理保障机制:

1、spout的可靠性

spout会记录它所发射出去的tuple,当下游任意一个bolt处理失败时spout能够重新发射该tuple。在spout的nextTuple()发送一个tuple时,为实现可靠消息处理需要给每个spout发出的tuple带上唯一ID,并将该ID作为参数传递给SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), tupleID);

实际上Values extends ArrayList<Object>

保障过程中,每个bolt每收到一个tuple,都要向上游应答或报错,在tuple树上的所有bolt都确认应答,spout才会隐式调用ack()方法表明这条消息(一条完整的流)已经处理完毕,将会对编号ID的消息应答确认;处理报错、超时则会调用fail()方法。

2、bolt的可靠性

bolt的可靠消息处理机制包含两个步骤:

a、当发射衍生的tuple,需要锚定读入的tuple

b、当处理消息时,需要应答或报错

可以通过OutputCollector中emit()的一个重载函数锚定或tuple:collector.emit(tuple, new Values(word)); 并且需要调用一次this.collector.ack(tuple)应答。

以上就是storm的基础概念,阅读完后并不能满足你去实现代码的需求,因为需要一个可demo代码,作为模仿的基础。这里就不做提供了,毕竟网上一大堆。

最近在研究Storm源代码,不想与“源码分析”一样只告诉该类代码:结构、方式、用到了什么技术,而希望写一些“特殊”的内容;当然有可能也不能免俗,但尽力写点不同的东西。

内容有不妥的地方,希望大家指正,希望能一起进步,文笔欠佳,见谅。

此处配置的原理,会在接下来会讲到worker和并发解释。

时间: 2024-10-12 11:32:51

Storm概念、原理详解及其应用(一)BaseStorm的相关文章

理解泛函的概念(图像处理中的数学原理详解)

全文目录请见 图像处理中的数学原理详解(Part1 总纲) http://blog.csdn.net/baimafujinji/article/details/48467225 2.4  从泛函到变分法 作为数学分析的一个分支,变分法(Calculus of Variations)在物理学.经济学以及信息技术等诸多领域都有着广泛而重要的应用.变分法是研究依赖于某些未知函数的积分型泛函极值的普遍方法.换句话说,求泛函极值的方法就称为是变分法. 2.4.1  理解泛函的概念 变分法是现代泛函分析理论

SVM-支持向量机原理详解与实践之一

目录(?)[+] 前言 SVM机器学习与深度学习 人工智能领域 机器学习与深度学习 SVM简介 SVM原理分析 快速理解SVM原理 线性可分和线性不可分 函数间隔和几何间隔 超平面分析与几何间隔详解 二次最优化 SVM-支持向量机原理详解与实践 前言 去年由于工作项目的需要实际运用到了SVM和ANN算法,也就是支持向量机和人工神经网络算法,主要是实现项目中的实时采集图片(工业高速摄像头采集)的图像识别的这一部分功能,虽然几经波折,但是还好最终还算顺利完成了项目的任务,忙碌一年,趁着放假有时间好好

SVM -支持向量机原理详解与实践之二

SVM -支持向量机原理详解与实践之二 SVM原理分析 以下内容接上篇. 拉格朗日对偶性(Largrange duality)深入分析 前面提到了支持向量机的凸优化问题中拉格朗日对偶性的重要性. 因为通过应用拉格朗日对偶性我们可以寻找到最优超平面的二次最优化, 所以以下可以将寻找最优超平面二次最优化(原问题),总结为以下几个步骤: 在原始权重空间的带约束的优化问题.(注意带约束) 对优化问题建立拉格朗日函数 推导出机器的最优化条件 最后就是在对偶空间解决带拉格朗日乘子的优化问题. 注:以上这个四

Influxdb原理详解

本文属于<InfluxDB系列教程>文章系列,该系列共包括以下 15 部分: InfluxDB学习之InfluxDB的安装和简介 InfluxDB学习之InfluxDB的基本概念 InfluxDB学习之InfluxDB的基本操作 InfluxDB学习之InfluxDB的HTTP API写入操作 InfluxDB学习之InfluxDB数据保留策略(Retention Policies) InfluxDB学习之InfluxDB连续查询(Continuous Queries) InfluxDB学习之

SVM -支持向量机原理详解与实践之三

SVM -支持向量机原理详解与实践之三 什么是核 什么是核,核其实就是一种特殊的函数,更确切的说是核技巧(Kernel trick),清楚的明白这一点很重要. 为什么说是核技巧呢?回顾到我们的对偶问题:     映射到特征空间后约束条件不变,则为:     在原始特征空间中主要是求,也就是和的内积(Inner Product),也称数量积(Scalar Product)或是点积(Dot Product),映射到特征空间后就变成了求,也就是和的映射到特征空间之后的内积,就如我前面所提到的在原始空间

[转] VLAN原理详解

VLAN原理详解 标签: VLANAccess-LinkTrunk-Link802.1QISL 2013-07-26 18:05 27901人阅读 评论(15) 收藏 举报  分类: 网络通信/流媒体(30)  目录(?)[+] 1.为什么需要VLAN 1.1 什么是VLAN? VLAN(Virtual LAN),翻译成中文是“虚拟局域网”.LAN可以是由少数几台家用计算机构成的网络,也可以是数以百计的计算机构成的企业网络.VLAN所指的LAN特指使用路由器分割的网络——也就是广播域. 在此让我

LAMP平台搭建及其原理详解

LAMP平台搭建及其原理详解 LAMP平台搭建基础概念 LAMP:提到LAMP很多人会认为LAMP是Linux ,Apache,Mysql,PHP.但是随着技术的不断发展,当今的Lamp,已经不仅仅是这么简单了,这里我们的P除了PHP其实还包括:phython,perl    .而M也不仅仅指的是mysql,也包括mariadb. LAMP平台顾名思义就是Linux,apache,mysql(mariadb),php(phython,perl)的结合.按照他们的结合方式不同,大致可以分成三类:

块级格式化上下文(block formatting context)、浮动和绝对定位的工作原理详解

CSS的可视化格式模型中具有一个非常重要地位的概念——定位方案.定位方案用以控制元素的布局,在CSS2.1中,有三种定位方案——普通流.浮动和绝对定位: 普通流:元素按照先后位置自上而下布局,inline元素水平排列,直到行被占满后换行,block元素则被渲染为完整的一行,除非指定,所有元素默认为普通流定位. 浮动:浮动布局中,元素首先按照普通流位置出现,然后根据浮动方向尽可能向左或右偏移,效果与文本环绕相似. 绝对定位:元素会脱离普通流,因此绝对定位元素不会对其兄弟元素产生影响(与float不

图像处理中的数学原理详解21——PCA实例与图像编码

欢迎关注我的博客专栏"图像处理中的数学原理详解" 全文目录请见 图像处理中的数学原理详解(总纲) http://blog.csdn.net/baimafujinji/article/details/48467225 图像处理中的数学原理详解(已发布的部分链接整理) http://blog.csdn.net/baimafujinji/article/details/48751037 如果你对PCA的推导和概念还不是很清楚,建议阅读本文的前导文章 http://blog.csdn.net/