Storm框架入门

1 Topology构成

和同样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology。但是Job在运行结束之后会自行结束,Topology却只能被手动的kill掉,否则会一直运行下去。

Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点)。所有Topology任务的提交必须在Storm客户端节点上进行(需要配置~/.storm/storm.yaml文件),由Nimbus节点分配给其他Supervisor节点进行处理。Nimbus节点首先将提交的Topology进行分片,分成一个个的Task,并将Task和Supervisor相关的信息提交到zookeeper集群上,Supervisor会去zookeeper集群上认领自己的Task,通知自己的Worker进程进行Task的处理。总体的Topology处理流程图为:

每个Topology都由Spout和Bolt组成,在Spout和Bolt传递信息的基本单位叫做Tuple,由Spout发出的连续不断的Tuple及其在相应Bolt上处理的子Tuple连起来称为一个Steam,每个Stream的命名是在其首个Tuple被Spout发出的时候,此时Storm会利用内部的Ackor机制保证每个Tuple可靠的被处理。

而Tuple可以理解成键值对,其中,键就是在定义在declareStream方法中的Fields字段,而值就是在emit方法中发送的Values字段。

2 Configuration

在运行Topology之前,可以通过一些参数的配置来调节运行时的状态,参数的配置是通过Storm框架部署目录下的conf/storm.yaml文件来完成的。在次文件中可以配置运行时的Storm本地目录路径、运行时Worker的数目等。

在代码中,也可以设置Config的一些参数,但是优先级是不同的,不同位置配置Config参数的优先级顺序为:

default.yaml<storm.yaml<topology内部的configuration<内部组件的special configuration<外部组件的special configuration

在storm.yaml中常用的几个选项为:


配置选项名称


配置选项作用


topology.max.task.parallelism


每个Topology运行时最大的executor数目


topology.workers


每个Topology运行时的worker的默认数目,若在代码中设置,则此选项值被覆盖


storm.zookeeper.servers


zookeeper集群的节点列表


storm.local.dir


Storm用于存储jar包和临时文件的本地存储目录


storm.zookeeper.root


Storm在zookeeper集群中的根目录,默认是“/”


ui.port


Storm集群的UI地址端口号,默认是8080


nimbus.host:


Nimbus节点的host


supervisor.slots.ports


Supervisor节点的worker占位槽,集群中的所有Topology公用这些槽位数,即使提交时设置了较大数值的槽位数,系统也会按照当前集群中实际剩余的槽位数来进行分配,当所有的槽位数都分配完时,新提交的Topology只能等待,系统会一直监测是否有空余的槽位空出来,如果有,就再次给新提交的Topology分配


supervisor.worker.timeout.secs


Worker的超时时间,单位为秒,超时后,Storm认为当前worker进程死掉,会重新分配其运行着的task任务


drpc.servers


在使用drpc服务时,drpc server的服务器列表


drpc.port


在使用drpc服务时,drpc server的服务端口

3 Spouts

Spout是Stream的消息产生源, Spout组件的实现可以通过继承BaseRichSpout类或者其他*Spout类来完成,也可以通过实现IRichSpout接口来实现。

需要根据情况实现Spout类中重要的几个方法有:

3.1 open方法

当一个Task被初始化的时候会调用此open方法。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。

示例如下:

1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {2 3         _collector = collector;4 5 }

3.2 declareOutputFields方法

此方法用于声明当前Spout的Tuple发送流。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的,其中的参数包括了发送的域Fields。

示例如下:

1 public void declareOutputFields(OutputFieldsDeclarer declarer) {2 3         declarer.declare(new Fields("word"));4 5     }

3.3 getComponentConfiguration方法

此方法用于声明针对当前组件的特殊的Configuration配置。

示例如下:

 1 public Map<String, Object> getComponentConfiguration() { 2  3         if(!_isDistributed) { 4  5             Map<String, Object> ret = new HashMap<String, Object>(); 6  7             ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3); 8  9             return ret;10 11         } else {12 13             return null;14 15         }16 17     }

这里便是设置了Topology中当前Component的线程数量上限。

3.4 nextTuple方法

这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法来实现的。

示例如下:

 1 public void nextTuple() { 2  3         Utils.sleep(100); 4  5         final String[] words = new String[] {"twitter","facebook","google"}; 6  7         final Random rand = new Random(); 8  9         final String word = words[rand.nextInt(words.length)];10 11         _collector.emit(new Values(word));12 13     }

这里便是从一个数组中随机选取一个单词作为Tuple,然后通过_collector发送到Topology。

另外,除了上述几个方法之外,还有ack、fail和close方法等。Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法,这两个方法在BaseRichSpout类中已经被隐式的实现了。

4 Bolts

Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。

Bolt类需要实现的主要方法有:

