基于Storm构建分布式实时处理应用初探

Storm对比Hadoop,前者更擅长的是实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算。对于Hadoop,本身不擅长实时的数据分析处理。两者的共同点都是分布式架构,而且都类似有主/从关系的概念。

本文不会具体阐述Storm集群和Zookeeper集群如何部署的问题,这里想通过一个实际的案例切入,分析一下如何利用Storm完成实时分析处理数据。

Storm本身是Apache托管的开源的分布式实时计算系统,它的前身是Twitter Storm。在Storm问世以前,处理海量的实时数据信息,大部分是类似于使用消息队列,加上工作进程/线程的方式。这使得构建这类的应用程序,变得异常的复杂。很多的业务逻辑中,你不得不考虑消息的发送和接收,线程之间的并发控制等等问题。而其中的业务逻辑可能只是占据整个应用的一小部分,而且很难做到业务逻辑的解耦。但是Storm的出现改变了这种局面,它首先抽象出数据流Stream的抽象概念,一个Stream指的是tuples组成的无边界的序列。后面又继续提出Spouts、Bolts的概念。Spouts在Storm里面是数据源,专门负责生成流。而Bolts则是以流作为输入,并重新生成流作为输出,并且Bolts还会继续指定它输入的流应该如何划分。最后Storm是通过拓扑(Topology)这种抽象概念,组织起若干个Spouts、Bolts构成的分布式数据处理网络。Storm设计的时候,就有意的把Spouts、Bolts组成的拓扑(Topology)网络通过Thrift服务方式进行封装,这个做法,使得Storm的Spouts、Bolts组件可以通过目前主流的任意语言实现,使得整个框架的兼容性和扩展性更加优秀。

在Storm里面拓扑(Topology)的概念,非常类似Hadoop里面MapReduce的Job的概念。不同的是Storm的拓扑(Topology)只要你启动了,它就会一直运行下去,除非你kill掉;而MapReduce的Job最终它是会结束的。基于这样的模式,使得Storm非常适合处理实时性的数据分析、持续计算、DRPC(分布式RPC)等。

下面就结合实际的案例,设计分析一下,如何利用Storm改善应用的处理性能。

某通信公司的垃圾短信监控平台,实时地上传每个省的疑似垃圾短信用户的垃圾短信内容文件,每个省则根据文件中垃圾短信的内容,解析过滤出,包含指定敏感关键字的垃圾短信进行入库。被入库的垃圾短信用户被列为敏感用户,是重点监控对象,毕竟乱发这些垃圾短信是非常不对的。垃圾短信监控平台生成的文件速度非常惊人,原来的传统做法是,根据每个省的每一个地市,对应一个独立应用,串行化地解析、过滤敏感关键字,来进行入库处理。但是,从现状来看,程序处理的性能并不高效,常常造成文件积压,没有及时处理入库。

现在,我们就通过Storm来重新梳理、组织一下上述的应用场景。

首先,我先说明一下,该案例中Storm集群和Zookeeper集群的部署情况,如下图所示:

Nimbus对应的主机是192.168.95.134是Storm主节点,其余两台从节点Supervisor对应的主机分别是192.168.95.135(主机名:slave1)、192.168.95.136(主机名:slave2)。同样的,Zookeeper集群也是部署在上述节点上。

Storm集群和Zookeeper集群会互相通信,因为Storm就是基于Zookeeper的。然后先启动每个节点的Zookeeper服务,其次分别启动Storm的Nimbus、Supervisor服务。具体可以到Storm安装的bin目录下面启动服务,启动命令分别为storm nimbus > /dev/null 2 > &1 &和storm supervisor > /dev/null 2 > &1 &。然后用jps观察启动的效果。没有问题的话,在Nimbus服务对应的主机上启动Storm UI监控对应的服务,在Storm安装目录的bin目录输入命令:storm ui >/dev/null 2>&1 &。然后打开浏览器输入:http://{Nimbus服务对应的主机ip}:8080,这里就是输入:http://192.168.95.134:8080/。观察Storm集群的部署情况,如下图所示:

