Twitter Storm学习笔记

官方英文文档:http://storm.apache.org/documentation/Documentation.html

本文是学习笔记,转载整合加翻译,主要是为了便于学习。

一、基本概念

参考:http://storm.apache.org/documentation/Concepts.html

此段转自:http://xumingming.sinaapp.com/117/twitter-storm%E7%9A%84%E4%B8%80%E4%BA%9B%E5%85%B3%E9%94%AE%E6%A6%82%E5%BF%B5/

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers
  9. Configuration

先看一张storm里面各种对象的一个示意图:

storm里面各个对象的示意图

计算拓补: Topologies

一个实时计算应用程序的逻辑在storm里面被封装到topology对象里面, 我把它叫做计算拓补. Storm里面的topology相当于Hadoop里面的一个MapReduce Job, 它们的关键区别是:一个MapReduce Job最终总是会结束的, 然而一个storm的topoloy会一直运行 — 除非你显式的杀死它。 一个Topology是Spouts和Bolts组成的图状结构, 而链接Spouts和Bolts的则是Stream groupings。

消息流: Streams

消息流是storm里面的最关键的抽象。一个消息流是一个没有边界的tuple序列, 而这些tuples会被以一种分布式的方式并行地创建和处理。 对消息流的定义主要是对消息流里面的tuple的定义, 我们会给tuple里的每个字段一个名字。 并且不同tuple的对应字段的类型必须一样。 也就是说: 两个tuple的第一个字段的类型必须一样, 第二个字段的类型必须一样, 但是第一个字段和第二个字段可以有不同的类型。 在默认的情况下, tuple的字段类型可以是: integer, long, short, byte, string, double, float, boolean和byte array。 你还可以自定义类型 — 只要你实现对应的序列化器。

每个消息流在定义的时候会被分配给一个id, 因为单向消息流是那么的普遍, OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会有个默认的id: 1.

消息源: Spouts

消息源Spouts是storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息: tuple。 消息源Spouts可以是可靠的也可以是不可靠的。一个可靠的消息源可以重新发射一个tuple如果这个tuple没有被storm成功的处理, 但是一个不可靠的消息源Spouts一旦发出一个tuple就把它彻底忘了 — 也就不可能再发了。

消息源可以发射多条消息流stream。要达到这样的效果, 使用OutFieldsDeclarer.declareStream来定义多个stream, 然后使用SpoutOutputCollector来发射指定的sream。

Spout类里面最重要的方法是nextTuple要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple了。要注意的是nextTuple方法不能block Spout的实现, 因为storm在同一个线程上面调用所有消息源Spout的方法。

另外两个比较重要的Spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack, 否则调用fail。storm只对可靠的spout调用ack和fail。

消息处理者: Bolts

所有的消息处理逻辑被封装在bolts里面。 Bolts可以做很多事情: 过滤, 聚合, 查询数据库等等等等。

Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤, 从而也就需要经过很多Bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步: 第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream, 使用OutputCollector.emit来选择要发射的stream。

Bolts的主要方法是execute, 它以一个tuple作为输入,Bolts使用OutputCollector来发射tuple, Bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知storm这个tuple被处理完成了。– 从而我们通知这个tuple的发射者Spouts。 一般的流程是: Bolts处理一个输入tuple,  发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

Stream groupings: 消息分发策略

定义一个Topology的其中一步是定义每个bolt接受什么样的流作为输入。stream grouping就是用来定义一个stream应该如果分配给Bolts上面的多个Tasks。

storm里面有6种类型的stream grouping:

  1. Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证每个bolt接收到的tuple数目相同。
  2. Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts, 而不同的userid则会被分配到不同的Bolts。
  3. All Grouping: 广播发送, 对于每一个tuple, 所有的Bolts都会收到。
  4. Global Grouping: 全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
  5. Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
  6. Direct Grouping: 直接分组,  这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)

可靠性

storm保证每个tuple会被topology完整的执行。storm会追踪由每个spout tuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而可以形成树状结构), 并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置, 如果storm在这个超时的时间内检测不到某个tuple树到底有没有执行成功, 那么topology会把这个tuple标记为执行失败,并且过一会会重新发射这个tuple。

为了利用storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个tuple的时候你必须要通知storm。这一切是由OutputCollector来完成的。通过它的emit方法来通知一个新的tuple产生了, 通过它的ack方法通知一个tuple处理完成了。

Tasks: 任务

每一个Spout和Bolt会被当作很多task在整个集群里面执行。每一个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder.setSpout()和TopBuilder.setBolt来设置并行度 — 也就是有多少个task。

工作进程

一个topology可能会在一个或者多个工作进程里面执行,每个工作进程执行整个topology的一部分。比如对于并行度是300的topology来说,如果我们使用50个工作进程来执行,那么每个工作进程会处理其中的6个tasks(其实就是每个工作进程里面分配6个线程)。storm会尽量均匀的工作分配给所有的工作进程。

配置

