Storm概念、应用详解(一)BaseStorm

Storm是基于数据流的实时处理系统,提供了大吞吐量的实时计算能力。通过数据入口获取每条到来的数据,在一条数据到达系统的时候,立即会在内存中进行相应的计算;Storm适合要求实时性较高的数据分析场景。

Hadoop、Storm系统和组件接口对比表:

Storm框架:

· Nodes (服务器):指配置在一个 Storm 集群中的服务器,会执行 topology 的一部分

运算。一个 Storm 集群可以包括一个或者多个工作 node 。

· Workers (JVM 虚拟机):指一个 node 上相互独立运行的 JVM 进程。每个 node 可

以配置运行一个或者多个 worker 。一个topology 会分配到一个或者多个 worker 上

运行。

· Executeor (线程):指一个 worker 的jvm 进程中运行的 Java 线程。多个 task 可以

指派给同一个 executer 来执行。除非是明确指定, Storm 默认会给每个 executor 分

配一个 task。

· Task (bolt/spout 实例): task 是 spout 和bolt 的实例, 它们的 nextTuple() 和

execute() 方法会被executors 线程调用执行。

虽然在这设置了多个task实例,但是并行度并没有很大提高,因为只有两个线程去运行这些实例,只有设置足够多的线程和实例才可以真正的提高并行度。

在这设置多个实例主要是为了下面执行rebalance的时候用到,因为rebalance不需要修改代码,就可以动态修改topology的并行度,这样的话就必须提前配置好多个实例,在rebalance的时候主要是对之前设置多余的任务实例分配线程去执行。

在命令行动态修改并行度

除了使用代码进行调整,还可以在shell命令行下对并行度进行调整。

storm rebalance mytopology -w 10 -n 2 -e spout=2 -e bolt=2

表示 10秒之后对mytopology进行并行度调整。把spout调整为2个executor,把bolt调整为2个executor

注意:并行度主要就是调整executor的数量,但是调整之后的executor的数量必须小于等于task的数量,如果分配的executor的线程数比task数量多的话也只能分配和task数量相等的executor。

官方对于Storm下名词概念的解释如下:

1、Topologies

2、Streams

3、Spouts

4、Bolts

5、Stream groupings

6、Reliability

7、Tasks

8、Workers

1、Topologies(拓扑)

Topology是Storm中实时应用的一种封装。其功能 analogous to a MapReducejob,但唯一不同的是它是循环执行的——无数据流等待,有数据流执行,直到被kill progress。

一个Topology是spouts和bolts组成并被Stream groupings连接的一副流程图,相关概念如下:

Resources:

Topology结构:

2、Streams (流)

Stream在Storm中是一个核心的抽象概念。一个流是由无数个元组序列构成,这些元组并行、分布式的被创建和执行。在stream的许多元组中,Streams被定义为以Fields区域命名的一种模式。默认情况下,元组支持:integers, longs, shorts, bytes,
strings, doubles, floats, booleans, and byte arrays. 你也可以定义自己的序列化器,使这种风格类型能够被自然的使用在元组中。

每一个Stream在声明的时候都会赋予一个id。单个Stream——spouts和bolts,可以使用OutputFieldsDeclarer 的convenience方法声明一个stream,而不用指定一个id。但是这种方法会给予一个默认的id——default,相关概念如下:

Resources:

  • Tuple:
    streams are composed of tuples:Tuple是一个interface,对应的实现类 TupleImpl。
  • OutputFieldsDeclarer:
    used to declare streams and their schemas
  • Serialization:
    Information about Storm‘s dynamic typing of tuples and declaring custom serializations

Ps:Storm中的tuple是接口,没有具体实现,但原话这么解释的:

Storm needs to know how to serialize all the values in a tuple. By default, Storm

* knows how to serialize the primitive types, strings, and byte arrays.

3、Spouts

在Topology中,每个Spout都是一个Streams源,通常情况下,Spouts会从外部源读取Tuple,并输入这些Tuple到Topology中。

Spouts既是可靠的又是不可靠的,因为,可靠的spout会在发送Tuple失败的情况下,重复发送;相反,不可靠的spout会忘记它发送过的Tuple,无论是否成功。

Spout代码过程:

Spouts能够发送多个流:使用OutputFieldsDeclarer(interface)的declareStream方法声明多个流,并且当使用SpoutOutputCollector(实现2,接口模式)的emit方法可以指定这个流去发送Tuple。

