storm 文档(3)----入门指导

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41517897

源地址:http://storm.apache.org/documentation/Tutorial.html

本文主要讲述了如何创建Storm topologies以及如何将它们部署在Storm集群中。Java是主要使用的语言,但是依然使用少量Python例子证明了Storm的多语言特性。

初步配置:

本文使用的例子源自storm-start项目。建议你复制这个项目并验证这个项目。阅读 设置开发环境创建Storm项目,然后配置你的环境。

Storm集群组成:

Storm集群类似于Hadoop集群。在Hadoop平台上,你可以运行“MapReducejobs”,在Storm平台上,你可以运行“topologies”。“Jobs”和“topologies”它们是不同的-----一个关键的不同点是:MapReduce任务最终会结束,然而topology可以永久处理消息(直到你杀死它)。

Storm集群中有两种节点: 控制(master)节点和工作(worker)节点。 控制节点上运行的daemon称为“nimbus”,这类似于Hadoop的“job tracker”。nimbus负责向集群分发代码,向机器分发任务,以及监控是否失败。

每一个worker节点运行的daemon称为“Supervisor”,它监听分发给所在机器的任务,并按照nimbus的分配开始和结束worker进程。每一个worker进程执行一个topology的子集;一个运行的topology包含分布在很多机器上的很多worker进程。

supervisor

zookeeper                           supervisor

supervisor

nimbus <------------> zookeeper<------------>      supervisor

supervisor

zookeeper                           supervisor

supervisor

Nimbus 和 Supervisor之间的协作是通过Zookeeper集群完成的。另外, Nimbus  daemon以及Supervisor daemon都是快速失败并且无状态的; 所有的状态都保存在Zookeeper或者本地localdisk中。这就意味着你可以使用命令“kill  -9”杀死Nimbus或者Supersors,而当它们重启的时候没有丢失任何东西。
这种设计使得Storm 集群拥有不可思议的稳定性。

拓扑(Topologies)

在Storm上进行实时运算时,可以创建“topologies”。Topology是计算节点组成的图。topology处理逻辑上包含的每个节点以及节点之间的连接,共同显示了数据是如何在节点之间传递的。

可以直接的运行topology。首先,你需要将你的代码以及依赖库打包成单独的jar;然后,你需要按照下面运行命令行:

storm     jar    all-my-code.jar         backtype.storm.MyTopology arg1 arg2

这条命令执行类backtype.storm.MyTopology,参数为arg1和arg2. 类的主要函数定义了topology并向nimbus提交topology任务。storm  jar这部分负责连接Nimbus并提交jar。

因为topology定义就是Thrift 结构,而Nimbus是Thrift服务,你可以使用任何语言创建并提交topology。上面的例子是使用JVM-based语言的最简单的例子。有关启动和停止topologies的更多信息可以查看 生产节点上运行topologies

流(stream)

Storm的核心抽象是"stream"。stream是tuples的无界序列。为了在分布式条件下以可靠的方式发送stream到新stream, storm提供了一些基础的方式。例如, 你可以将tweets的数据流转换到trendingtopics的数据流。

Storm用来提供数据转换的基本单元是“spouts”和“bolts”。 Spouts和bolts拥有的接口可以使你实现一些特定的应用逻辑。

Spout是数据流来源。例如,spout可以从Kestrel读取tuples并将它们作为数据流发射。或者,Spout可以连接Twitter API并且发射tweets的数据流。

Bolt是数据流消耗者,它可以消耗任意数量的输入流,做一些处理,并且有可能发射新数据流。一些复杂的数据流转换,像从tweets数据流计算某个topics的趋势,一般需要多个步骤,这样也就需要多个bolts。 Bolts可以处理多种数据,例如来自run functions、filter tuples,然后做一些数据流聚合、合并,与数据库交互等。

Spouts和Bolts组成的网络就是topology,这是向Storm集群提交任务的高层抽象。topology是流转换的图,图中的每个节点都是spout或者bolt。图中级联显示了哪些bolts订阅了哪些流。当spout或者bolt向流发射tuple时,它会向所有提出订阅请求的bolts发射tuple。

bolt— bolt

/                 /

spout        —     bolt

\

spout  —  bolt

topology中节点之间的连接显示了tuples是如何传递的。 例如, 如果Spout A 与Bolt B之间存在连接, SpoutA与Bolt C之间存在连接, Bolt B与Bolt C之间存在连接,则每次Spout A发射一个tuple,它向Bolt B和Bolt C都会发送tuple。 Bolt B输出的所有tuples也都会发送到Bolt C。

