转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41517897
源地址:http://storm.apache.org/documentation/Tutorial.html
本文主要讲述了如何创建Storm topologies以及如何将它们部署在Storm集群中。Java是主要使用的语言,但是依然使用少量Python例子证明了Storm的多语言特性。
初步配置:
本文使用的例子源自storm-start项目。建议你复制这个项目并验证这个项目。阅读 设置开发环境 并 创建Storm项目,然后配置你的环境。
Storm集群组成:
Storm集群类似于Hadoop集群。在Hadoop平台上,你可以运行“MapReducejobs”,在Storm平台上,你可以运行“topologies”。“Jobs”和“topologies”它们是不同的-----一个关键的不同点是:MapReduce任务最终会结束,然而topology可以永久处理消息(直到你杀死它)。
Storm集群中有两种节点: 控制(master)节点和工作(worker)节点。 控制节点上运行的daemon称为“nimbus”,这类似于Hadoop的“job tracker”。nimbus负责向集群分发代码,向机器分发任务,以及监控是否失败。
每一个worker节点运行的daemon称为“Supervisor”,它监听分发给所在机器的任务,并按照nimbus的分配开始和结束worker进程。每一个worker进程执行一个topology的子集;一个运行的topology包含分布在很多机器上的很多worker进程。
supervisor
zookeeper supervisor
supervisor
nimbus <------------> zookeeper<------------> supervisor
supervisor
zookeeper supervisor
supervisor
Nimbus 和 Supervisor之间的协作是通过Zookeeper集群完成的。另外, Nimbus daemon以及Supervisor daemon都是快速失败并且无状态的; 所有的状态都保存在Zookeeper或者本地localdisk中。这就意味着你可以使用命令“kill -9”杀死Nimbus或者Supersors,而当它们重启的时候没有丢失任何东西。
这种设计使得Storm 集群拥有不可思议的稳定性。
拓扑(Topologies)
在Storm上进行实时运算时,可以创建“topologies”。Topology是计算节点组成的图。topology处理逻辑上包含的每个节点以及节点之间的连接,共同显示了数据是如何在节点之间传递的。
可以直接的运行topology。首先,你需要将你的代码以及依赖库打包成单独的jar;然后,你需要按照下面运行命令行:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这条命令执行类backtype.storm.MyTopology,参数为arg1和arg2. 类的主要函数定义了topology并向nimbus提交topology任务。storm jar这部分负责连接Nimbus并提交jar。
因为topology定义就是Thrift 结构,而Nimbus是Thrift服务,你可以使用任何语言创建并提交topology。上面的例子是使用JVM-based语言的最简单的例子。有关启动和停止topologies的更多信息可以查看 生产节点上运行topologies
。
流(stream)
Storm的核心抽象是"stream"。stream是tuples的无界序列。为了在分布式条件下以可靠的方式发送stream到新stream, storm提供了一些基础的方式。例如, 你可以将tweets的数据流转换到trendingtopics的数据流。
Storm用来提供数据转换的基本单元是“spouts”和“bolts”。 Spouts和bolts拥有的接口可以使你实现一些特定的应用逻辑。
Spout是数据流来源。例如,spout可以从Kestrel读取tuples并将它们作为数据流发射。或者,Spout可以连接Twitter API并且发射tweets的数据流。
Bolt是数据流消耗者,它可以消耗任意数量的输入流,做一些处理,并且有可能发射新数据流。一些复杂的数据流转换,像从tweets数据流计算某个topics的趋势,一般需要多个步骤,这样也就需要多个bolts。 Bolts可以处理多种数据,例如来自run functions、filter tuples,然后做一些数据流聚合、合并,与数据库交互等。
Spouts和Bolts组成的网络就是topology,这是向Storm集群提交任务的高层抽象。topology是流转换的图,图中的每个节点都是spout或者bolt。图中级联显示了哪些bolts订阅了哪些流。当spout或者bolt向流发射tuple时,它会向所有提出订阅请求的bolts发射tuple。
bolt— bolt
/ /
spout — bolt
\
spout — bolt
topology中节点之间的连接显示了tuples是如何传递的。 例如, 如果Spout A 与Bolt B之间存在连接, SpoutA与Bolt C之间存在连接, Bolt B与Bolt C之间存在连接,则每次Spout A发射一个tuple,它向Bolt B和Bolt C都会发送tuple。 Bolt B输出的所有tuples也都会发送到Bolt C。
Storm topology中每个节点都会并行运行。 在topology中,你可以指定每个节点的并行数目, 则Storm在集群中每个节点就会孵化同样数目的线程进行运算。
topology可以永久运行,直到你杀死它。 Storm将自动重新分配任何失败的tasks。另外,Storm保证不会丢失任何数据,即使机器挂掉并且消息被抛弃。
Data model(数据模式)
Storm使用tuples作为它的数据模式。tuple就是已经命名的值的列表,并且tuple的域可以是任何类型的对象。在包装之外,Storm支持所有基本类型、字符串以及字节数组作为tuple值。要使用任何类型的对象,你只需要实现这种类型的序列化器。
topology中每个节点都必须声明: 它所要发射的tuples的输出域。如下所示:bolt声明了: 它发射的两个tuples,它们的域为“double”和“triple”:
public class DoubleAndTripleBolt extendsBaseRichBolt { privateOutputCollectorBase _collector; @Override publicvoid prepare( Map conf, TopologyContext context, OutputCollectorBase collector){ _collector= collector; } @Override publicvoid execute( Tuple input ) { intval = input.getInteger(0); _collector.emit(input,new Values( val * 2, val * 3) ); _collector.ack(input ); } @Override publicvoid declareOutputFields( OutputFieldsDeclarer declarer ){ declarer.declare(newFields("double", "triple")); } }
函数declareOutputFields为组件声明了输出域:["double", "triple"]。 有关bolt其他的部分将在下面介绍。
简单的topology
看一个简单的topology例子,来学习一些其他的概念,并看一下代码组成。下面看一下storm-starter中的ExclamationTopology函数定义:
TopologyBuilder builder = newTopologyBuilder(); builder.setSpout("words", newTestWordSpout(), 10 ); builder.setBolt("exclaim1", new ExclamationBolt(),3).shuffleGrouping("words"); builder.setBolt("exclaim2", newExclamationBolt(), 2).shuffleGrouping("exclaim1");
topology 包含一个Spout和两个bolts。 Spout发射词汇,每个bolts在它们输入的后面增加"!!!".节点将按流水线作业: spout向第一个bolt发射,第一个bolt向第二个bolt发射。如果spout发射tuples ["bob"]和["john"],则第二个bolt将会发送["bob!!!!!!"]和["john!!!!!!"]。
代码使用setSpout 和 setBolt方法定义节点。这些方法可以将很多东西作为输入:用户特定的id,包含处理逻辑的对象,你想要节点并行处理的数量。本例中, spout被给与的id为"words",bolts被给与的ids分别是“exclaim1”和“exclaim2”。
包含处理逻辑的对象可以通过IRichSpout接口 实现spouts,同时可以通过IRichBolt接口
实现bolts。
最后的参数10、3、2,表示希望节点并行处理数,这是可选的。它显示了集群上有多少线程执行这些组件。如果省略它, Storm将会为每个节点分配一个线程。
setBolt返回InputDeclarer对象,它用来定义Bolt的输入。这里,组件"exclaim1"声明了:它将阅读由组件"words"使用随机组方式发射的所有tuples;组件"exclaim2"声明了:它将阅读由组件"exclaim1"使用随机组方式发射的所有tuples。随机组方式是指:
tuples应当从输入任务随机的分发到bolts‘s 任务。在组件之间,有很多种将数据分组的方式。
如果想要组件"exclaim2"接受所有由组件"words"和组件"exclaim1"两者发送的tuples,则你可以按照一下方式定义"exclaim2":
builder.setBolt( "exclaim2", newExclamationBolt(), 5 ) .shuffleGrouping("words") .shuffleGrouping("exclaim1");
就像你看到的,对于bolt而言,输入声明可以成链式结构,将多个来源串成一个链子。
现在深入一下topology中spouts和bolts的实现。 Spouts主要负责将新的消息发送给topology。 TestWordSpout 在这个topology中将发送一个随机单词,发送频率为每100ms一个tuple,随机单词源自列表{‘nathan‘, ‘mike‘, ‘jackson‘, ‘golda‘, ‘bertels‘]。nextTuple()在TestWordSpout中的实现如下:
public void nextTule(){ Utils.sleep(100); finalString[] words = new String[] {"nathan", "mike","jackson", "golda", "bertel"}; finalRandom rand = new Random(); finalString word = words[ rand.nextInt(words.length)]; _collector.emit(new Values(word)); }
如你所看,实现很直接。
ExclamationBolt 在它输入后面增加字符串"!!!"。 看一下ExclamationBolt的整个实现:
public static class ExclamationBoltimplements IRichBolt { OutputCollector_collector; publicvoid prepare( Map conf, TopologyContext context, OutputCollector collector){ _collector= collector; } publicvoid excute(Tuple tuple ){ _collector.emit(tuple,new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } publicvoid cleanup() { } publicvoid declareOutputFields ( OutputFieldsDeclarer declarer ){ declarer.declare(new Fields("word")); } publicMap getComponentConfiguration(){ returnnull; } }
prepare方法给bolt提供了OutputCollector,它用来发射来自bolt的tuples。 Tuples可以在任一时刻从bolt中发射-----在prepare、execute、cleanup等方法中,甚至是其他线程中异步方式。prepare的实现简单的将OutputCollector保存为实例变量,这个变量在之后的execute方法中会用到。
execute方法接收来自bolt输入的tuple。ExclamationBolt获取tuple的第一个域,并在其后增加"!!!"然后作为新的tuple发射。如果你实现一个bolt可以订阅多个输入来源,你可以通过使用Tuple#getSourceComponent方法获取tuple来源于哪个组件。
还有一些其他的事情可以使用execute方法,即将输入tuple作为第一个参数传递给emit方法,最后一行代码ack()将确认输入tuple。这些都是Storm可靠性保证API中的部分,将在后面加以解释。
cleanup方法在Bolt停止的时候调用,并且释放任何打开的资源。不能保证这个方法在集群中会被调用:例如,如果任务所在的机器损坏,没有办法调用这个方法。cleanup方法主要是为了你在本地节点(Storm集群在进程中模拟实现)运行topologies时,可以避免资源泄露的同时运行或者杀死topologies。
declareOutputFields方法显示:ExclamationBolt发射一个域为"word"的tuples。
getComponentConfiguration方法允许你配置这个组件。更多讨论可以查看Configuration。
像cleanup以及getComponentConfiguration等方法在bolt实现中并不常用。你可以使用提供默认值的基本类定义简洁的bolts,在这些简洁的的实现中,ExclamationBolt可以通过扩展BaseRichBolt获得比较简洁的实现,就像:
public static class ExclamationBolt extendsBaseRichBolt { OutputCollector_collector; publicvoid prepare (Map conf, TopologyContext context, OutputCollector collector ){ _collector= collector; } publicvoid execute (Tuple tuple){ _collector.emit(tuple, new Values( tuple.getString(0) + "!!!" )); _collector.ack(tuple); } publicvoid declareOutputFields ( OutputFieldsDeclarer declarer ){ declarer.declare(new Fields("word")); } }
在本地模式下上运行ExclamationTopology
下面看一下如何在本地模式下运行ExclamationTopology,以及它是如何工作的。
Storm运行有两种模式:本地模式以及分布式模式。 本地模式中, Storm通过使用线程模拟workernodes,这都是在进程内实现的。当你在storm-starter中运行topologies时,它们将在本地模式下运行,同时你能够看到每个组件发射的消息内容。更多介绍请查看本地模式。
分布式模式中, Storm运行在集群中。当你像master提交topology时,你也需要提交运行topology所需的所有节点。 master既管理如何划分节点,又分配workers去运行topology。如果workers失败了, master会重新在某个节点分配它们。更多介绍请看集群模式。
下面是本地模式下运行ExclamationTopology的代码:
Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster =new LocalCluster(); cluster.submitTopology("test",conf,builder.ctreateTopology()); Utils.sleep(100000); cluster.killTopology("test"); cluster.shutdown();
首先,代码通过创建LocalCluster对象定义了进程内集群。向虚拟集群提交topology和向分布式集群提交topology是相同的。它通过调用submitTopology方法向LocalCluster对象提交topology,其中submitTopology方法将运行中topology的名字、topology的配置、以及topology本身作为参数。
cluster.submitTopology()参数解释:
topology名字用来确认topology, 这样一来,你就可以通过名字杀死它。topology在你杀死它之前会一直运行。
配置参数conf用来优化运行时topology。以下两种配置比较常见:
1、TOPOLOGY_WORKERS( set withsetNumWorkers):指定了你在集群上分配多少进程执行topology。 topology中的每个组件都有很多线程运行。分配给每个组件的线程数可以通过setBolt和setSpout方法配置(见前文)。这些线程存在于worker进程中。每个worker进程都包含一定数量的线程,这些线程服务于组件。例如, 假设所有组件上有300个线程,而配置选项中指定了50个worker,则每个worker会执行6个线程,每个线程都会属于不同的组件。你可以通过调整每个组件的并行线程数以及worker进程数来优化Storm
topology的性能。
2、TOPOLOGY_DEBUG(set withsetDebug),当设置为真时,Storm会通过一个组件记录每次发送消息的log。这对于测试本地模式下的topology非常有用,但是在运行集群模式下topology时一般需要关闭。
还有一些有关topology的其他配置,可以参考Javadoc forConfig。
有关如何配置开发环境并且如何在本地模式下运行topology(例如在eclipse),可以参考
Stream grouping
stream grouping通知topology在两个组件之间如何发送tuples。 记住, spouts以及bolts在集群上并行执行很多任务。 如果你观察一些topology在任务层面上是如何运行的,可以看下面:
当一个任务用于从Bolt A 发送tuple到Bolt B时,那它将发送tuple到哪个任务?
“stream grouping”回答了这个问题,它会通知Storm如何在一系列任务之间发送tuples。在深入不同类型的stream groupings之前,可以看一下storm-starter中topology。WordCountTopology从spout读取句子,并从WordCountBolt中输出所识别的词汇的总数:
TopologyBuilder builder = newTopologyBuilder(); builder.setSpout("sentences", newRandomSentenceSpout(), 5); builder.setBolt("split", newSplitSentence(), 8 ) .shuffleGrouping( "split", newSplitSentence(), 8); builder.setBolt( "count", newWordCount(), 12 ) .fieldsGrouping("split", newFields("word"));
SplitSentence 将它所接受的每个句子的每个单词作为tuple发射。WordCount保存了单词与次数之间的内存映射。每次WordCount收到一个词汇,它会更新当前的统计状态。
stream grouping包含几种不同的方式:
最简单的stream grouping方式成为“shuffle grouping”(随机分组),这种方式中, 将tuple随机的发向某个任务。 WordCountTopology应用随机分组的方式, 将tuples从RandomSentencesSpout发送到SplitSentence bolt。它可以将处理tuples的工作均匀的分布在所有的SplitSentence bolt的任务上。
另一个更加有趣的分组方式是“fields grouping”, SplitSentence bolt与WordCount bolt之间使用的就是fields grouping。对于WordCount bolt功能来说,关键点是将同一个word完整的发送到一个task。否则,会有不止一个任务看到这个word,但是多个tasks看到的word并不完整,因此会导致统计不准。fields grouping通过他们fields子集来划分数据流。这就使去同一个task的子集具有相同的值。因为 WordCount使用fields为“word”的fields
grouping方式定于SplitSentence的输出流,因此同一个词总会发送到同一个任务并且bolt输出正确的结果。
fields grouping是实现流合并、流聚合以及其他用例的基础。 背后的实现是:fields grouping使用哈希模式实现tuple的发送。
还有一些其他的种类的stream grouping,可以参考Concepts。
使用不同的语言定义Bolts
Bolts可以使用任何语言定义。 使用其他语言的Bolts作为子进程来执行,Storm通过stdin/stdout使用JSON消息格式与这些子进程通信。通信协议需要100左右的协议库。目前,Storms拥有包括Ruby/python/Fancy语言的协议库。
下面是WordCountTopology中SplitSentence bolt的定义:
public static class SplitSentence extendsShellBolt implements IRichBolt { publicSplitSentence(){ super(“python”, “splitsentence.py”); } publicvoid declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(newFields(“word”)); } }
SplitSentence 继承ShellBolt,并且声明,它使用splitsentence.py作为参数在运行时使用python。这里是splitsentence.py具体内容:
import storm class SplitSentenceBolt(storm.BasicBolt): defprocess (self, tup): words= tup.values[0].split(" "); forword in words: storm.emit([word]) SplitSentceBolt().run()
更多有关使用其他语言实现spouts和bolts,以及如何使用其他语言(避免完全使用JVM)创建topologies,可以查看non-JVMlanguages with storm。
可靠的消息处理
本文的前面,我们略过了发射tuples的某些方面。这些方面就是Storm可靠性API部分: Storm如何保证spout发出的每条消息都会得到处理。可以查看Guaranteeingmessage processing 获取更多消息,即它如何可靠工作以及如何做才能利用storm的这些可靠性优点
transaction topologies
Storm保证每条消息都可以被topology至少处理一次。人们通常会问: storm是如何工作的,例如在storm上层是如何计算的,会不会过度计算。Storm中有一个名为transaction topologies的特性,它可以保证大多数计算会取得一次准确的消息传递。更多有关transaction topologies请查看这里。
分布式RPC(remote procedure call protocol)
本文说明了Storm上层是如何进行基本流处理的。有了Storm这些基本的东西,你可以做很多事情。最有趣的应用之一就是分布式RPC,此应用中你可以并行处理一些强度很大的运算。更多有关分布式RPC资料可以阅读这里。
结论
本文叙述了开发、测试、部署storm topology的大概介绍。文档的其他部分会深入探讨怎样使用Storm。