Spouts的主要方法之一是:nextTuple() 发送tuple,nextTuple可以发送一个新的Tuple到Topology,或者当没有新的Tuple被发送的时候,就简单的返回。对于任何spout的实现,nextTuple都不能阻塞,因为Storm调用的所有spout都是基于同一个线程!

其次是 ack 和 fail 方法,它们都会被调用,当Storm发现一个tuple被从spout发射后,要么成功地完成的通过topology,要么错误的完成。ack 和 fail 方法只有在可靠的spouts下才能被调用。spout可靠性,请搜本页下面内容,或移至代码。

Resources:

Ps:nextTuple()方法中会发送Tuple,至于那种对象能发送,请看上述。

Qu:1、在代码中如何让声明的留和发送tuple联系起来,因为声明流的名称并不是tuple对象名?

2、是Storm中Spout的nextTuple对应一个线程,还是多个Spout对应一个线程?

answer:在集群中,应该是每个node的JVM中启动一个线程跑spout

4、Bolts

在Topologies中所有的处理都会在bolts中被执行,它能够过滤tuple、函数操作、合并(连接join、聚合aggregation)、数据库读写等。Bolt可以做复杂的流传输,需要多步骤、多bolt的连接。

Bolt也可以发射出一个或多个流,它需要使用OutputFieldsDeclarer 类的 declareStream 方法声明多个流,并且需要指定这个流去使用OutputCollectorl类的emit方法去发射。

当你声明一个bolt的输入流时,你需要订阅一个指定的其他组件的流。每一个流的订阅都是一个个添加。InputDeclarer类可以声明一个流在默认的流id上。 declarer.shuffleGrouping("1") 说明在组件“1”上订阅了这个默认流,等价于declarer.shuffleGrouping("1",
DEFAULT_STREAM_ID)。

Bolts的主要方法是execute 方法,它会吸收作为输入的一个新Tuple。Bolts使用 OutputCollector
对象发射新的Tuples。Bolts必须对每一个tuple调用OutputCollector 的ack 方法,以便于Storm知道什么时候元组们被处理完成(可以最终确定它的安全对于包装这个初始化spout
tuples)。
共同处理一个输入元组的情况下,发射0或多个元组们基于元组,然后包装输入元组,Storm提供一个IBasicBolt接口的自动包装。

在Bolts异步处理的时候,完全可以启动新线程;同时OutputCollector是线程安全的,可以在任何时候被调用。

Resources:

Ps:bolt发送或接收的数据流都可以多对多的进行。

5、Stream groupings 流分组

定义一个拓扑部分是指定了每个bolt门闩的流都应该作为输入被接收。一个流分组定义为:在门闩的任务之中如何区分流。

在Storm中有8种流分组方式,通过实现CustomStreamGroupingj接口,你可以实现一种风格流分组方式:

Storm 定义了八种内置数据流分组的方式:

1、Shuffle grouping(随机分组):这种方式会随机分发 tuple 给bolt 的各个 task,每个bolt 实例接收到的相同数量的 tuple 。

2、Fields grouping(按字段分组):根据指定字段的值进行分组。比如说,一个数据流根据“ word”字段进行分组,所有具有相同“ word ”字段值的 tuple 会路由到同一个 bolt 的 task 中。

3、All grouping(全复制分组):将所有的 tuple 复制后分发给所有 bolt task。每个订阅数据流的 task 都会接收到 tuple 的拷贝。

4、Globle grouping(全局分组):这种分组方式将所有的 tuples 路由到唯一一个 task 上。Storm 按照最小的 task ID 来选取接收数据的 task 。注意,当使用全局分组方式时,设置 bolt 的 task 并发度是没有意义的(spout并发有意义),因为所有 tuple 都转发到同一个 task 上了。使用全局分组的时候需要注意,因为所有的 tuple 都转发到一个 JVM 实例上,可能会引起
Storm 集群中某个 JVM 或者服务器出现性能瓶颈或崩溃。

5、None grouping(不分组):在功能上和随机分组相同,是为将来预留的。

6、Direct grouping(指向型分组):数据源会调用 emitDirect() 方法来判断一个 tuple 应该由哪个 Storm 组件来接收。只能在声明了是指向型的数据流上使用。

7、Local or shuffle grouping (本地或随机分组):和随机分组类似,但是,会将 tuple 分发给同一个 worker 内的bolt task (如果 worker 内有接收数据的 bolt task )。其他情况下,采用随机分组的方式。取决于topology 的并发度,本地或随机分组可以减少网络传输,从而提高 topology 性能。