Storm topology中每个节点都会并行运行。 在topology中,你可以指定每个节点的并行数目, 则Storm在集群中每个节点就会孵化同样数目的线程进行运算。

topology可以永久运行,直到你杀死它。 Storm将自动重新分配任何失败的tasks。另外,Storm保证不会丢失任何数据,即使机器挂掉并且消息被抛弃。

Data model(数据模式)

Storm使用tuples作为它的数据模式。tuple就是已经命名的值的列表,并且tuple的域可以是任何类型的对象。在包装之外,Storm支持所有基本类型、字符串以及字节数组作为tuple值。要使用任何类型的对象,你只需要实现这种类型的序列化器。

topology中每个节点都必须声明: 它所要发射的tuples的输出域。如下所示:bolt声明了: 它发射的两个tuples,它们的域为“double”和“triple”:

public class DoubleAndTripleBolt extendsBaseRichBolt {
         privateOutputCollectorBase  _collector;

         @Override
         publicvoid prepare( Map conf, TopologyContext context, OutputCollectorBase collector){
                   _collector= collector;
         }

         @Override
         publicvoid execute( Tuple input ) {
                   intval = input.getInteger(0);
                   _collector.emit(input,new Values( val * 2, val * 3) );
                   _collector.ack(input );
         }

         @Override
         publicvoid declareOutputFields( OutputFieldsDeclarer declarer ){
                   declarer.declare(newFields("double", "triple"));
         }
}

函数declareOutputFields为组件声明了输出域:["double", "triple"]。 有关bolt其他的部分将在下面介绍。

简单的topology

看一个简单的topology例子,来学习一些其他的概念,并看一下代码组成。下面看一下storm-starter中的ExclamationTopology函数定义:

TopologyBuilder builder = newTopologyBuilder();
builder.setSpout("words", newTestWordSpout(), 10 );
builder.setBolt("exclaim1", new ExclamationBolt(),3).shuffleGrouping("words");
builder.setBolt("exclaim2", newExclamationBolt(), 2).shuffleGrouping("exclaim1");

topology 包含一个Spout和两个bolts。 Spout发射词汇,每个bolts在它们输入的后面增加"!!!".节点将按流水线作业: spout向第一个bolt发射,第一个bolt向第二个bolt发射。如果spout发射tuples ["bob"]和["john"],则第二个bolt将会发送["bob!!!!!!"]和["john!!!!!!"]。

代码使用setSpout 和 setBolt方法定义节点。这些方法可以将很多东西作为输入:用户特定的id,包含处理逻辑的对象,你想要节点并行处理的数量。本例中, spout被给与的id为"words",bolts被给与的ids分别是“exclaim1”和“exclaim2”。

包含处理逻辑的对象可以通过IRichSpout接口 实现spouts,同时可以通过IRichBolt接口
实现bolts。

最后的参数10、3、2,表示希望节点并行处理数,这是可选的。它显示了集群上有多少线程执行这些组件。如果省略它, Storm将会为每个节点分配一个线程。

setBolt返回InputDeclarer对象,它用来定义Bolt的输入。这里,组件"exclaim1"声明了:它将阅读由组件"words"使用随机组方式发射的所有tuples;组件"exclaim2"声明了:它将阅读由组件"exclaim1"使用随机组方式发射的所有tuples。随机组方式是指:
tuples应当从输入任务随机的分发到bolts‘s 任务。在组件之间,有很多种将数据分组的方式。

如果想要组件"exclaim2"接受所有由组件"words"和组件"exclaim1"两者发送的tuples,则你可以按照一下方式定义"exclaim2":

builder.setBolt( "exclaim2", newExclamationBolt(), 5 )
                            .shuffleGrouping("words")
                            .shuffleGrouping("exclaim1");

就像你看到的,对于bolt而言,输入声明可以成链式结构,将多个来源串成一个链子。

现在深入一下topology中spouts和bolts的实现。 Spouts主要负责将新的消息发送给topology。 TestWordSpout 在这个topology中将发送一个随机单词,发送频率为每100ms一个tuple,随机单词源自列表{‘nathan‘, ‘mike‘, ‘jackson‘, ‘golda‘, ‘bertels‘]。nextTuple()在TestWordSpout中的实现如下:

public void nextTule(){
         Utils.sleep(100);
         finalString[] words = new String[] {"nathan", "mike","jackson", "golda", "bertel"};
         finalRandom rand = new Random();
         finalString word = words[ rand.nextInt(words.length)];
         _collector.emit(new Values(word));
}

如你所看,实现很直接。

