Concepts:概念
原文:http://storm.apache.org/documentation/Concepts.html
这个列表展示了storm中的主要概念和相关详细信息。这些概念是:
- Topologies
- Streams
- Spouts
- Bolts
- Stream groupings
- Reliability
- Tasks
- Works
Topologies
实时处理逻辑被包含在一个storm topology中。一个storm topology 与MapReduce job相似。其中一个关键不同是MapReduce job最终会停止,但是topology会一直运行(除非你kill它)。一个topology是一系列通过数据流连接起来的spout和bolt。下面将描述这些概念。
更多资源:
- TopologyBuilder:在Java程序中,使用这个类构建topology
- Running topologies on a production cluster(在集群上运行topology)
- Local mode:如何在本地开发和调试topology
Streams
Stream是storm的核心抽象。Stream是通过分布式方式平行创造出的一个无限制的tuples队列。Stream要为tuples中的fields(属性)命名。在默认情况下,tuples中可以有integers,longs,shorts,bytes,Strings,doubles,floats,booleans和byte arrays。当然,你也可以在tuples中使用自定义序列化(serializers)类型。
每一个Stream在声明时都会给一个id。因为单一流的spouts和bolts非常常见,OutputFieldsDeclarer有非常方便的方法声明一个没有指定id的流。在这种情况下,这个流被给定了一个默认id:“default”
更多资源:
- Tuple:stream是由tuples组成的。
- OutputFieldsDeclarer:用来声明一个流
- Serialization:tuples的动态类型信息并声明自定义序列换类型
- ISerialization:自定义序列换类型必须实现的接口
- CONFIG.TOPOLOGY_SERIALIZATIONS:自定义序列换类型可以通过这个配置进行
Spouts
Spout是storm中流的源头。通常的spouts从外部数据源中读取数据然后向topology中发射。Spout分为可重发(reliable)和非可重发(unreliable)两种。一个可重发的spout会在storm处理失败后再次发送处理失败的tuple,而非可重发的spout在发射完一个tuple之后就不再关注后续处理。
Spout可以发射多条Stream。可以使用OutputFieldsDeclarer中的declareStream方法声明多个Stream,并在使用时通过SpoutOutputCollector的emit方法发射数据。
Spout中最主要的方法是nextTuple。nextTuple可以向topology发射数据或在没有数据要发射的时候返回。在spout实现类中没有必要给nextTuple加锁,因为storm会在同一个线程中调用所有的spout方法。
其他两个重要的方法是ack和fail。当storm发现从spout发射出的tuple在整个topology过程中处理成功或失败时,会调用响应的ack或fail方法。只有在可重发的spout中才会调用ack或fail方法。
更多资源:
- IRichSpout:所有spout必须实现的接口
- Guaranteeing message processing(保证消息处理机制)
Bolts
Topologies中的所有处理过程都是在bolts中完成的。Bolts通过过滤(filtering)、方法(function)、聚合(aggregation)、连接(joins)、访问数据库等方式做任意数据操作。
Bolts可以做简单的Stream转换。但是做复杂的Stream转换需要在更多的bolt中执行多个步骤。举例来说:讲一个tweet Stream转换为一个热门图片Stream至少需要两个步骤:一个bolt统计每个图片的关注者(retweets),另一个bolt算出前几名的图片(你可以用更加可拓展的方式处理这个数据Stream转换,比如使用3个bolt)
bolts可以发射多条Stream。可以使用OutputFieldsDeclarer中的declareStream方法声明多个Stream,并在使用时通过SpoutOutputCollector的emit方法发射数据。
当你在bolt中声明了一个输入Stream,就意味着bolt从另一个组件(component)上订阅了一个特定的Stream。如果你希望订阅另一个组件上的所有流,需要分别声明订阅。InputDeclarer提供了一个通过默认Stream id订阅流的方法。比如declarer.shuffleGrouping("1")
表示你从组件1上订阅了默认流,相当于declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)
.
Bolt中的主方法是execute
,每从输入Stream中读取数据时会调用它。Bolts通过OutputCollector发射新的tuples。Bolts需要在OutputCollector发射每个tuple完成之后调用ack方法,以便于storm知道tuple什么时候完成(可以最终确认原始spout发射的tuple是处理成功的)。通常情况下,tuple每处理一个输入tuple,会在输入tuple的基础上发射0或多个tuples,然后ack输入tuple,storm提供了一个接口IBasicBolt可以自动的调用ack方法。
在bolts中启动多个线程进行异步处理数据是一个完美的方法。OutputCollector是线程安全的,并且可以随时调用。
更多资料:
- IRichBolt: bolts实现的基础接口
- IBasicBolt:一个便利的接口,具备过滤功能和简单方法
- OutputCollector:bolts用来向输出steam发射tuples的实例。
- Guaranteeing message processing(保证消息处理机制)
Stream groupings(steam 组操作)
定义在topology中,每个bolt选择哪个Stream作为输入。一个Stream grouping定义了Stream在多个bolt’s tasks中如何分配。
在storm中有7种内置的Stream grouping方式,你可以通过实现CustomStreamGrouping接口来创建自己的Stream grouping。
- Shuffle grouping: Tuples被随机分配到每一个bolt’s task,以便于每一个bolt’s task获得相同数量的tuples。
- Fields grouping: Stream被根据属性(fields)进行分组。举例:如果一个Stream根据“user-id”分组,具有相同“user-id”属性的tuples会被发往同一个bolt’s task,具有不同“user-id”的tuples有可能发往不同的bolt’s task。
- All grouping:Stream会被重复的发往每一个bolt’s task,使用这个方式需要慎重。
- Global grouping:输入流会发往bolt’s tasks中的一个。具体来说,会发往最小id的task
- None grouping:这种方式表示你并不关心Stream如何分组。当前版本中,它的效果等同于shuffle grouping。Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
- Direct grouping:这是一个特殊的grouping。这种方式可以让tuple的生产者决定消费者中哪一个task能够接收这个tuple。只有当一个Stream声明是一个direct stream时,Direct grouping方式才能生效。必须使用[emitDirect](/apidocs/backtype/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List)方法,才能将tuple发送到一个direct Stream中。一个bolt可以通过两种方式获取到消费者的taskid,一种是使用TopologyContext获取,另一种是通过跟踪OutputCollector中的emit方法的返回值(当tuples发送之后,会返回task ids)
- Local or shuffle grouping:如果目标bolt在一个worker进程中有多个或一个tasks,tuples会随机发送到进程内的tasks。否则,这种方式与shuffle grouping相同。
相关资料
TopologyBuilder:用来创建topology的类
InputDeclarer:这个对象在调用TopologyBuilder类的setBolt
方法时返回,用来声明一个bolt的输入Stream和这些Stream用什么样的grouping方式。
CoordinatedBolt:这个bolt用于分布式RPC topologies,并大量使用direct Stream和direct groupings。
Reliability
Storm确保没有个spout发出的tuple将会被topology完整的处理。通过建立一颗树来跟踪spout发出的每一个tuple,而且决定tuple有多长时间处理完毕。每一个topology有一个“message timeout”配置这个时间。如果storm发现tuple在这个时间内没有完成这棵树,那么这个tuple就是fail,并重新处理这个tuple。
为了很好的利用storm的可靠性机制,你必须告知storm什么时候在监控树上创建了一个新的路径,并在什么时候完成了一个tuple的处理。这些在bolts使用OutputCollector发送完tuple时要做的。在确定完成了emit方法之后,必须调用ack方法来告知处理了这个tuple。
更多信息在Guaranteeing message processing.
Tasks
每一个spout或bolt在集群中运行多个tasks。每一个task相当于程序中的一个线程,Stream grouping定义了如何将tuples从一个task集合到另一个task集合。你可以为每一个spout或bolt在类中TopologyBuilder的setSpout
或setBolt
方法
设置平行度(parallelism)。
Workers
Topologies执行一个或多个worker进程。每个worker进程是一个运行task子集的物理虚拟机。举例:如果一个topology一共有300个tasks和50个workers,那么每一个worker执行6个tasks。Storm尝试将tasks平均的分配到每一个worker。
相关资料:
Config.TOPOLOGY_WORKERS:执行topology的worker数量