可以发现,我们的Storm的版本是0.9.5,它的从节点(Supervisor)有2个,分别是slave1、slave2。一共的woker的数量是8个(Total slots)。Storm集群我们已经部署完毕,也启动成功了。现在就利用Storm的方式,重新改写一下这种敏感信息实时监控过滤的应用。先看下Storm方式的拓扑结构图:

其中的SensitiveFileReader-591、SensitiveFileReader-592(用户短信采集器,分地市)代表的是Storm中的Spouts组件,表示一个数据的源头,这里是表示从服务器的指定目录下,读取疑似垃圾短信用户的垃圾短信内容文件。当然Spouts的组件你可以根据实际的需求,扩展出许多Spouts。

然后读取出文件中每一行的内容之后,就是分析文件的内容组件了,这里是指:SensitiveFileAnalyzer(监控短信内容拆解分析),它负责分析出文件的格式内容。

为了简单演示起见,我这里定义文件的格式为如下内容(随便写一个例子):home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1。每个列之间用&进行连接。其中home_city=591表示疑似垃圾短信的用户归属地市编码,591表示福州、592表示厦门;user_id=5911000表示疑似垃圾短信的用户标识;msisdn=10000表示疑似垃圾短信的用户手机号码;sms_content=abc-slave1代表的就是垃圾短信的内容了。SensitiveFileAnalyzer代表的就是Storm中的Bolt组件,用来处理Spouts“流”出的数据。

最后,就是我们根据解析好的数据,匹配业务规定的敏感关键字,进行过滤入库了。这里我们是把过滤好的数据存入MySQL数据库中。负责这项任务的组件是:SensitiveBatchBolt(敏感信息采集处理),当然它也是Storm中的Bolt组件。好了,以上就是完整的Storm拓扑(Topology)结构了。

现在,我们对于整个敏感信息采集过滤监控的拓扑结构,有了一个整体的了解之后,我们再来看下如何具体编码实现!先来看下整个工程的代码层次结构,它如下图所示:

首先来看下,我们定义的敏感用户的数据结构RubbishUsers,假设我们要过滤的敏感用户的短信内容中,要包含“racketeer”、“Bad”等敏感关键字。具体代码如下:

现在,我们看下敏感信息数据源组件SensitiveFileReader的具体实现,它负责从服务器的指定目录下面,读取疑似垃圾短信用户的垃圾短信内容文件,然后把每一行的数据,发送给下一个处理的Bolt(SensitiveFileAnalyzer),每个文件全部发送结束之后,在当前目录中,把原文件重命名成后缀bak的文件(当然,你可以重新建立一个备份目录,专门用来存储这种处理结束的文件),SensitiveFileReader的具体实现如下:

监控短信内容拆解分析器SensitiveFileAnalyzer,这个Bolt组件,接收到数据源SensitiveFileReader的数据之后,就按照上面定义的格式,对文件中每一行的内容进行解析,然后把解析完毕的内容,继续发送给下一个Bolt组件:SensitiveBatchBolt(敏感信息采集处理)。现在,我们来看下SensitiveFileAnalyzer这个Bolt组件的实现:

最后一个Bolt组件SensitiveBatchBolt(敏感信息采集处理)根据上游Bolt组件SensitiveFileAnalyzer发送过来的数据,然后跟业务规定的敏感关键字进行匹配,如果匹配成功,说明这个用户,就是我们要重点监控的用户,我们把它通过hibernate采集到MySQL数据库,统一管理。最后要说明的是,SensitiveBatchBolt组件还实现了一个监控的功能,就是定期打印出,我们已经采集到的敏感信息用户数据。现在给出SensitiveBatchBolt的实现:

由于是通过hibernate入库到MySQL,所以给出hibernate配置,首先是:hibernate.cfg.xml

对应的ORM映射配置文件rubbish-users.hbm.xml内容如下:

最后,还是通过Spring把hibernate集成起来,数据库连接池用的是:DBCP。对应的Spring配置文件jdbc-hibernate-bean.xml的内容如下:

到此为止,我们已经完成了敏感信息实时监控的所有的Storm组件的开发。现在,我们来完成Storm的拓扑(Topology),由于拓扑(Topology)又分为本地拓扑和分布式拓扑,因此封装了一个工具类StormRunner(拓扑执行器),对应的代码如下:

