官方英文文档:http://storm.apache.org/documentation/Documentation.html
本文是学习笔记,转载整合加翻译,主要是为了便于学习。
一、基本概念
参考:http://storm.apache.org/documentation/Concepts.html
此段转自:http://xumingming.sinaapp.com/117/twitter-storm%E7%9A%84%E4%B8%80%E4%BA%9B%E5%85%B3%E9%94%AE%E6%A6%82%E5%BF%B5/
- Topologies
- Streams
- Spouts
- Bolts
- Stream groupings
- Reliability
- Tasks
- Workers
- Configuration
先看一张storm里面各种对象的一个示意图:
storm里面各个对象的示意图
计算拓补: Topologies
一个实时计算应用程序的逻辑在storm里面被封装到topology对象里面, 我把它叫做计算拓补. Storm里面的topology相当于Hadoop里面的一个MapReduce Job, 它们的关键区别是:一个MapReduce Job最终总是会结束的, 然而一个storm的topoloy会一直运行 — 除非你显式的杀死它。 一个Topology是Spouts和Bolts组成的图状结构, 而链接Spouts和Bolts的则是Stream groupings。
消息流: Streams
消息流是storm里面的最关键的抽象。一个消息流是一个没有边界的tuple序列, 而这些tuples会被以一种分布式的方式并行地创建和处理。 对消息流的定义主要是对消息流里面的tuple的定义, 我们会给tuple里的每个字段一个名字。 并且不同tuple的对应字段的类型必须一样。 也就是说: 两个tuple的第一个字段的类型必须一样, 第二个字段的类型必须一样, 但是第一个字段和第二个字段可以有不同的类型。 在默认的情况下, tuple的字段类型可以是: integer, long, short, byte, string, double, float, boolean和byte array。 你还可以自定义类型 — 只要你实现对应的序列化器。
每个消息流在定义的时候会被分配给一个id, 因为单向消息流是那么的普遍, OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会有个默认的id: 1.
消息源: Spouts
消息源Spouts是storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息: tuple。 消息源Spouts可以是可靠的也可以是不可靠的。一个可靠的消息源可以重新发射一个tuple如果这个tuple没有被storm成功的处理, 但是一个不可靠的消息源Spouts一旦发出一个tuple就把它彻底忘了 — 也就不可能再发了。
消息源可以发射多条消息流stream。要达到这样的效果, 使用OutFieldsDeclarer.declareStream来定义多个stream, 然后使用SpoutOutputCollector来发射指定的sream。
Spout类里面最重要的方法是nextTuple要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple了。要注意的是nextTuple方法不能block Spout的实现, 因为storm在同一个线程上面调用所有消息源Spout的方法。
另外两个比较重要的Spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack, 否则调用fail。storm只对可靠的spout调用ack和fail。
消息处理者: Bolts
所有的消息处理逻辑被封装在bolts里面。 Bolts可以做很多事情: 过滤, 聚合, 查询数据库等等等等。
Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤, 从而也就需要经过很多Bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步: 第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream, 使用OutputCollector.emit来选择要发射的stream。
Bolts的主要方法是execute, 它以一个tuple作为输入,Bolts使用OutputCollector来发射tuple, Bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知storm这个tuple被处理完成了。– 从而我们通知这个tuple的发射者Spouts。 一般的流程是: Bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。
Stream groupings: 消息分发策略
定义一个Topology的其中一步是定义每个bolt接受什么样的流作为输入。stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks。
storm里面有6种类型的stream grouping:
- Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。
- Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
- All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
- Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
- Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
- Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
可靠性
storm保证每个tuple会被topology完整的执行。storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而可以形成树状结构), 并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置, 如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会会重新发射这个tuple。
为了利用storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个tuple的时候你必须要通知storm。这一切是由OutputCollector来完成的。通过它的emit方法来通知一个新的tuple产生了, 通过它的ack方法通知一个tuple处理完成了。
Tasks: 任务
每一个Spout和Bolt会被当作很多task在整个集群里面执行。每一个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder.setSpout()和TopBuilder.setBolt来设置并行度 — 也就是有多少个task。
工作进程
一个topology可能会在一个或者多个工作进程里面执行,每个工作进程执行整个topology的一部分。比如对于并行度是300的topology来说,如果我们使用50个工作进程来执行,那么每个工作进程会处理其中的6个tasks(其实就是每个工作进程里面分配6个线程)。storm会尽量均匀的工作分配给所有的工作进程。
配置
storm里面有一堆参数可以配置来调整nimbus, supervisor以及正在运行的topology的行为, 一些配置是系统级别的, 一些配置是topology级别的。所有有默认值的配置的默认配置是配置在default.xml里面的。你可以通过定义个storm.xml在你的classpath厘米来覆盖这些默认配置。并且你也可以在代码里面设置一些topology相关的配置信息 – 使用StormSubmitter。当然,这些配置的优先级是: default.xml < storm.xml < TOPOLOGY-SPECIFIC配置。
二、Storm集群中的一些角色
参考资料:
http://www.aboutyun.com/thread-6873-1-1.html
- Nimbus:负责资源分配和任务调度。
- Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
- Worker:运行具体处理组件逻辑的进程。
- Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。
下面这个图描述了以上几个角色之间的关系。
三、Storm多语言支持
1.ShellBolt原理
Storm中以Topology作为运行的基本单位。而Topology又是由Spout和Bolt组成,实际上Spout是数据接入者,而Bolt才是Topology中数据的真正处理者。
于是我们只需要能将程序封装为一个Topology中Bolt的就可以了。而Storm提出的ShellBolt就完成了该功能,ShellBolt本质上是一个壳子程序,他允许开发者将自己的程序(任意的程序)封装成一个ShellBolt,从而加入到Topology中运行。是不是很神奇呢?
下面我们就来看一下ShellBolt的原理。
1)ShellBolt本质上是一个Bolt;
2)ShellBolt中接收Shell命令,根据Shell命令调用创建一个ShellProcess。并分别启动两个线程,向该ShellProcess发送消息和读取消息;
3)再进一步,ShellProcess实际上又是调用了ProcessBuild创建了一个Process,并通过该Process的InputStream,OutputStream,ErrorStream和该Process进行交互。
到这里,事情似乎变得清晰了:
ShellBolt本质上就是通过Shell命令,启动新的进程,并通过该进程的stdIn,stdOut,stdErr和其进行交互。
方法看似简单,其实蕴含了至理“简单的才是最好的”。
2.ShellBolt可能存在的问题
上面说到ShellBolt通过通过Shell命令,启动新的进程,并通过该进程的stdIn,stdOut,stdErr和其进行交互。这样做可能可回导致一下三个问题:
1)潜藏风险
交互完全是读写进程的StdIn,StdOut;这就对编程者提出了要求,不能私自向stdIn和StdOUt上输出东西,也就是说在程序中绝对不能有printf,scanf,cout,cin,System.Out,System.in之类的。
这一点,对于小规模程序或者完全新开发的程序可以进行约定。可是对于打响程序,或者基于已有程序进行重构的时候就变得不靠谱了,只有上帝才知道有没有人偷偷的写了stdin或stdout,这必然会导致程序崩溃。而且你根本无从查起。
2)效率低
还是交互,通过读写进程的StdIn,StdOut交互,所有数据必须是文本的,Storm中通过Json编码实现。而编码解码,写Stdio,Stdout,这种交互方式的效率无疑是比较低的。
3)僵尸进程
ShellBolt根据Shell命令,启动新的进程,而目前还没有很好的方法保证它会杀死所有他启动的进程。本人在使用的过程中就经常发现一个topology停止后,后台经常会驻留有还没有死掉的进程。
4)占用资源
ShellBolt根据Shell命令,启动新的进程。也即是说在task较多时,它会启动很多进程,比较占用资源。当然这个不能说是缺点,因为进程是独立空间,当你的程序需要的资源比较多时,启动单独的进程是很好的选择。
四、Storm性能测试
参考:http://blog.csdn.net/jmppok/article/details/17614431
g) Storm使用外部处理程序时的性能
本测试用例主要测试使用外部处理程序的情况下,系统的整体性能。使用外部处理程序的时候,storm将外部处理程序作为子程序来运行,并使用Json格式来交换数据。本测试中我们使用python脚本作为外部处理程序。
- 测试原理图如下:
测试中,sender,processer等都是单节点的,所以本测试结果为单条处理线的处理能力。测试结果如下: - CPU利用率如下
- 各进程CPU利用情况
- 内存使用情况
- 吞吐量(tuples/s)
- Tuple处理延迟
测试结果分析:由上面的测试可见,使用外部处理程序时,系统的处理能力只有1000 tuple/s,性能下降明显。 分析认为性能陡降的原因有二: 1) 所有的tuple都经过Json格式与外部程序交互,格式转换的过程耗费了CPU circle; 2) storm把外部处理程序当做子进程,使用linux管道来通信,由于linux 管道( pipe)使用的是4K大小的页面做中转,所以在数据量较大的时候会有性能的损耗,测试中每个消息至少为1K bytes,所以很快就会将pipe使用的内存用完儿产生等待。增加外部程序个数(即processer处理单元的并行度, 不超过系统中CPU的个数),性能基本上有线性的提升。
由处理延迟的测试结果可见,使用外部处理程序的时候,tuple处理延迟比使用storm内建的处理机制要大十倍左右。
测试结论
经过上面的测试我们可以得出以下的结论:
- storm单条流水线的处理能力大约为20000 tupe/s, (每个tuple大小为1000字节)
- storm系统本省的处理延迟为毫秒级
- 在集群中横向扩展可以增加系统的处理能力,实测结果为1.6倍
- Storm中大量的使用了线程,即使单条处理流水线的系统,也有十几个线程在同时运行,所以几乎所有的16个CPU都在运行状态,load average 约为 3.5
- Jvm GC一般情况下对系统性能影响有限,但是内存紧张时,GC会成为系统性能的瓶颈
- 使用外部处理程序性能下降明显,所以在高性能要求下,尽量使用storm内建的处理模式