[转]JStorm介绍

一、简介

Storm是开源的分布式容错实时计算系统,目前被托管在GitHub上,遵循 Eclipse Public License 1.0。最初由BackType开发,现在已被Twitter收入麾下。Storm最新版本是Storm 0.9,核心采用Clojure实现。Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息;Storm也可被用于“连续计算”(continuous computation),对数据流做连续处理,在计算时就将结果以流的形式输出给用户;它还可被用于“分布式RPC”,以并行的方式执行运算。

Storm主要特点如下:
0、简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了实时处理的复杂性。
1、语言无关。Storm的消息处理组件可以用任何语言来定义。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
2、容错性。如果在消息处理过程中出了一些异常,Storm会重新调度出问题的处理逻辑。Storm保证一个处理单元永远运行,除非显式杀掉。
3、可伸缩性。Storm的可伸缩性可以使其每秒处理的消息量达到很高。为了扩展一个实时计算任务,需要做的就是增加节点并且提高计算任务的并行度设置(parallelism setting)。Storm应用在10个节点的集群上每秒可以处理高达1000000个消息,包括每秒一百多次的数据库调用[5]。同时Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易扩展。
4、保证无数据丢失。实时系统必须保证所有的数据被成功的处理。 那些会丢失数据的系统的适用场景非常窄,而Storm保证每一条消息都会被处理。
5、适用场景广泛。消息流处理、持续计算、分布式方法调用等是Storm适用场景广泛的基础,Storm的这些基础原语可以满足大量的场景。

虽然Storm具备诸多优势,但也存在不足:
0、Storm目前还存在Nimbus SPOF的问题;
1、存在雪崩问题;
2、资源粒度较粗;
3、Clojure实现引入了学习成本;

为此,阿里巴巴中间件团队用Java重新实现了类Storm的JStorm,同样被托管在GitHub上,遵循 Eclipse Public License 1.0,目前版本0.9.3。相关资料显示,阿里巴巴内部已经大规模部署了Storm/JStorm集群。
JStorm继承了Storm的所有优点,同时与Storm相比JStorm所特有的如下特点:
0、兼容Storm接口。开发者在Storm上运行的程序无需任何修改即可运行在JStorm上。
1、Nimbus HA。解决了Storm的Nimbus单点问题,支持自动热备切换Nimbus。
2、更细粒度的资源划分。JStorm从CPU、MEMORY、DISK和NET四个维度进行任务调度,同时不存在任务抢占问题。
3、可定制的任务调度机制。(Storm的任务调度目前也可定制)
4、更好的性能。通过底层ZeroMQ和Netty使JStorm具有更好的性能,同时具有更好的稳定性。
5、解决了Storm的雪崩问题。通过Netty和disruptor机制实现RPC保证可以匹配的数据发送和接收速度避免雪崩问题。

此外,JStorm通过减少对zookeeper的访问量、增加反序列化线程、优化ACK、增加监控内容及JAVA本身优势等各个方面优化了Storm的性能和稳定性。总之,JStorm比Storm更强大、更稳定、性能更好
(本文后面所述关于JStorm的部分内容同样适用Storm)

二、数据模型

JStorm通过一系列基本元素实现实时计算的目标,其中包括了Topology、Stream、Spout、Bolt等等。JStorm在模型上和MapReduce有很多相似的地方,下表从不同维度对JStorm和MapReduce进行了比较。


MapReduce


JStorm


Role


JobTracker


Nimbus


TaskTracker


Supervisor


Child


Worker


Application


Job


Topology


Interface


Mapper/Reducer


Spout/Bolt

实时计算任务需要打包成Topology提交,和MapReduce Job相似,不同的是,MapReduce Job在计算完成后结束,而JStorm的Topology任务一旦提交永远不会结束,除非显式停止。

计算任务Topology是由不同的Spout和Bolt通过Stream连接起来的DAG图。下面是一个典型Topology的结构示意图:

其中:

Spout:JStorm的消息源。用于生产消息,一般是从外部数据源(如MQ/RDBMS/NoSQL/RTLog等)不间断读取数据并向下游发送消息。

Bolt:JStorm的消息处理者。用于为Topology进行消息处理,Bolt可以执行查询、过滤、聚合及各种复杂运算操作,Bolt的消息处理结果可以作为下游Bolt的输入不断迭代。

Stream:JStorm中对数据进行的抽象,它是时间上无界的Tuple元组序列。在Topology中Spout是Stream的源头,负责从特定数据源发射Stream;Bolt可以接收任意多个Stream输入然后进行数据的加工处理,如果需要Bolt还可以发射出新Stream给下游Bolt。