ExclamationBolt 在它输入后面增加字符串"!!!"。 看一下ExclamationBolt的整个实现:

public static class ExclamationBoltimplements IRichBolt {
         OutputCollector_collector;

         publicvoid prepare( Map conf, TopologyContext context, OutputCollector collector){
                   _collector= collector;
         }

         publicvoid excute(Tuple tuple ){
                   _collector.emit(tuple,new Values(tuple.getString(0) + "!!!"));
                   _collector.ack(tuple);
         }

         publicvoid cleanup() {
         }

         publicvoid declareOutputFields ( OutputFieldsDeclarer declarer ){
                   declarer.declare(new Fields("word"));
         }

         publicMap getComponentConfiguration(){
                   returnnull;
         }
}

prepare方法给bolt提供了OutputCollector,它用来发射来自bolt的tuples。 Tuples可以在任一时刻从bolt中发射-----在prepare、execute、cleanup等方法中,甚至是其他线程中异步方式。prepare的实现简单的将OutputCollector保存为实例变量,这个变量在之后的execute方法中会用到。

execute方法接收来自bolt输入的tuple。ExclamationBolt获取tuple的第一个域,并在其后增加"!!!"然后作为新的tuple发射。如果你实现一个bolt可以订阅多个输入来源,你可以通过使用Tuple#getSourceComponent方法获取tuple来源于哪个组件。

还有一些其他的事情可以使用execute方法,即将输入tuple作为第一个参数传递给emit方法,最后一行代码ack()将确认输入tuple。这些都是Storm可靠性保证API中的部分,将在后面加以解释。

cleanup方法在Bolt停止的时候调用,并且释放任何打开的资源。不能保证这个方法在集群中会被调用:例如,如果任务所在的机器损坏,没有办法调用这个方法。cleanup方法主要是为了你在本地节点(Storm集群在进程中模拟实现)运行topologies时,可以避免资源泄露的同时运行或者杀死topologies。

declareOutputFields方法显示:ExclamationBolt发射一个域为"word"的tuples。

getComponentConfiguration方法允许你配置这个组件。更多讨论可以查看Configuration

像cleanup以及getComponentConfiguration等方法在bolt实现中并不常用。你可以使用提供默认值的基本类定义简洁的bolts,在这些简洁的的实现中,ExclamationBolt可以通过扩展BaseRichBolt获得比较简洁的实现,就像:

public static class ExclamationBolt extendsBaseRichBolt {
         OutputCollector_collector;

         publicvoid prepare (Map conf, TopologyContext context, OutputCollector collector ){
                   _collector= collector;
         }

         publicvoid execute (Tuple tuple){
                   _collector.emit(tuple, new Values( tuple.getString(0) + "!!!" ));
                   _collector.ack(tuple);
         }

         publicvoid declareOutputFields ( OutputFieldsDeclarer declarer ){
                   declarer.declare(new Fields("word"));
         }
}

在本地模式下上运行ExclamationTopology

下面看一下如何在本地模式下运行ExclamationTopology,以及它是如何工作的。

Storm运行有两种模式:本地模式以及分布式模式。 本地模式中, Storm通过使用线程模拟workernodes,这都是在进程内实现的。当你在storm-starter中运行topologies时,它们将在本地模式下运行,同时你能够看到每个组件发射的消息内容。更多介绍请查看本地模式

分布式模式中, Storm运行在集群中。当你像master提交topology时,你也需要提交运行topology所需的所有节点。 master既管理如何划分节点,又分配workers去运行topology。如果workers失败了, master会重新在某个节点分配它们。更多介绍请看集群模式

下面是本地模式下运行ExclamationTopology的代码:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster =new LocalCluster();
cluster.submitTopology("test",conf,builder.ctreateTopology());
Utils.sleep(100000);
cluster.killTopology("test");
cluster.shutdown();
 

首先,代码通过创建LocalCluster对象定义了进程内集群。向虚拟集群提交topology和向分布式集群提交topology是相同的。它通过调用submitTopology方法向LocalCluster对象提交topology,其中submitTopology方法将运行中topology的名字、topology的配置、以及topology本身作为参数。

cluster.submitTopology()参数解释:

topology名字用来确认topology, 这样一来,你就可以通过名字杀死它。topology在你杀死它之前会一直运行。

配置参数conf用来优化运行时topology。以下两种配置比较常见:

