摘要: 在Hadoop生态圈中,针对大数据进行批量计算时,通常需要一个或者多个MapReduce作业来完成,但这种批量计算方式是满足不了对实时性要求高的场景。那Storm是怎么做到的呢?
博主福利 给大家赠送一套hadoop视频课程
授课老师是百度 hadoop 核心架构师
内容包括hadoop入门、hadoop生态架构以及大型hadoop商业实战案例。
讲的很细致, MapReduce 就讲了 15 个小时。
学完后可以胜任 hadoop 的开发工作,很多人学的这个课程找到的工作。
(包括指导书、练习代码、和用到的软件都打包了)
先到先得先学习。联系老师微信ganshiyu1026,备注OSchina。即可免费领取
部分视频截图展示
流式计算解决方案-Storm
在Hadoop生态圈中,针对大数据进行批量计算时,通常需要一个或者多个MapReduce作业来完成,但这种批量计算方式是满足不了对实时性要求高的场景。
Storm是一个开源分布式实时计算系统,它可以实时可靠地处理流数据。
本章内容:
1) Storm特点
2) Storm基本概念
3) Storm分组模式
4) Storm系统架构
5) Storm容错机制
6) 一个简单的Storm实现
1. Storm特点
在Storm出现之前,进行实时处理是非常痛苦的事情,我们主要的时间都花在关注往哪里发消息,从哪里接收消息,消息如何序列化,真正的业务逻辑只占了源代码的一小部分。一个应用程序的逻辑运行在很多worker上,但这些worker需要各自单独部署,还需要部署消息队列。最大问题是系统很脆弱,而且不是容错的:需要自己保证消息队列和worker进程工作正常。
Storm完整地解决了这些问题。它是为分布式场景而生的,抽象了消息传递,会自动地在集群机器上并发地处理流式计算,让你专注于实时处理的业务逻辑。
Storm有如下特点:
1) 编程简单:开发人员只需要关注应用逻辑,而且跟Hadoop类似,Storm提供的编程原语也很简单
2) 高性能,低延迟:可以应用于广告搜索引擎这种要求对广告主的操作进行实时响应的场景。
3) 分布式:可以轻松应对数据量大,单机搞不定的场景
4) 可扩展:随着业务发展,数据量和计算量越来越大,系统可水平扩展
5) 容错:单个节点挂了不影响应用
6) 消息不丢失:保证消息处理
不过Storm不是一个完整的解决方案。使用Storm时你需要关注以下几点:
1) 如果使用的是自己的消息队列,需要加入消息队列做数据的来源和产出的代码
2) 需要考虑如何做故障处理:如何记录消息处理的进度,应对Storm重启,挂掉的场景
3) 需要考虑如何做消息的回退:如果某些消息处理一直失败怎么办?
2. Storm与Hadoop区别
1) 定义及架构
Hadoop是Apache的一个项目,是一个能够对大量数据进行分布式处理的软件框架。
Storm是Apache基金会的孵化项目,是应用于流式数据实时处理领域的分布式计算系统。
Hadoop |
Storm |
|
系统角色 |
JobTracker |
Nimbus |
TaskTracker |
Supervisor |
|
Child |
Worker |
|
应用名称 |
Job |
Topology |
组件接口 |
Mapper/Reducer |
Spout/Bolt |
2) 应用方面
Hadoop是分布式批处理计算,强调批处理,常用于数据挖掘和分析。
Storm是分布式实时计算,强调实时性,常用于实时性要求较高的地方。
3) 计算处理方式
Hadoop是磁盘级计算,进行计算时,数据在磁盘上,需要读写磁盘;Hadoop应用MapReduce的思想,将数据切片计算来处理大量的离线数据。Hadoop处理的数据必须是已经存放在HDFS上或者类似HBase的数据库中,所以Hadoop实现的时候是通过移动计算到这些存放数据的机器上来提高效率的。
Storm是内存级计算,数据直接通过网络导入内存。Storm是一个流计算框架,处理的数据是实时消息队列中的,需要写好一个Topology逻辑,然后将接收进来的数据进行处理,所以Storm是通过移动数据平均分配到机器资源来获得高效率的。
4) 数据处理方面
数据来源:Hadoop是HDFS上某个文件夹下的数据,数据量可能以TB来计;而Storm则是实时新增的某一笔数据。
处理过程:Hadoop是Map阶段到Reduce阶段的;Storm是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT),也可以是处理逻辑(BOLT)。
是否结束:Hadoop最后必须要结束;而Storm没有结束状态,到最后一步时,就停在那,直到有新数据进入时再重新开始。
处理速度:Hadoop以处理HDFS上大量数据为目的,速度慢;Storm只要处理新增的某一笔数据即可,故此它的速度很快。
适用场景:Hadoop主要是处理一批数据,对时效性要求不高,需要处理就提交一个JOB;而Storm主要是处理某一新增数据的,故此时效性要求高。
总结,Hadoop和Storm并没有真的优劣之分,它们只是在各自的领域上有着独特的性能而已,若是真的把它们进行单纯的比较,反而是有失公平了。事实上,只有在最合适的方面使用最合适的大数据平台,才能够真正体现出它们的价值,也才能够真正为我们的工作提供最为便捷的助力!
3. Storm基本概念
1) Topology
一个Storm拓扑打包了一个实时处理程序的逻辑。一个Storm拓扑跟一个MapReduce的任务(job)是类似的。主要区别是MapReduce任务最终会结束,而拓扑会一直运行(当然直到你杀死它)。一个拓扑是一个通过流分组(Stream Grouping)把Spout和Bolt连接到一起的拓扑结构。图的每条边代表一个Bolt订阅了其他Spout或者Bolt的输出流。一个拓扑就是一个复杂的多阶段的流计算。
2) Tuple
元组是Storm提供的一个轻量级的数据格式,可以用来包装你需要实际处理的数据。元组是一次消息传递的基本单元。一个元组是一个命名的值列表,其中的每个值都可以是任意类型的。元组是动态地进行类型转化的—字段的类型不需要事先声明。在Storm中编程时,就是在操作和转换由元组组成的流。通常,元组包含整数,字节,字符串,浮点数,布尔值和字节数组等类型。要想在元组中使用自定义类型,就需要实现自己的序列化方式。
3) Stream
流是Storm中的核心抽象。一个流由无限的元组序列组成,这些元组会被分布式并行地创建和处理。通过流中元组包含的字段名称来定义这个流。
每个流声明时都被赋予了一个ID。只有一个流的Spout和Bolt非常常见,所以OutputFieldsDeclarer提供了不需要指定ID来声明一个流的函数(Spout和Bolt都需要声明输出的流)。这种情况下,流的ID是默认的“default”。
4) Spout
Spout(喷嘴,这个名字很形象)是Storm中流的来源。通常Spout从外部数据源,如消息队列中读取元组数据并吐到拓扑里。Spout可以是可靠的(reliable)或者不可靠(unreliable)的。可靠的Spout能够在一个元组被Storm处理失败时重新进行处理,而非可靠的Spout只是吐数据到拓扑里,不关心处理成功还是失败了。
Spout可以一次给多个流吐数据。此时需要通过OutputFieldsDeclarer的declareStream函数来声明多个流并在调用SpoutOutputCollector提供的emit方法时指定元组吐给哪个流。
Spout中最主要的函数是nextTuple,Storm框架会不断调用它去做元组的轮询。如果没有新的元组过来,就直接返回,否则把新元组吐到拓扑里。nextTuple必须是非阻塞的,因为Storm在同一个线程里执行Spout的函数。
Spout中另外两个主要的函数是Ack和fail。当Storm检测到一个从Spout吐出的元组在拓扑中成功处理完时调用Ack,没有成功处理完时调用Fail。只有可靠型的Spout会调用Ack和Fail函数。
5) Bolt
在拓扑中所有的计算逻辑都是在Bolt中实现的。一个Bolt可以处理任意数量的输入流,产生任意数量新的输出流。Bolt可以做函数处理,过滤,流的合并,聚合,存储到数据库等操作。Bolt就是流水线上的一个处理单元,把数据的计算处理过程合理的拆分到多个Bolt、合理设置Bolt的task数量,能够提高Bolt的处理能力,提升流水线的并发度。
Bolt可以给多个流吐出元组数据。此时需要使用OutputFieldsDeclarer的declareStream方法来声明多个流并在使用[OutputColletor](https://storm.apache.org/javadoc/apidocs/backtype/storm/task/OutputCollector.html)的emit方法时指定给哪个流吐数据。
当你声明了一个Bolt的输入流,也就订阅了另外一个组件的某个特定的输出流。如果希望订阅另一个组件的所有流,需要单独挨个订阅。InputDeclarer有语法糖来订阅ID为默认值的流。例如declarer.shuffleGrouping("redBolt")订阅了redBolt组件上的默认流,跟declarer.shuffleGrouping("redBolt", DEFAULT_STREAM_ID)是相同的。
在Bolt中最主要的函数是execute函数,它使用一个新的元组当作输入。Bolt使用OutputCollector对象来吐出新的元组。Bolts必须为处理的每个元组调用OutputCollector的ack方法以便于Storm知道元组什么时候被各个Bolt处理完了(最终就可以确认Spout吐出的某个元组处理完了)。通常处理一个输入的元组时,会基于这个元组吐出零个或者多个元组,然后确认(ack)输入的元组处理完了,Storm提供了IBasicBolt接口来自动完成确认。
必须注意OutputCollector不是线程安全的,所以所有的吐数据(emit)、确认(ack)、通知失败(fail)必须发生在同一个线程里。更多信息可以参照问题定位
6) Task
每个Spout和Bolt会以多个任务(Task)的形式在集群上运行。每个任务对应一个执行线程,流分组定义了如何从一组任务(同一个Bolt)发送元组到另外一组任务(另外一个Bolt)上。可以在调用TopologyBuilder的setSpout和setBolt函数时设置每个Spout和Bolt的并发数。
7) Component
组件(component)是对Bolt和Spout的统称
8) Stream Grouping
定义拓扑的时候,一部分工作是指定每个Bolt应该消费哪些流。流分组定义了一个流在一个消费它的Bolt内的多个任务(task)之间如何分组。流分组跟计算机网络中的路由功能是类似的,决定了每个元组在拓扑中的处理路线。
在Storm中有七个内置的流分组策略,你也可以通过实现CustomStreamGrouping接口来自定义一个流分组策略:
洗牌分组(Shuffle grouping): 随机分配元组到Bolt的某个任务上,这样保证同一个Bolt的每个任务都能够得到相同数量的元组。
字段分组(Fields grouping): 按照指定的分组字段来进行流的分组。例如,流是用字段“user-id”来分组的,那有着相同“user-id”的元组就会分到同一个任务里,但是有不同“user-id”的元组就会分到不同的任务里。这是一种非常重要的分组方式,通过这种流分组方式,我们就可以做到让Storm产出的消息在这个”user-id”级别是严格有序的,这对一些对时序敏感的应用(例如,计费系统)是非常重要的。
Partial Key grouping: 跟字段分组一样,流也是用指定的分组字段进行分组的,但是在多个下游Bolt之间是有负载均衡的,这样当输入数据有倾斜时可以更好的利用资源。这篇论文很好的解释了这是如何工作的,有哪些优势。
All grouping: 流会复制给Bolt的所有任务。小心使用这种分组方式。
Global grouping: 整个流会分配给Bolt的一个任务。具体一点,会分配给有最小ID的任务。
不分组(None grouping): 说明不关心流是如何分组的。目前,None grouping等价于洗牌分组。
Direct grouping:一种特殊的分组。对于这样分组的流,元组的生产者决定消费者的哪个任务会接收处理这个元组。只能在声明做直连的流(direct streams)上声明Direct groupings分组方式。只能通过使用emitDirect系列函数来吐元组给直连流。一个Bolt可以通过提供的TopologyContext来获得消费者的任务ID,也可以通过OutputCollector对象的emit函数(会返回元组被发送到的任务的ID)来跟踪消费者的任务ID。
Local or shuffle grouping:如果目标Bolt在同一个worker进程里有一个或多个任务,元组就会通过洗牌的方式分配到这些同一个进程内的任务里。否则,就跟普通的洗牌分组一样。
9) Reliability
Storm保证了拓扑中Spout产生的每个元组都会被处理。Storm是通过跟踪每个Spout所产生的所有元组构成的树形结构并得知这棵树何时被完整地处理来达到可靠性。每个拓扑对这些树形结构都有一个关联的“消息超时”。如果在这个超时时间里Storm检测到Spout产生的一个元组没有被成功处理完,那Spout的这个元组就处理失败了,后续会重新处理一遍。
为了发挥Storm的可靠性,需要你在创建一个元组树中的一条边时告诉Storm,也需要在处理完每个元组之后告诉Storm。这些都是通过Bolt吐元组数据用的OutputCollector对象来完成的。标记是在emit函数里完成,完成一个元组后需要使用Ack函数来告诉Storm。
10) Workers
拓扑以一个或多个Worker进程的方式运行。每个Worker进程是一个物理的Java虚拟机,执行拓扑的一部分任务。例如,如果拓扑的并发设置成了300,分配了50个Worker,那么每个Worker执行6个任务(作为Worker内部的线程)。Storm会尽量把所有的任务均分到所有的Worker上。
4. Storm系统架构
1) 主节点(Nimbus):
在分布式系统中,调度服务非常重要,它的设计,会直接关系到系统的运行效率,错误恢复(fail over),故障检测(error detection)和水平扩展(scale)的能力。
集群上任务(task)的调度由一个Master节点来负责。这台机器上运行的Nimbus进程负责任务的调度。另外一个进程是Storm UI,可以界面上查看集群和所有的拓扑的运行状态。
2) 从节点(Supervisor)
Storm集群上有多个从节点,他们从Nimbus上下载拓扑的代码,然后去真正执行。Slave上的Supervisor进程是用来监督和管理实际运行业务代码的进程。在Storm 0.9之后,又多了一个进程Logviewer,可以用Storm UI来查看Slave节点上的log文件。
3) 协调服务Zookeeper:
ZooKeeper在Storm上不是用来做消息传输用的,而是用来提供协调服务(coordination service),同时存储拓扑的状态和统计数据。
l Supervisor,Nimbus和worker都在ZooKeeper留下约定好的信息。例如Supervisor启动时,会在ZooKeeper上注册,Nimbus就可以发现Supervisor;Supervisor在ZooKeeper上留下心跳信息,Nimbus通过这些心跳信息来对Supervisor进行健康检测,检测出坏节点
l 由于Storm组件(component)的状态信息存储在ZooKeeper上,所以Storm组件就可以无状态,可以 kill -9来杀死
例如:Supervisors/Nimbus的重启不影响正在运行中的拓扑,因为状态都在ZooKeeper上,从ZooKeeper上重新加载一下就好了
l 用来做心跳
Worker通过ZooKeeper把孩子executor的情况以心跳的形式汇报给Nimbus
Supervisor进程通过ZK把自己的状态也以心跳的形式汇报给Nimbua
l 存储最近任务的错误情况(拓扑停止时会删除)
4) 进程Worker
运行具体处理组件逻辑的进程,一个Topology可能会在一个或者多个worker里面执行,每个worker是一个物理JVM并且执行整个Topology的一部分
例如:对于并行度是300的topology来说,如果我们使用50个工作进程来执行,那么每个工作进程会处理其中的6个tasks,Storm会尽量均匀的工作分配给所有的worker
5) Task
Worker中的每一个spout/bolt的线程称为一个task,每一个spout和bolt会被当作很多task在整个集群里执行,每一个executor对应到一个线程,在这个线程上运行多个task,Stream Grouping则是定义怎么从一堆task发射tuple到另外一堆task,可以调用TopologyBuilder类的setSpout和setBolt来设置并行度(也就是有多少个task)
5. Storm容错机制
Storm的容错机制包括架构容错和数据容错。
1) 架构容错:
Nimbus和Supervisor进程被设计成快速失败(fail fast)的(当遇到异常的情况,进程就会挂掉)并且是无状态的(状态都保存在Zookeeper或者在磁盘上)。
最重要的是,worker进程不会因为Nimbus或者Supervisor挂掉而受影响。这跟Hadoop是不一样的,当JobTracker挂掉,所有的任务都会没了。
当Nimbus挂掉会怎样?
如果Nimbus是以推荐的方式处于进程监管(例如通过supervisord)之下,那它会被重启,不会有任何影响。
否则当Nimbus挂掉后:
l 已经存在的拓扑可以继续正常运行,但是不能提交新拓扑
l 正在运行的worker进程仍然可以继续工作。而且当worker挂掉,supervisor会一直重启worker。
l 失败的任务不会被分配到其他机器(是Nimbus的职责)上了
当一个Supervisor(slave节点)挂掉会怎样?
如果Supervisor是以推荐的方式处于进程监管(例如通过(supervisord)[supervisord.org/])之下,那它会被重启,不会有任何影响
否则当Supervisor挂掉:分配到这台机器的所有任务(task)会超时,Nimbus会把这些任务(task)重新分配给其他机器。
当一个worker挂掉会怎么样?
当一个worker挂掉,supervisor会重启它。如果启动一直失败那么此时worker也就不能和Nimbus保持心跳了,Nimbus会重新分配worker到其他机器。
Nimbus算是一个单点故障吗?
如果Nimbus节点挂掉,worker进程仍然可以继续工作。而且当worker挂掉,supervisor会一直重启worker。但是,没有了Nimbus,当需要的时候(如果worker机器挂掉了)worker就不能被重新分配到其他机器了。
所以答案是,Nimbus在“某种程度”上属于单点故障的。在实际中,这种情况没什么大不了的,因为当Nimbus进程挂掉,不会有灾难性的事情发生
2) 数据容错:
Storm中的每一个Topology中都包含有一个Acker组件。 Acker组件的任务就是跟踪从某个task中的Spout流出的每一个messageId所绑定的Tuple树中的所有Tuple的处理情况。如果在用户设置的最大超时时间(timetout 可以通过 Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来指定)内这些Tuple没有被完全处理,那么Acker会告诉Spout该消息处理失败,相反则会告知Spout该消息处理成功,它会分别调用Spout中的fail和ack方法。
6. 一个简单的Storm实现
实现一个拓扑包括一个spout和两个bolt。Spout发送单词。每个bolt在输入数据的尾部追加字符串“!!!”。三个节点排成一条线:spout发射给首个bolt,然后,这个bolt再发射给第二个bolt。如果spout发射元组“bob”和“john”,然后,第二个bolt将发射元组“bob!!!!!!”和“john!!!!!!”。
1) 其中Topology代码如下,定义整个网络拓扑图:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("words"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1"); |
2) Spout实现:
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } |
3) Bolt实现:
public static class ExclamationBolt implements IRichBolt { OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); } public void cleanup() { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } |
7. Storm常用配置
1) Config.TOPOLOGY_WORKERS:
这个设置用多少个工作进程来执行这个topology。比如,如果你把它设置成25, 那么集群里面一共会有25个java进程来执行这个topology的所有task。如果你的这个topology里面所有组件加起来一共有150的并行度,那么每个进程里面会有6个线程(150 / 25 = 6)。
2) Config.TOPOLOGY_ACKERS:
这个配置设置acker任务的并行度。默认的acker任务并行度为1,当系统中有大量的消息时,应该适当提高acker任务的并发度。设置为0,通过此方法,当Spout发送一个消息的时候,它的ack方法将立刻被调用;
3) Config.TOPOLOGY_MAX_SPOUT_PENDING:
这个设置一个spout task上面最多有多少个没有处理的tuple(没有ack/failed)回复, 我们推荐你设置这个配置,以防止tuple队列爆掉。
4) Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS:
这个配置storm的tuple的超时时间 – 超过这个时间的tuple被认为处理失败了。这个设置的默认设置是30秒