Tuple:JStorm使用Tuple作为数据模型,存在于任意两个有数据交互的组件(Spout/Bolt)之间。每个Tuple是一组具有各自名称的值,值可以是任何类型,JStorm支持所有的基本类型、字符串以及字节数组,也可以使用自定义类型(需实现对应序列化器)作为值类型。简单来说,Tuple就是一组实现了序列化器带有名称的Java对象集合。

从整个Topology上看,Spout/Bolt可以看作DAG的节点,Stream是连接不同节点之间的有向边,Tuple则是流过Stream的数据集合。
下面是一个Topology内部Spout和Bolt之间的数据流关系:

Topology中每一个计算组件(Spout和Bolt)都有一个并行度,在创建Topology时指定(默认为1),JStorm在集群内分配对应个数的线程Task并行。

如上图示,既然对于Spout/Bolt都会有多个线程来并行执行,那么如何在两个组件(Spout和Bolt)之间发送Tuple会成为新的问题。

JStorm通过定义Topology时为每个Bolt指定输入Stream以及指定提供的若干种数据流分发(Stream Grouping)策略用来解决这一问题。

JStorm提供了以下几种Stream Grouping策略:
0) Shuffle Grouping:随机分组,随机派发Stream里面的Tuple,保证每个Bolt接收到的Tuple数目大致相同,通过轮询随机的方式使得下游Bolt之间接收到的Tuple数目差值不超过1。
1) Fields Grouping:按字段分组,具有同样字段值的Tuple会被分到相同Bolt里的Task,不同字段值则会被分配到不同Task。
2) All Grouping:广播分组,每一个Tuple,所有的Bolt都会收到。
3) Global Grouping:全局分组,Tuple被分配到Bolt中ID值最低的的一个Task。
4) Non Grouping:不分组,Tuple会按照完全随机的方式分发到下游Bolt。
5) Direct Grouping:直接分组,Tuple需要指定由Bolt的哪个Task接收。 只有被声明为Direct Stream的消息流可以声明这种分组方法。
6) Local or Shuffle Grouping:基本同Shuffle Grouping。
7) Custom Grouping:用户自定义分组策略,CustomStreamGrouping是自定义分组策略时用户需要实现的接口。

三、系统架构

JStorm与Hadoop相似,保持了Master/Slave的简洁优雅架构。与Hadoop不同,JStorm的M/S之间不是直接通过RPC交换心跳信息,而是借助ZK来实现,这样的设计虽然引入了第三方依赖,但是简化了Nimbus/Supervisor的设计,同时也极大提高了系统的容错能力。

整个JStorm系统中共存三类不同的Daemon进程,分别是Nimbus,Supervisor和Worker。

Nimbus:JStorm中的主控节点,Nimbus类似于MR的JT,负责接收和验证客户端提交的Topology,分配任务,向ZK写入任务相关的元信息,此外,Nimbus还负责通过ZK来监控节点和任务健康情况,当有Supervisor节点变化或者Worker进程出现问题时及时进行任务重新分配。Nimbus分配任务的结果不是直接下发给Supervisor,也是通过ZK维护分配数据进行过渡。特别地,JStorm 0.9.0领先Apache Storm实现了Nimbus HA,由于Nimbus是Stateless节点,所有的状态信息都交由ZK托管,所以HA相对比较简单,热备Nimbus subscribe ZK关于Master活跃状态数据,一旦发现Master出现问题即从ZK里恢复数据后可以立即接管。

Supervisor:JStorm中的工作节点,Supervisor类似于MR的TT,subscribe ZK分配到该节点的任务数据,根据Nimbus的任务分配情况启动/停止工作进程Worker。Supervisor需要定期向ZK写入活跃端口信息以便Nimbus及时监控。Supervisor不执行具体的数据处理工作,所有的数据处理工作都交给Worker完成。

Worker:JStorm中任务执行者,Worker类似于MR的Task,所有实际的数据处理工作最后都在Worker内执行完成。Worker需要定期向Supervsior汇报心跳,由于在同一节点,同时为保持节点的无状态,Worker定期将状态信息写入本地磁盘,Supervisor通过读本地磁盘状态信息完成心跳交互过程。Worker绑定一个独立端口,Worker内所有单元共享Worker的通信能力。

Nimbus、Supervisor和Worker均为Stateless节点,支持Fail-Fast,这为JStorm的扩展性和容错能力提供了很好的保障。

还剩一个问题是Topology的各个计算组件(Spout/Bolt)如何映射到计算资源上。梳理这个问题前需要先明确Worker/Executor/Task之间的关系:

0、Worker:完整的Topology任务是由分布在多个Supervisor节点上的Worker进程(JVM)来执行,每个Worker都执行且仅执行Topology任务的一个子集。

1、Executor:Worker内部会有一个或多个Executor,每个Executor对应一个线程。Executor包括SpoutExecutor和BoltExecutor,同一个Worker里所有的*Executor只能属于某一个Topology里的执行单元。

2、Task:执行具体数据处理实体,也就是用户实现的Spout/Blot实例。一个Executor可以对应多个Task,定义Topology时指定,默认Executor和Task一一对应。这就是说,系统中Executor数量一定是小于等于Task数量(#Executor≤#Task)。

下图给出了一个简单的例子,上半部分描述的是Topology结构及相关说明,其中定义了整个Topology的worker=2,DAG关系,各个计算组件的并行度;下半部分描述了Topology的Task在Supervisor节点的分布情况。从中可以看出Topology到Executor之间的关系。

0、Worker数在提交Topology时在配置文件中指定;

例:#Worker=2

1、执行线程/Executor数在定义Topology的各计算组件并行度时决定,可以不指定,默认为1。其中各个计算组件的并行度之和即为该Topology执行线程总数。

例:#Executor=sum(#parallelism hint)=2+2+6=10

2、Task数目也在定义Toplogy时确定,若不指定默认每个Executor线程对应一个Task,若指定Task数目会在指定数目的线程里平均分配。

例:#Task=sum(#task)=2+4+6=12,其中Executor4={Task0,Task1}

四、 关键流程

0、Topology提交

JStorm为用户提供了StormSubmitter. submitTopology用来向集群提交Topology,整个提交流程:

Client端:
0)客户端简单验证;
1)检查是否已经存在同名Topology;
2)提交jar包;
3)向Nimbus提交Topology;

Nimbus端:
0)Nimbus端简单合法性检查;
1)生成Topology Name;
2)序列化配置文件和Topology Code;
3)Nimbus本地准备运行时所需数据;
4)向ZK注册Topology和Task;
5)将Task压入分配队列等待TopologyAssign分配;

1、任务调度策略

从0.9.0开始,JStorm提供非常强大的调度功能,基本上可以满足大部分的需求,同时支持自定义任务调度策略。JStorm的资源不再仅是Worker的端口,而从CPU/Memory/Disk/Net等四个维度综合考虑。
Nimbus任务调度算法[2]如下:
0)优先使用自定义任务分配算法,当资源无法满足需求时,该任务放到下一级任务分配算法;
1)使用历史任务分配算法(如果打开使用历史任务属性),当资源无法满足需求时,该任务放到下一级任务分配算法;
2)使用默认资源平衡算法,计算每个Supervisor上剩余资源权值,取权值最高的Supervisor分配任务。

2、Acker机制

为保证无数据丢失,Storm/JStorm使用了非常漂亮的可靠性处理机制,如图当定义Topology时指定Acker,JStorm除了Topology本身任务外,还会启动一组称为Acker的特殊任务,负责跟踪Topolgogy DAG中的每个消息。每当发现一个DAG被成功处理完成,Acker就向创建根消息的Spout任务发送一个Ack信号。Topology中Acker任务的并行度默认parallelism hint=1,当系统中有大量的消息时,应该适当提高Acker任务的并行度。

Acker按照Tuple Tree的方式跟踪消息。当Spout发送一个消息的时候,它就通知对应的Acker一个新的根消息产生了,这时Acker就会创建一个新的Tuple Tree。当Acker发现这棵树被完全处理之后,他就会通知对应的Spout任务。

Acker任务保存了数据结构Map<MessageID,Map< TaskID, Value>>,
其中MessageID是Spout根消息ID,TaskID是Spout任务ID,Value表示一个64bit的长整型数字,是树中所有消息的随机ID的异或结果。通过TaskID,Acker知道当消息树处理完成后通知哪个Spout任务,通过MessageID,Acker知道属于Spout任务的哪个消息被成功处理完成。Value表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的MessageID发送过来做异或。当Acker发现一棵树的Value值为0的时候,表明这棵树已经被成功处理完成。