4.1 prepare方法

此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。Bolt中Tuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是些在execute中。

示例如下:

1 public void prepare(Map conf, TopologyContext context, OutputCollector collector) {2 3         _collector = collector;4 5     }

4.2 declareOutputFields方法

用于声明当前Bolt发送的Tuple中包含的字段,和Spout中类似。

示例如下:

1 public void declareOutputFields(OutputFieldsDeclarer declarer) {2 3         declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));4 5 }

此例说明当前Bolt类发送的Tuple包含了三个字段:"obj", "count", "actualWindowLengthInSeconds"。

4.3 getComponentConfiguration方法

和Spout类一样,在Bolt中也可以有getComponentConfiguration方法。

示例如下:

1 public Map<String, Object> getComponentConfiguration() {2 3         Map<String, Object> conf = new HashMap<String, Object>();4 5         conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);6 7         return conf;8 9     }

此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用这个系统的组件的特性来完成。

4.4 execute方法

这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。

(1)emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。

(2)emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即,如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。

这两种情况要根据自己的场景来确定。

示例如下:

 1 public void execute(Tuple tuple) { 2  3         _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); 4  5         _collector.ack(tuple); 6  7     } 8  9 public void execute(Tuple tuple) {10 11         _collector.emit(new Values(tuple.getString(0) + "!!!"));12 13     }

此外还有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法类似,都是在当前Component关闭时调用,但是针对实时计算来说,除非一些特殊的场景要求以外,这两个方法一般都很少用到。

5 Stream grouping

上文中介绍了Topology的基本组件Spout和Bolt,在Topology中,数据流Tuple的处理就是不断的通过调用不同的Spout和Bolt来完成的。不同的Bolt和Spout的上下游关系是通过在入口类中定义的。示例如下:

 1 builder = new TopologyBuilder(); 2  3 builder.setSpout(spoutId, new TestWordSpout(), 5); 4  5 builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word")); 6  7 builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields("obj")); 8 builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); 9

此例中的builder是TopologyBuilder对象,通过它的createTopology方法可以创建一个Topology对象,同时此builder还要定义当前Topology中用到的Spout和Bolt对象,分别通过setSpout方法和setBolt方法来完成。

setSpout方法和setBolt方法中的第一个参数是当前的Component组件的Stream流ID号;第二个参数是具体的Component实现类的构造;第三个参数是当前Component的并行执行的线程数目,Storm会根据这个数字的累加和来确定Topology的Task数目。最后的小尾巴*Grouping是指的一个Stream应如何分配数据给Bolt上面的Task。目前Storm的Stream Grouping有如下几种:

(1)ShuffleGrouping:随机分组,随机分发Stream中的tuple,保证每个Bolt的Task接收Tuple数量大致一致;

(2)FieldsGrouping:按照字段分组,保证相同字段的Tuple分配到同一个Task中;

(3)AllGrouping:广播发送,每一个Task都会受到所有的Tuple;

(4)GlobalGrouping:全局分组,所有的Tuple都发送到同一个Task中,此时一般将当前Component的并发数目设置为1;

(5)NonGrouping:不分组,和ShuffleGrouping类似,当前Task的执行会和它的被订阅者在同一个线程中执行;

(6)DirectGrouping:直接分组,直接指定由某个Task来执行Tuple的处理,而且,此时必须有emitDirect方法来发送;

(7) localOrShuffleGrouping:和ShuffleGrouping类似,若Bolt有多个Task在同一个进程中,Tuple会随机发给这些Task。

不同的的Grouping,需要根据不同的场景来具体设定,不一而论。

6 Topology运行

6.1 Topology运行方式

Topology的运行可以分为本地模式和分布式模式,模式的设置可以在配置文件中设定,也可以在代码中设置。

(1)本地运行的提交方式:

1 LocalCluster cluster = new LocalCluster();2 3 cluster.submitTopology(topologyName, conf, topology);4 5 cluster.killTopology(topologyName);6 7 cluster.shutdown();

(2)分布式提交方式:

StormSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology());

需要注意的是,在Storm代码编写完成之后,需要打包成jar包放到Nimbus中运行,打包的时候,不需要把依赖的jar都打进去,否则如果把依赖的storm.jar包打进去的话,运行时会出现重复的配置文件错误导致Topology无法运行。因为Topology运行之前,会加载本地的storm.yaml配置文件。

在Nimbus运行的命令如下:

storm jar StormTopology.jar maincalss args

6.2 Topology运行流程

  有几点需要说明的地方:

(1)Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件;

(2)在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm本身决定的;

(3)任务分配好之后,Nimbes节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息;

(4)Supervisor节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行;

(5)一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。

最后一步会不间断的执行,除非手动结束Topology。

6.3 Topology方法调用流程