storm里面有一堆参数可以配置来调整nimbus, supervisor以及正在运行的topology的行为, 一些配置是系统级别的, 一些配置是topology级别的。所有有默认值的配置的默认配置是配置在default.xml里面的。你可以通过定义个storm.xml在你的classpath厘米来覆盖这些默认配置。并且你也可以在代码里面设置一些topology相关的配置信息  – 使用StormSubmitter。当然,这些配置的优先级是: default.xml < storm.xml < TOPOLOGY-SPECIFIC配置。

二、Storm集群中的一些角色

参考资料:

http://www.aboutyun.com/thread-6873-1-1.html

  • Nimbus:负责资源分配和任务调度。
  • Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
  • Worker:运行具体处理组件逻辑的进程。
  • Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。

下面这个图描述了以上几个角色之间的关系。

三、Storm多语言支持

1.ShellBolt原理

Storm中以Topology作为运行的基本单位。而Topology又是由Spout和Bolt组成,实际上Spout是数据接入者,而Bolt才是Topology中数据的真正处理者。

于是我们只需要能将程序封装为一个Topology中Bolt的就可以了。而Storm提出的ShellBolt就完成了该功能,ShellBolt本质上是一个壳子程序,他允许开发者将自己的程序(任意的程序)封装成一个ShellBolt,从而加入到Topology中运行。是不是很神奇呢?

下面我们就来看一下ShellBolt的原理。

1)ShellBolt本质上是一个Bolt;

2)ShellBolt中接收Shell命令,根据Shell命令调用创建一个ShellProcess。并分别启动两个线程,向该ShellProcess发送消息和读取消息;

3)再进一步,ShellProcess实际上又是调用了ProcessBuild创建了一个Process,并通过该Process的InputStream,OutputStream,ErrorStream和该Process进行交互。

到这里,事情似乎变得清晰了:

ShellBolt本质上就是通过Shell命令,启动新的进程,并通过该进程的stdIn,stdOut,stdErr和其进行交互。

方法看似简单,其实蕴含了至理“简单的才是最好的”。

2.ShellBolt可能存在的问题

上面说到ShellBolt通过通过Shell命令,启动新的进程,并通过该进程的stdIn,stdOut,stdErr和其进行交互。这样做可能可回导致一下三个问题:

1)潜藏风险

交互完全是读写进程的StdIn,StdOut;这就对编程者提出了要求,不能私自向stdIn和StdOUt上输出东西,也就是说在程序中绝对不能有printf,scanf,cout,cin,System.Out,System.in之类的。

这一点,对于小规模程序或者完全新开发的程序可以进行约定。可是对于打响程序,或者基于已有程序进行重构的时候就变得不靠谱了,只有上帝才知道有没有人偷偷的写了stdin或stdout,这必然会导致程序崩溃。而且你根本无从查起。

2)效率低

还是交互,通过读写进程的StdIn,StdOut交互,所有数据必须是文本的,Storm中通过Json编码实现。而编码解码,写Stdio,Stdout,这种交互方式的效率无疑是比较低的。

3)僵尸进程

ShellBolt根据Shell命令,启动新的进程,而目前还没有很好的方法保证它会杀死所有他启动的进程。本人在使用的过程中就经常发现一个topology停止后,后台经常会驻留有还没有死掉的进程。

4)占用资源

ShellBolt根据Shell命令,启动新的进程。也即是说在task较多时,它会启动很多进程,比较占用资源。当然这个不能说是缺点,因为进程是独立空间,当你的程序需要的资源比较多时,启动单独的进程是很好的选择。

四、Storm性能测试

参考:http://blog.csdn.net/jmppok/article/details/17614431

g) Storm使用外部处理程序时的性能

本测试用例主要测试使用外部处理程序的情况下,系统的整体性能。使用外部处理程序的时候,storm将外部处理程序作为子程序来运行,并使用Json格式来交换数据。本测试中我们使用python脚本作为外部处理程序。

  1. 测试原理图如下:

    测试中,sender,processer等都是单节点的,所以本测试结果为单条处理线的处理能力。测试结果如下:
  2. CPU利用率如下
  3. 各进程CPU利用情况
  4. 内存使用情况
  5. 吞吐量(tuples/s)
  6. Tuple处理延迟

测试结果分析:由上面的测试可见,使用外部处理程序时,系统的处理能力只有1000 tuple/s,性能下降明显。 分析认为性能陡降的原因有二: 1) 所有的tuple都经过Json格式与外部程序交互,格式转换的过程耗费了CPU circle; 2) storm把外部处理程序当做子进程,使用linux管道来通信,由于linux 管道( pipe)使用的是4K大小的页面做中转,所以在数据量较大的时候会有性能的损耗,测试中每个消息至少为1K bytes,所以很快就会将pipe使用的内存用完儿产生等待。增加外部程序个数(即processer处理单元的并行度, 不超过系统中CPU的个数),性能基本上有线性的提升。

由处理延迟的测试结果可见,使用外部处理程序的时候,tuple处理延迟比使用storm内建的处理机制要大十倍左右。

测试结论