例如,对于前面Topology中消息树,Acker数据的变化过程:
Step0.A发送T0给B后:
R0=r0
<id0,<taskA,R0>>
Step1.B接收到T0并成功处理后向C发送T1,向D发送T2:
R1=R0^r1^r2=r0^r1^r2
<id0,<taskA,R0^R1>>
=<id0,<taskA,r0^r0^r1^r2>>
=<id0,<taskA,r1^r2>>
Step2.C接收到T1并成功处理后:
R2=r1
<id0,<taskA,r1^r2^R2>>
=<id0,<taskA,r1^r2^r1>>
=<id0,<taskA,r2>>
Step3.D接收到T2并成功处理后:
R3=r2
<id0,<taskA,r2^R3>>
=<id0,<taskA,r2^r2>>
=<id0,<taskA,0>>
当结果为0时Acker可以通知taskA根消息id0的消息树已被成功处理完成。

需要指出的是,Acker并不是必须的,当实际业务可以容忍数据丢失情况下可以不用Acker,对数据丢失零容忍的业务必须打开Acker,另外当系统的消息规模较大是可适当增加Acker的并行度。

3、故障恢复

0)节点故障

Nimbus故障。Nimbus本身无状态,所以Nimbus故障不会影响正在正常运行任务,另外Nimbus HA保证Nimbus故障后可以及时被备份Nimbus接管。
Supervisors节点故障。Supervisor故障后,Nimbus会将故障节点上的任务迁移到其他可用节点上继续运行,但是Supervisor故障需要外部监控并及时手动重启。
Worker故障。Worker健康状况监控由Supervisor负责,当Woker出现故障时,Supervisor会及时在本机重试重启。
Zookeeper节点故障。Zookeeper本身具有很好的故障恢复机制,能保证至少半数以上节点在线就可正常运行,及时修复故障节点即可。

1)任务失败

Spout失败。消息不能被及时被Pull到系统中,造成外部大量消息不能被及时处理,而外部大量计算资源空闲。
Bolt失败。消息不能被处理,Acker持有的所有与该Bolt相关的消息反馈值都不能回归到0,最后因为超时最终Spout的fail将被调用。
Acker失败。Acker持有的所有反馈信息不管成功与否都不能及时反馈到Spout,最后同样因为超时Spout的fail将被调用。
任务失败后,需要Nimbus及时监控到并重新分配失败任务。

五、基础接口

这里把几个基础接口中注释摘出来说明其的作用:

0、ISpout: ISpout is the core interface for implementing spouts. A Spout is responsible for feeding messages into the topology for processing. For every tuple emitted by a spout, Storm will track the (potentially very large) DAG of tuples generated based on a tuple emitted by the spout. When Storm detects that every tuple in that DAG has been successfully processed, it will send an ack message to the Spout.
1、IBolt: IBolt represents a component that takes tuples as input and produces tuples as output. An IBolt can do everything from filtering to joining to functions to aggregations. It does not have to process a tuple immediately and may hold onto tuples to process later.
2、TopologyBuilder: TopologyBuilder exposes the Java API for specifying a topology for Storm to execute.
3、StormSubmitter: Use this class to submit topologies to run on the Storm cluster.

针对前面例子中的Topology这里给出一个简单的实现,其中略去了BlueSpout/GreeBolt/YellowBolt的具体实现,更多参考这里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void main (String[] args){
   Config conf = new Config();
   // use two worker processes
   conf.setNumWorkers(2);
   // set parallelism hint to 2
   topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2);
   topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
                  .setNumTasks(4)
                  .shuffleGrouping("blue-spout");
   topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
                  .shuffleGrouping("green-bolt");
   StormSubmitter.submitTopology(
        "mytopology",
        conf,
        topologyBuilder.createTopology());
}

JStorm更多包括事务在内的接口详见源码。

六、结语

本文对JStorm做了简单介绍,有错误之处敬请指正。

七、参考文档

[1]Storm社区.http://storm.incubator.apache.org/

[2]JStorm源码.https://github.com/alibaba/jstorm/

[3]Storm源码.https://github.com/nathanmarz/storm/

[4]Jonathan Leibiusky, Gabriel Eisbruch, etc. Getting Started with Storm.http://shop.oreilly.com/product/0636920024835.do. O’Reilly Media, Inc.

[5]Xumingming Blog.http://xumingming.sinaapp.com/

[6]量子恒道官方博客.http://blog.linezing.com/

[7]Google Image.http://images.google.com

时间: 2024-12-19 04:43:08

[转]JStorm介绍的相关文章

[转]Twemproxy 介绍与使用