8、Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the
Fields grouping, but are load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This
paper
 provides a good explanation of how it works and the advantages it provides.

Resources:

  • TopologyBuilder:
    use this class to define topologies
  • InputDeclarer:
    this object is returned whenever setBolt is called onTopologyBuilder and
    is used for declaring a bolt‘s input streams and how those streams should be grouped

6、Reliability

Storm保证每一个喷口元组都将会在拓扑中完整的被处理。处理过程:它会追踪这个元组树被每一个喷口元组所触发,并且确定元组树已经成功完成。每个拓扑都有一个“信息超时”与之相关联。假如Storm未能检测到一个喷口元组已经超时完成(看不懂),它将舍弃并重新执行这个元组。

为了改善Storm的可靠性能力,你可以告诉Storm什么时候需要在元组树种创建一个新的边界,告诉Storm无论在什么时候都可以完成处理一个独立的元组。Bolt们都使用了OutputCollector对象去发射元组们。锚定的完成于这个emit方法,你可以声明一个元组使用了ack方法而被完成。

以上详细的解释了可靠消息处理。

7、Tasks

每个喷口spout或者门闩bolt都有许多任务在集群中执行。每一个任务对应一个执行线程,流分组定义了如何从一个任务集到另外一个任务集发送元组。你可以使用TopologyBuilder 类的setSpout和setBolt方法,为每一个喷口或门闩是设置并行。

Ps:可以理解task是并行处理。

8、Workers

拓扑执行要通过一个或多个worker进程。每一个worker进程都是一个物理的JVM和这个拓扑中执行了一个所有这个任务的子集。

例子:如果拓扑的联合并发数为300,分配了50个worker,因此每一个worker将会执行6个task(task将作为worker的线程)。Storm将会均匀的分配任务到所有worker上。

Resources:

Worker结构:

Topology的并发机制:

storm的Worker、Executor、Task默认配置都是1

1、增加worker(本地模式无效,只有一个JVM)

Config
对象的setNumWorkers()
方法:

Config config = new Config();

config.setNumWorkers(2):

2、配置executor 和 task

默认都为1,setXXX指定一个Worker中有几个线程,而后面的setNumXXX指定总共需要执行的tasks数量,因此,一个Thread--Executor中需要跑tasks/threads个任务。

topologyBuilder.setSpout(SENTENCE_SPOUT_ID, spout, 2);

// StormBaseSpout -> StormBaseBolt

topologyBuilder.setBolt(SPLIT_BOLT_ID, bolt).setNumTasks(2).shuffleGrouping(SENTENCE_SPOUT_ID);

// StormBaseBolt -> StormBaseBoltSecond

topologyBuilder.setBolt(COUNT_BOLT_ID, boltSecond, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

// StormBaseBoltSecond -> StormBaseBoltThird

topologyBuilder.setBolt(REPORT_BOLT_ID, boltThird).globalGrouping(COUNT_BOLT_ID);

storm的处理保障机制:

1、spout的可靠性

spout会记录它所发射出去的tuple,当下游任意一个bolt处理失败时spout能够重新发射该tuple。在spout的nextTuple()发送一个tuple时,为实现可靠消息处理需要给每个spout发出的tuple带上唯一ID,并将该ID作为参数传递给SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), tupleID);

实际上Values extends ArrayList<Object>

保障过程中,每个bolt每收到一个tuple,都要向上游应答或报错,在tuple树上的所有bolt都确认应答,spout才会隐式调用ack()方法表明这条消息(一条完整的流)已经处理完毕,将会对编号ID的消息应答确认;处理报错、超时则会调用fail()方法。

2、bolt的可靠性

bolt的可靠消息处理机制包含两个步骤:

a、当发射衍生的tuple,需要锚定读入的tuple

b、当处理消息时,需要应答或报错

可以通过OutputCollector中emit()的一个重载函数锚定或tuple:collector.emit(tuple, new Values(word)); 并且需要调用一次this.collector.ack(tuple)应答。

时间: 2024-10-06 23:51:34

Storm概念、应用详解(一)BaseStorm的相关文章

设计模式之适配器模式 adapter 适配器模式分类概念角色详解 类适配器 对象适配器 接口适配器 双向适配器