经过上面的测试我们可以得出以下的结论:

  • storm单条流水线的处理能力大约为20000 tupe/s, (每个tuple大小为1000字节)
  • storm系统本省的处理延迟为毫秒级
  • 在集群中横向扩展可以增加系统的处理能力,实测结果为1.6倍
  • Storm中大量的使用了线程,即使单条处理流水线的系统,也有十几个线程在同时运行,所以几乎所有的16个CPU都在运行状态,load average 约为 3.5
  • Jvm GC一般情况下对系统性能影响有限,但是内存紧张时,GC会成为系统性能的瓶颈
  • 使用外部处理程序性能下降明显,所以在高性能要求下,尽量使用storm内建的处理模式
时间: 2024-10-07 05:43:09

Twitter Storm学习笔记的相关文章

storm学习笔记完整记录(一)

storm有两种运行模式(本地模式和集群模式) 1. 首先创建一个类似于HelloWorld的简单程序,以便进入storm的大门,包结构如下: 2.从包结构可以知道,这是一个Maven Project,pom.xml的内容如下: <project xmlns="http://maven.apache.org/POM/4.0.0"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    

storm学习笔记

Storm学习笔记 一.简介 本文使用的Storm版本为1.0.1 Storm是一个免费开源的分布式实时计算系统,它使得可靠地处理无限的数据流更加容易,可以实时的处理Hadoop的批量任务.Storm简单易用,且支持各种主流的程序语言. Storm有很多适用场景:实时分析.在线机器学习.连续计算.分布式RPC.分布式ETL.易扩展.支持容错,可确保你的数据得到处理,易于构建和操控. 下图是Storm"流式数据处理"的概念图,即数据像水流一样从数据源头源源不断的流出,经过每个节点,每个节

storm学习笔记(一)

1.storm介绍 storm是一种用于事件流处理的分布式计算框架,它是有BackType公司开发的一个项目,于2014年9月加入了Apahche孵化器计划并成为其旗下的顶级项目之一.Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm用于实时处理,就好比 Hadoop 用于批处理.Storm保证每个消息都会得到处理,而且它很快--在一个小集群中,每秒可以处理数以百万计的消息.更棒的是你可以使用任意编程语言来做开发.storm源码:githup storm特点: 简单的编程

Twitter Storm学习之二-基本概念介绍

2.1 Storm基本概念 在运行一个Storm任务之前,需要了解一些概念: Topologies Streams Spouts Bolts Stream groupings Reliability Tasks Workers Configuration Storm集群和Hadoop集群表面上看很类似.但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的.一个关键的区别是: 一个MapReduce job最终会结束,

Storm学习笔记——安装配置

1.安装一个zookeeper集群 2.上传storm的安装包,解压 3.修改配置文件conf/storm.yaml #所使用的zookeeper集群主机storm.zookeeper.servers:- "weekend01"- "weekend02"- "weekend03" #nimbus所在的主机名nimbus.host: "weekend01" #可以不用配置 supervisor.slots.ports-6701-

Storm学习笔记(1)Hello WordCount - 单机模式

古人云,纸上得来终觉浅,绝知此事要躬行.翻译过来,就是学东西哪有不踩坑的. 因为工作原因要折腾Storm,环境和第一个例子折腾了好久,搞完了回头看,吐血的简单. Storm有两种模式,单机和集群.入门当然选单机. 1.安装JDK,配置Eclipse环境 2.建立一个Maven工程,在pom.xml加上这段: <dependency>     <groupId>org.apache.storm</groupId>     <artifactId>storm-c

WeX5学习笔记

目录 WeX5学习笔记... 1 1.轻松看透WeX5产品能力和技术... 1 2.WeX5可以怎么玩?... 3 一.纯本地App. 3 二.关联一个网站,希望默认就打开某页... 4 三.UI设计器... 4 四.打包神器... 4 五.标准玩法... 4 3.WeX5 App与服务端交互原理... 4 4.Account示例程序... 5 5.Takeout示例程序... 7 5.1Index.w.. 7 5.2mapActivity.w.. 13 问题... 13 6.页面间交互视频..

Android学习笔记(二十一)——实战:程序数据共享

//此系列博文是<第一行Android代码>的学习笔记,如有错漏,欢迎指正! 我们继续在Database项目的基础上继续开发,通过内容提供器来给它加入外部访问接口.首先将 MyDatabaseHelper 中使用 Toast弹出创建数据库成功的提示去除掉,因为跨程序访问时我们不能直接使用 Toast. 一.添加一个 DatabaseProvider 类: 1 public class DatabaseProvider extends ContentProvider { 2 public sta

Asp.Net Identity学习笔记+MVC5默认项目解析_第三方登入&授权总结

Identity学习笔记 Asp.Net Identity学习笔记+MVC5默认项目解析_基础用法 Asp.Net Identity学习笔记+MVC5默认项目解析_授权&Claim Asp.Net Identity学习笔记+MVC5默认项目解析_第三方登入&授权总结 Identity学习笔记第三方登入配置登入案例登入技术总结本地,已登入本地,未登入第三方登入 第三方登入 本文介绍Identity的第三方登入技术.到目前为止只介绍了CookieAuthentication这种授权方式,即浏览