1、TOPOLOGY_WORKERS( set withsetNumWorkers):指定了你在集群上分配多少进程执行topology。 topology中的每个组件都有很多线程运行。分配给每个组件的线程数可以通过setBolt和setSpout方法配置(见前文)。这些线程存在于worker进程中。每个worker进程都包含一定数量的线程,这些线程服务于组件。例如, 假设所有组件上有300个线程,而配置选项中指定了50个worker,则每个worker会执行6个线程,每个线程都会属于不同的组件。你可以通过调整每个组件的并行线程数以及worker进程数来优化Storm
topology的性能。

2、TOPOLOGY_DEBUG(set withsetDebug),当设置为真时,Storm会通过一个组件记录每次发送消息的log。这对于测试本地模式下的topology非常有用,但是在运行集群模式下topology时一般需要关闭。

还有一些有关topology的其他配置,可以参考Javadoc forConfig

有关如何配置开发环境并且如何在本地模式下运行topology(例如在eclipse),可以参考

Creatinga new Storm project

Stream grouping

stream grouping通知topology在两个组件之间如何发送tuples。 记住, spouts以及bolts在集群上并行执行很多任务。 如果你观察一些topology在任务层面上是如何运行的,可以看下面:

当一个任务用于从Bolt A 发送tuple到Bolt B时,那它将发送tuple到哪个任务?

“stream grouping”回答了这个问题,它会通知Storm如何在一系列任务之间发送tuples。在深入不同类型的stream groupings之前,可以看一下storm-starter中topology。WordCountTopology从spout读取句子,并从WordCountBolt中输出所识别的词汇的总数:

TopologyBuilder builder = newTopologyBuilder();

builder.setSpout("sentences", newRandomSentenceSpout(), 5);
builder.setBolt("split", newSplitSentence(), 8 )
           .shuffleGrouping( "split", newSplitSentence(), 8);
builder.setBolt( "count", newWordCount(), 12 )
           .fieldsGrouping("split", newFields("word"));

SplitSentence 将它所接受的每个句子的每个单词作为tuple发射。WordCount保存了单词与次数之间的内存映射。每次WordCount收到一个词汇,它会更新当前的统计状态。

stream grouping包含几种不同的方式:

最简单的stream grouping方式成为“shuffle grouping”(随机分组),这种方式中, 将tuple随机的发向某个任务。 WordCountTopology应用随机分组的方式, 将tuples从RandomSentencesSpout发送到SplitSentence bolt。它可以将处理tuples的工作均匀的分布在所有的SplitSentence bolt的任务上。

另一个更加有趣的分组方式是“fields grouping”, SplitSentence bolt与WordCount bolt之间使用的就是fields grouping。对于WordCount bolt功能来说,关键点是将同一个word完整的发送到一个task。否则,会有不止一个任务看到这个word,但是多个tasks看到的word并不完整,因此会导致统计不准。fields grouping通过他们fields子集来划分数据流。这就使去同一个task的子集具有相同的值。因为 WordCount使用fields为“word”的fields
grouping方式定于SplitSentence的输出流,因此同一个词总会发送到同一个任务并且bolt输出正确的结果。

fields grouping是实现流合并、流聚合以及其他用例的基础。 背后的实现是:fields grouping使用哈希模式实现tuple的发送。

还有一些其他的种类的stream grouping,可以参考Concepts

使用不同的语言定义Bolts

Bolts可以使用任何语言定义。 使用其他语言的Bolts作为子进程来执行,Storm通过stdin/stdout使用JSON消息格式与这些子进程通信。通信协议需要100左右的协议库。目前,Storms拥有包括Ruby/python/Fancy语言的协议库。

下面是WordCountTopology中SplitSentence bolt的定义:

public static class SplitSentence extendsShellBolt implements IRichBolt {
         publicSplitSentence(){
                   super(“python”, “splitsentence.py”);
         }

         publicvoid declareOutputFields(OutputFieldsDeclarer declarer){
                   declarer.declare(newFields(“word”));
         }

}

SplitSentence 继承ShellBolt,并且声明,它使用splitsentence.py作为参数在运行时使用python。这里是splitsentence.py具体内容:

import storm

class SplitSentenceBolt(storm.BasicBolt):
         defprocess (self, tup):
                   words= tup.values[0].split(" ");
                   forword  in words:
                            storm.emit([word])

SplitSentceBolt().run()

更多有关使用其他语言实现spouts和bolts,以及如何使用其他语言(避免完全使用JVM)创建topologies,可以查看non-JVMlanguages with storm

可靠的消息处理

本文的前面,我们略过了发射tuples的某些方面。这些方面就是Storm可靠性API部分: Storm如何保证spout发出的每条消息都会得到处理。可以查看Guaranteeingmessage processing 获取更多消息,即它如何可靠工作以及如何做才能利用storm的这些可靠性优点