Topology中的Stream处理时的方法调用过程如下:

有几点需要说明的地方:

(1)每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。

(2)open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,是负责运行组件中的task的线程         的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。

(3)nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产         生无界的Tuple流,体现实时性。相当于线程的run方法。

(4)在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(即Supervisor节           点),在每一个任务上反序列化component。

(5)Spout和Bolt之间、Bolt和Bolt之间的通信,是通过zeroMQ的消息队列实现的。

(6)上图没有列出ack方法和fail方法,在一个Tuple被成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理这个Tuple。

6.4 Topology并行度

在Topology的执行单元里,有几个和并行度相关的概念。

(1)worker:每个worker都属于一个特定的Topology,每个Supervisor节点的worker可以有多个,每个worker使用一个单独的端口,它对Topology中的每个component运行一个或者多个executor线程来提供task的运行服务。

(2)executor:executor是产生于worker进程内部的线程,会执行同一个component的一个或者多个task。

(3)task:实际的数据处理由task完成,在Topology的生命周期中,每个组件的task数目是不会发生变化的,而executor的数目却不一定。executor数目小于等于task的数目,默认情况下,二者是相等的。

在运行一个Topology时,可以根据具体的情况来设置不同数量的worker、task、executor,而设置的位置也可以在多个地方。

(1)worker设置:

(1.1)可以通过设置yaml中的topology.workers属性

(1.2)在代码中通过Config的setNumWorkers方法设定

(2)executor设置:

通过在Topology的入口类中setBolt、setSpout方法的最后一个参数指定,不指定的话,默认为1;

(3)task设置:

(3.1) 默认情况下,和executor数目一致;

(3.2)在代码中通过TopologyBuilder的setNumTasks方法设定具体某个组件的task数目;

6.5 终止Topology

通过在Nimbus节点利用如下命令来终止一个Topology的运行:

storm kill topologyName

kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失了。

7 Topology跟踪

Topology提交后,可以在Nimbus节点的web界面查看,默认的地址是http://NimbusIp:8080。

8 Storm应用

上面给出了如何编写Storm框架任务Topology的方法,那么在哪些场景下能够使用Storm框架呢?下面介绍Storm框架的几个典型的应用场景。

(1)利用Storm框架的DRPC进行大量的函数并行调用,即实现分布式的RPC;

(2)利用Storm框架的Transaction Topology,可以进行实时性的批量更新或者查询数据库操作或者应用需要同一批内的消息以及批与批之间的消息并行处理这样的场景,此时Topology中只能有一个TrasactionalSpout;

(3)利用滑动窗口的逻辑结合Storm框架来计算得出某段时间内的售出量最多的产品、购买者最多的TopN地区等;

(4)精确的广告推送,在用户浏览产品的时候,将浏览记录实时性的搜集,发送到Bolt,由Bolt来根据用户的账户信息(如果有的话)完成产品的分类统计,产品的相关性查询等逻辑计算之后,将计算结果推送给用户;

(5)实时日志的处理,Storm可以和一个分布式存储结合起来,实时性的从多个数据源发送数据到处理逻辑Bolts,Bolts完成一些逻辑处理之后,交给分布式存储框架进行存储,此时,Spout可以是多个;

(6)实时性的监控舆论热点,比如针对某个关键词,在用户查询的时候,产生数据源Spout,结合语义分析等,由Bolt来完成查询关键词的统计分析,汇总当前的舆论热点;

(7)数据流的实时聚合操作。

9 参考网址

http://xumingming.sinaapp.com/138/twitter-storm%E5%85%A5%E9%97%A8/

https://github.com/nathanmarz/storm/wiki

http://nathanmarz.github.io/storm/doc/index-all.html

https://github.com/nathanmarz

时间: 2024-11-02 10:20:19

Storm框架入门的相关文章

实时计算框架之二:Storm之入门实例

预备.开火.瞄准-- 1 总结与提升 自1月份来,可谓是浮浮荡荡,一波三折呀. 先是参加了公司组织的创意马拉松大赛,虽说24小时内完成了作品,但是自己感觉上效果很差,自然成绩也是不高.通过这24小时持续的奋斗以及后来的各种产品描述等环节,发现了开发上的许多缺点.首先,对我们的产品进行了深入的认识和了解,也在产品之上,发现了更多可以发展走向成功的点子,这是我觉得最棒的一点:其次,短时间内和队员进行协作交流,生成产品,这之间的沟通非常重要:第三,选择C++作为24小时创作的语言,开发效率相对而言是非

Storm学习入门视频教程