好了,现在我们把上面所有的Spouts/Bolts拼接成“拓扑”(Topology)结构,我们这里用的是分布式拓扑,来进行部署运行。具体的SensitiveTopology(敏感用户监控Storm拓扑)代码如下:

到此为止,所有的Storm组件已经开发完毕!现在,我们把上述工程打成jar包,放到Storm集群中运行,具体可以到Nimbus对应的Storm安装目录下面的bin目录,输入:storm jar + {jar路径}。

比如我这里是输入:storm jar /home/tj/install/SensitiveTopology.jar newlandframework.storm.topology.SensitiveTopology,然后,把疑似垃圾短信用户的垃圾短信内容文件放到指定的服务器下面的目录(/home/tj/data/591、/home/tj/data/592),最后打开刚才的Storm UI,观察任务的启动执行情况,这里如下图所示:

可以看到我们刚才提交的拓扑:SensitiveTopology已经成功提交到Storm集群里面了。这个时候,你可以鼠标点击SensitiveTopology,然后会打开如下的一个Spouts/Bolts的监控界面,如下图所示:

我们可以很清楚地看到:Spouts组件(用户短信采集器):SensitiveFileReader591、SensitiveFileReader592的线程数executors、任务提交emitted情况。以及Bolts组件:监控短信内容拆解分析器(SensitiveFileAnalyzer)、敏感信息采集处理(SensitiveBatchBolt)的运行情况,这样监控起来就非常方便。

此外,我们还可以到对应的Supervisor服务器对应的Storm安装目录下面的logs目录,查看一下worker的工作日志,我们来看下敏感信息监控过滤的处理情况,截图如下:

通过SensitiveBatchBolt模块的监控线程,可以看到,我们目前已经采集到了9个敏感信息用户了,再来看下,这些包含敏感关键字的用户有没有入库MySQL成功?

发现入库的结果也是9个,和日志打印的数量上是一致的。而且垃圾短信内容sms_content果然都包含了“racketeer”、“Bad”这些敏感关键字!完全符合我们的预期。而且,以后文件处理量上来了,我们可以通过调整设置Spouts/Bolts的并行度,和Worker的数量进行化解。当然,你还可以通过水平扩展集群的数量来解决这个问题。

目前在国内,就我个人看法,对Storm分析应用,做得最好的应该算是阿里巴巴,它在原来Storm的基础上加以改良,开源出JStorm,有兴趣的朋友,可以多关注一下。

借助Storm,我们可以很轻松地开发分布式实时处理应用,而上述场景的设计,只是Storm应用的一个案例。相比传统的单机服务器应用而言,集群化地并行协同计算处理,是云计算、大数据时代的一个趋势,也是我今后努力学习的方向。

结语

感谢您的观看,如有不足之处,欢迎批评指正。

如果有对大数据感兴趣的小伙伴或者是从事大数据的老司机可以加群:

658558542    

欢迎大家进群交流讨论,学习交流,共同进步。(里面还有大量的免费资料,帮助大家在成为大数据工程师,乃至架构师的路上披荆斩棘!)

最后祝福所有遇到瓶疾且不知道怎么办的大数据程序员们,祝福大家在往后的工作与面试中一切顺利。

原文地址:https://www.cnblogs.com/n23333/p/10261934.html

时间: 2024-10-06 20:29:49

基于Storm构建分布式实时处理应用初探的相关文章

Storm构建分布式实时处理应用初探(转)

最近利用闲暇时间,又重新研读了一下Storm.认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算.对于Hadoop,本身不擅长实时的数据分析处理.两者的共同点都是分布式的架构,而且,都类似有主/从关系的概念.本文中我就不具体阐述Strom集群和Zookeeper集群如何部署的问题,我想通过一个实际的案例切入,分析一下如何利用Storm,完成实时分析处理数据的. Storm本身是Apache托管的开源的分布式实时计

Storm构建分布式实时处理应用初探