Twemproxy是一种代理分片机制,由Twitter开源.Twemproxy作为代理,可接受来自多个程序的访问,按照路由规则,转发给后台的各个Redis服务器,再原路返回.该方案很好的解决了单个Redis实例承载能力的问题.当然,Twemproxy本身也是单点,需要用Keepalived做高可用方案.通过Twemproxy可以使用多台服务器来水平扩张redis服务,可以有效的避免单点故障问题.虽然使用Twemproxy需要更多的硬件资源和在redis性能有一定的损失(twitter测试约20%

[原创]jQuery的this和$(this)

网上有很多关于jQuery的this和$(this)的介绍,大多数只是理清了this和$(this)的指向,其实它是有应用场所的,不能一概而论在jQuery调用成员函数时,this就是指向dom对象. $(this)指向jQuery对象是无可厚非的,但this就是指向dom对象,这个是因为jQuery做了特殊的处理. 在创建dom的jQuery对象时,jQuery不仅仅为dom创建一个jQuery对象,而且还将dom存储在所创建对象的数组中. elem = document.getElement

[转]Lua中的元表与元方法

前言 元表对应的英文是metatable,元方法是metamethod.我们都知道,在C++中,两个类是无法直接相加的,但是,如果你重载了“+”符号,就可以进行类的加法运算.在Lua中也有这个道理,两个table类型的变量,你是无法直接进行“+”操作的,如果你定义了一个指定的函数,就可以进行了.那这篇博文就是主要讲的如果定义这个指定的函数,这个指定的函数时什么?希望对学习Lua的朋友有帮助. Lua是怎么做的? 通常,Lua中的每个值都有一套预定义的操作集合,比如数字是可以相加的,字符串是可以连

[转]Lua语言基础汇总(4) -- 函数

Lua中的函数和C++中的函数的含义是一致的,Lua中的函数格式如下: 1 2 3 function MyFunc(param)      -- Do something end 在调用函数时,也需要将对应的参数放在一对圆括号中,即使调用函数时没有参数,也必须写出一对空括号.对于这个规则只有一种特殊的例外情况:一个函数若只有一个参数,并且此参数是一个字符串或table构造式,那么圆括号便可以省略掉.看以下代码: 1 2 3 4 5 6 print "Hello World"      

Nutch 1.0 源代码分析[3] Plugin(2)

 Nutch 1.0 源代码分析[3] Plugin(2)  来自: http://c.tieba.baidu.com/p/3439551436 在URLNormalizers构造函数中,有一句没有看: this.extensionPoint =PluginRepository.get(conf).getExtensionPoint( URLNormalizer.X_POINT_ID); 看一下PluginRepository.get函数: public static synchronizedP

[翻译]The Neophyte&#39;s Guide to Scala Part 12: Type Classes

The Neophyte's Guide to Scala Part 12: Type Classes 过去的两周我们讨论了一些使我们保持DRY和灵活性的函数式编程技术,特别是函数组合,partial function的应用,以及currying.接下来,我将会继续讨论如何使你的代码尽可能的灵活. 但是,这次我们将不会讨论怎么使用函数作为一等对象来达到这个目的,而是使用类型系统,这次它不是阻碍着我们,而是使得我们的代码更灵活:你将会学到关于 type classes 的知识. 你可能会觉得这是一

[转]useradd 与adduser的区别

转自:Deit_Aaron的专栏 添加用户:useradd -m 用户名  然后设置密码  passwd 用户名 删除用户:userdel  -r  用户名 1. 在root权限下,useradd只是创建了一个用户名,如 (useradd  +用户名 ),它并没有在/home目录下创建同名文件夹,也没有创建密码,因此利用这个用户登录系统,是登录不了的,为了避免这样的情况出现,可以用 (useradd -m +用户名)的方式创建,它会在/home目录下创建同名文件夹,然后利用( passwd +

[转]iOS应用程序生命周期(前后台切换,应用的各种状态)详解

转载地址:http://blog.csdn.net/totogo2010/article/details/8048652 iOS的应用程序的生命周期,还有程序是运行在前台还是后台,应用程序各个状态的变换,这些对于开发者来说都是很重要的. iOS系统的资源是有限的,应用程序在前台和在后台的状态是不一样的.在后台时,程序会受到系统的很多限制,这样可以提高电池的使用和用户体验. //开发app,我们要遵循apple公司的一些指导原则,原则如下: 1.应用程序的状态 状态如下: Not running

[转]关于NSAutoreleasePool&#39; is unavailable: not available in automatic reference counting mode的解决方法

转载地址:http://blog.csdn.net/xbl1986/article/details/7216668 Xcode是Version 4.2 Build 4D151a 根据Objective-c 2.0程序设计上的旧版本的代码会发生NSAutoreleasePool' is unavailable: not available in automatic reference counting mode的错误 需要手动关闭工程中ARC 工程中 Build Settings--->Apple