transaction topologies

Storm保证每条消息都可以被topology至少处理一次。人们通常会问: storm是如何工作的,例如在storm上层是如何计算的,会不会过度计算。Storm中有一个名为transaction topologies的特性,它可以保证大多数计算会取得一次准确的消息传递。更多有关transaction topologies请查看这里

分布式RPC(remote procedure call protocol)

本文说明了Storm上层是如何进行基本流处理的。有了Storm这些基本的东西,你可以做很多事情。最有趣的应用之一就是分布式RPC,此应用中你可以并行处理一些强度很大的运算。更多有关分布式RPC资料可以阅读这里

结论

本文叙述了开发、测试、部署storm topology的大概介绍。文档的其他部分会深入探讨怎样使用Storm。

时间: 2024-11-05 07:25:15

storm 文档(3)----入门指导的相关文章

storm文档(6)----storm手册目录

源地址:http://storm.apache.org/documentation/Documentation.html storm基础知识 l  Javadoc l  概念 l  配置 l  保证消息处理机制 l  容错性能 l  命令行客户端 l  理解storm topology并行机制 l  FAQ trident 对storm来说,trident是可选接口.它提供了准确的一次性处理.事务性数据存储保持以及一系列通用数据流分析操作. l  Trident指导-----基本概念及浏览 l 

storm文档(12)----自己搭建storm集群

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41802543 ubuntu下  storm  安装步骤 安装storm之前首先需要安装一些依赖库: zookeeper.JDK 6.python2.6.6.jzmq.zeromq 这些库所需要的依赖库不再一一笔述. 以下为具体安装过程: 一.安装JDK zookeeper要求安装JDK 6或更高版本( 目前最新稳定版本为JDK8), 但是由于storm要求安装JDK 6, 因此

storm文档(10)----容错

源地址:http://storm.apache.org/documentation/Fault-tolerance.html 本文主要介绍Storm作为容错系统的设计细节. 当worker死掉时会发生什么? 当worker死掉时, supervisor将重启它. 如果worker启动总是失败,则worker就不能发送心跳消息给Nimbus, 那Nimbus就会重新在另一台machine上启动它. 当node死掉时会发生什么? 分配到这个节点的所有tasks都会超时,那Nimbus会将这些task

storm文档(11)----搭建storm集群

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41684717 源地址:http://storm.apache.org/documentation/Setting-up-a-Storm-cluster.html 本文叙述了storm集群搭建和运行步骤.如果你打算在AWS上进行的话,可以使用storm-deploy项目.storm-deploy在EC2上完全自动进行下载.配置.以及storm集群的安装等步骤.它也为你配置了Gan

storm文档(5)----创建storm新项目

源地址:http://storm.apache.org/documentation/Creating-a-new-Storm-project.html 本文主要介绍如何配置开发的storm项目.步骤如下: 1.将storm jar包加到classpath中 2.如果使用多语言特性,将多语言实现的目录加到classpath中 下面跟着一块看一下在Eclipse环境中如何配置storm-starter项目. 将Storm jars包加到classpath中 你需要将storm jars包加到你的cl

storm文档(4)----开发环境环境搭建

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41519053 源地址:http://storm.apache.org/documentation/Setting-up-development-environment.html 本文大体介绍了如何搭建Storm开发环境.总的来说,步骤如下: 1.下载storm release版本.解压缩,然后将解压缩版本的/bin目录放在你的环境变量PATH中. 2.远程集群上topologi

storm文档(9)----消息处理保证机制

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41577125 源地址:http://storm.apache.org/documentation/Guaranteeing-message-processing.html Storm保证:每条离开spout的消息都可以得到"fullyprocessed".本文描述了storm如何实现这种保证以及你如何能够从Storm这种可靠性能力中受益. "fully pr

storm文档(7)----基本概念

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41546195 源地址:http://storm.apache.org/documentation/Concepts.html 本文介绍了storm的主要概念,并且给出相关链接供你查看更多信息.本文讨论的概念如下所示: 1.Topologies 2.Streams 3.Spouts 4.Bolts 5.Stream Grouping 6.Reliability 7.Tasks 8

storm文档(8)----配置文件说明

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41547569 源地址:http://storm.apache.org/documentation/Configuration.html storm由丰富的configure选项, 用来调整nibus.supervisor.以及运行时topologies的行为.某些配置选项是系统配置,例如topology基础配置,修改某个topology的这些选项有可能影响到所有topologi