最近利用闲暇时间,又重新研读了一下Storm.认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算.对于Hadoop,本身不擅长实时的数据分析处理.两者的共同点都是分布式的架构,而且,都类似有主/从关系的概念.本文中我就不具体阐述Strom集群和Zookeeper集群如何部署的问题,我想通过一个实际的案例切入,分析一下如何利用Storm,完成实时分析处理数据的. Storm本身是Apache托管的开源的分布式实时计

基于Storm构建实时热力分布项目实战

详情请交流  QQ  709639943 01.基于Storm构建实时热力分布项目实战 02.以慕课网日志分析为例 进入大数据 Spark SQL 的世界 03.Spring Cloud微服务实战视频课程 04.漫谈spring cloud 与 spring boot 基础架构 05.Java秒杀系统方案优化 高性能高并发实战 06.Java深入微服务原理改造房产销售平台 07.快速上手Linux 玩转典型应用 08.漫谈spring cloud分布式服务架构 09.Java Spring Se

Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇

目前业界流行的分布式消息队列系统(或者可以叫做消息中间件)种类繁多,比如,基于Erlang的RabbitMQ.基于Java的ActiveMQ/Apache Kafka.基于C/C++的ZeroMQ等等,都能进行大批量的消息路由转发.它们的共同特点是,都有一个消息中转路由节点,按照消息队列里面的专业术语,这个角色应该是broker.整个消息系统通过这个broker节点,进行从消息生产者Producer到消费者Consumer的消息路由.当然了,生产者和消费者可以是多对多的关系.消息路由的时候,可以

基于dubbo的分布式项目实例应用(一)

本文主要学习dubbo服务的启动检查.集群容错.服务均衡.线程模型.直连提供者.只定阅.只注册等知识点,希望通过实例演示进一步理解和掌握这些知识点. 启动检查 Dubbo缺省会在启动消费者时检查依赖的服务是否可用,不可用时会抛出异常,阻止Spring初始化完成,以便上线时,能及早发现问题,默认check=true. 关闭没有提供者时报错 <dubbo:reference interface="com.mcweb.api.service.IUserService" id="

Netty构建分布式消息队列实现原理浅析

在本人的上一篇博客文章:Netty构建分布式消息队列(AvatarMQ)设计指南之架构篇 中,重点向大家介绍了AvatarMQ主要构成模块以及目前存在的优缺点.最后以一个生产者.消费者传递消息的例子,具体演示了AvatarMQ所具备的基本消息路由功能.而本文的写作目的,是想从开发.设计的角度,简单的对如何使用Netty,构建分布式消息队列背后的技术细节.原理,进行一下简单的分析和说明. 首先,在一个企业级的架构应用中,究竟何时需引入消息队列呢?本人认为,最经常的情况,无非这几种:做业务解耦.事件

PL2121-基于Storm构建实时热力分布项目实战

新年伊始,学习要趁早,点滴记录,学习就是进步! 不要到处找了,抓紧提升自己. 对于学习有困难不知道如何提升自己可以加扣:1225462853 获取资料. 下载地址:https://pan.baidu.com/s/1o9rZpj0 基于Storm构建实时热力分布项目实战 Storm是实时流处理领域的一柄利器,本课程采用最新的Storm版本1.1.0,从0开始由浅入深系统讲解,深入Storm内部机制,掌握Storm整合周边大数据框架的使用,从容应对大数据实时流处理! 谢谢大家的支持,我会努力给大家分

基于Dubbo框架构建分布式服务 (二)

Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就是说服务提供方(Provider)发布的服务可以天然就是集群服务,比如,在实时性要求很高的应用场景下,可能希望来自消费方(Consumer)的调用响应时间最短,只需要选择Dubbo的Forking Cluster模式配置,就可以对一个调用请求并行发送到多台对等的提供方

[转载] 基于Dubbo框架构建分布式服务

转载自http://shiyanjun.cn/archives/1075.html Dubbo是Alibaba开源的分布式服务框架,我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就是说服务提供方(Provider)发布的服务可以天然就是集群服务,比如,在实时性要求很高的应用场景下,可能希望来自消费方(Consumer)的调用响应时间最短,只需要选择Dubbo的F