Storm流计算从入门到精通之技术篇(高并发策略.批处理事务.Trident精解.运维监控.企业场景)课程讲师:Cloudy课程分类:大数据适合人群:初级课时数量:28课时用到技术:Storm集群.Zookeeper集群等涉及项目:网站PV.UV案例实战.其他案例咨询qq:1840215592课程亮点:1.Storm全面.系统.深入讲解,采用最新的稳定版本Storm 0.9.0.1 :2.注重实践,对较抽象难懂的技术点如Grouping策略.并发度及线程安全.批处理事务.DRPC.Storm T

Storm框架基础(一)

* Storm框架基础(一) Storm简述 如果你了解过SparkStreaming,那么Storm就可以类比着入门,在此我们可以先做一个简单的比较:  在SparkStreaming中: 我们曾尝试过每秒钟的实时数据处理,或者使用Window若干时间范围内的数据统一处理结果.亦或统计所有时间范围内的数据结果. 在Storm中: 我们可以根据进来的每一条数据进行实时处理,也就是说,Storm处理数据的速度,要小于1秒,也就是毫秒级别的. 如果你疑问,1秒处理1次数据,和进来1条数据处理1次有什

Storm使用入门之本地开发环境搭建

本篇博文详细告诉你如何安装Storm的本地开发环境,总体分为两步,具体如下: 1.从官网上下载Storm的发布包,下载完成后将其解压,并将解压后的bin目录添加到环境变量(PATH)中,以方便后续执行Storm的相关命令 2.修改Storm的配置文件(storm.yaml),主要是按照实际情况更新配置文件中的集群信息,然后将修改后的配置文件添加到目录(~/.storm/)中,目的是为了后续能够远程启动和停止集群上的计算任务(即topology) 接下来,咱们来详细地介绍每一个操作步骤. 首先,何

Shiro权限控制框架入门1:Shiro的认证流程以及基本概念介绍

前言:我在最开始学习Shiro这个框架时,在网上搜索到的一个介绍比较全面的教程是:<跟我学Shiro>系列教程.但是在我看了他写的前几篇文章后,我发现虽然他在这个系列教程中把shiro的一些特性介绍地非常全面详细,但是整个教程的叙述方式还是有很大缺陷的.文章与文章之间并没有很好地串联起来,每篇文章介绍的东西都过于分散了,如果是对shiro完全不了解的新手来看的话完全是一场噩梦.就像一个网友评价的这样: 看了看这个教程,看完之后都想放弃shiro了,完全看不懂,后来百度了很多别的资料才理解了sh

Dwz(J-UI)框架--入门

http://www.cnblogs.com/chenyongsai/p/4933982.html Dwz(J-UI)框架--入门 一.了解 概述:是中国人自己开发的基于jQuery实现的Ajax RIA开源框架. 目的:简单实用.扩展方便(在原有架构基础上扩展方便).快速开发.RIA思路.轻量级 使用:用html扩展的方式来代替javascript代码 思路:根据官网页面样例,查看官方代码包,查阅相关子页面,参阅帮助文档,添加固定的标签属性语法 优势:第一次打开页面时载入界面到客户端, 之后和

Farseer.net轻量级开源框架 入门篇:使用前说明

导航 目   录:Farseer.net轻量级开源框架 目录 上一篇:Farseer.net轻量级开源框架 入门篇: 框架性能测试 下一篇:Farseer.net轻量级开源框架 入门篇: 增.删.改.查操作演示 本篇讲解使用或者学习Farseer前需要知道一些事项: 在后续很多演示中,使用了很多扩展方法.但作者并没有明确出哪些是扩展的方法.所以读者要注意.在使用框架的时候,都需要引用扩展方法的命名空间:using FS.Extend; 为了方便,扩展方法统一放到FS.Extend中,在这里特别说

Java - Struts框架教程 Hibernate框架教程 Spring框架入门教程(新版) sping mvc spring boot spring cloud Mybatis

https://www.zhihu.com/question/21142149 http://how2j.cn/k/hibernate/hibernate-tutorial/31.html?tid=63 https://www.zhihu.com/question/29444491/answer/146457757 1. Java - Struts框架教程Struts 是Apache软件基金会(ASF)赞助的一个开源项目.通过采用JavaServlet/JSP技术,实现了基于Java EEWeb

【原创】NIO框架入门(一):服务端基于Netty4的UDP双向通信Demo演示

申明:本文由作者基于日常实践整理,希望对初次接触MINA.Netty的人有所启发.如需与作者交流,见文签名,互相学习. 学习交流 更多学习资料:点此进入 推荐 移动端即时通讯交流: 215891622 推荐 前言 NIO框架的流行,使得开发大并发.高性能的互联网服务端成为可能.这其中最流行的无非就是MINA和Netty了,MINA目前的主要版本是MINA2.而Netty的主要版本是Netty3和Netty4(Netty5已经被取消开发了:详见此文). 本文将演示的是一个基于Netty4的UDP服