现实世界中的适配器模型 先来看下来几个图片,截图自淘宝 上图为港版的插头与港版的插座 上图为插座适配器卖家的描述图 上图为适配后的结果 现实世界中适配器模式 角色分类 这就是适配器模式在电源插座上的应用 我们看下在插座适配器中的几个重要角色 可以看得出来,大陆和港版插座面板,都是作为电源的角色,他们的功能是相似的或者说相近的 插头要使用插座,进而接通电流 现实世界到代码的转换 电源插座代码示例 港版插座面板 package adapter; /**目标角色 Target 接口 * 香港地区使用的

Storm文档详解

1.Storm基础概念 1.1.什么是storm? Apache Storm is a free and open source distributed realtime computation system. Storm是免费开源的分布式实时计算系统 实时和离线的区别: 1 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据.***任务调度 2 流式计算:数据实时产生.

Storm框架使用详解

开篇:实时计算是针对海量数据计算,主要是弥补hadoop等框架只能进行离线批处理的不足.实时计算不一定要精确到秒级,个人理解是相对于离线的一种范称吧.主要应用场景有: 1)  数据源是不断产生的,服务端要不断处理接收的数据,同时回馈给客户端. Storm是基于流的处理框架.以将发送的tuple序列化,进行分发到相应处理端中.数据流在时间和数量上是无限的,这种数据时不断产生的,比如用户的访问历史,点击历史,搜索信息等等. 2)  处理器是循环等待消息的,消息一来即处理数据,进而得出结果.当上传to

Storm并发度详解

工作进程(Worker Process) Worker是Spout/Bolt中运行具体处理逻辑的进程.拓扑跨一个或多个Worker进程执行.每个Worker进程是一个物理的JVM和拓扑执行所有任务的一个子集.例如,如果合并并行度的拓扑是300,已经分配50个Worker,然后每个Worker将执行6个任务,Storm会尝试在所有Worker上均匀的发布任务. 执行器(Executor) Executor称为物理线程,每个Worker可以包含多个Executor. 任务(Task) Task是具体

Storm的并行度详解

Storm的并行度是非常重要的,通过提高并行度可以提高storm程序的计算能力. 那strom是如何提高并行度的呢? Strom程序的执行是由多个supervisor共同执行的.supervisor运行的是topology中的spout/bolt task task  是storm中进行计算的最小的运行单位,表示是spout或者bolt的运行实例. 程序执行的最大粒度的运行单位是进程,刚才说的task也是需要有进程来运行它的,在supervisor中,运行task的进程称为worker, Sup

Storm WordCount Topology详解

1 package org.apache.storm.storm_core; 2 3 import java.util.Map; 4 5 import backtype.storm.task.OutputCollector; 6 import backtype.storm.task.TopologyContext; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.bas

Storm笔记整理(五):可靠性分析、定时任务与Storm UI参数详解

[TOC] 特别说明:前面的四篇Storm笔记中,关于计算总和的例子中的spout,使用了死循环的逻辑,实际上这样做是不正确的,原因很简单,Storm提供给我们的API中,nextTuple方法就是循环执行了,这相当于是做了双层循环.因为后面在做可靠性acker案例分析时发现,加入死循环逻辑后,该nextTuple所属于的那个task根本就没有办法跳出这个nextTuple方法,也就没有办法执行后面的ack或者是fail方法,这点尤其需要注意. Storm可靠性分析 基本原理 worker进程死

Storm 系列(五)—— Storm 编程模型详解

一.简介 下图为 Strom 的运行流程图,在开发 Storm 流处理程序时,我们需要采用内置或自定义实现 spout(数据源) 和 bolt(处理单元),并通过 TopologyBuilder 将它们之间进行关联,形成 Topology. 二.IComponent接口 IComponent 接口定义了 Topology 中所有组件 (spout/bolt) 的公共方法,自定义的 spout 或 bolt 必须直接或间接实现这个接口. public interface IComponent ex

Storm 系列(七)—— Storm 集成 Redis 详解

一.简介 Storm-Redis 提供了 Storm 与 Redis 的集成支持,你只需要引入对应的依赖即可使用: <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-redis</artifactId> <version>${storm.version}</version> <type>jar</type> <

IFC数据模式架构的四个概念层详解说明

IFC模型体系结构由四个层次构成,从下到上依次是 资源层(Resource Layer).核心层(Core Layer).交互层(Interoperability Layer).领域层(Domain Layer).每层中都包含一系列的信息描述模块,并且遵守一个规则:每个层次只能引用同层次和下层的信息资源,而不能引用上层的资源,当上层资源发生变动时,下层是不会受到影响的. ①资源层IFC体系架构中的最低层,能为其他层所引用.主要是描述标准中用到的基本信息,不针对具体的行业本身,是无整体结构的分散信