storm集群 + kafka单机性能测试

storm与kafka单机功能整合很顺利,但是到了storm集群环境和数据处理性能时则出现了一些问题,现将测试过程和问题简单记录如下:

性能指标:每分钟处理至少100万的信息(csv格式,100bytes左右),信息解析后持久化到DB中。

架构设计:flume读取文件缓存到kafka队列后消费到storm中

问题:

一、storm集群任务调度时出现如下问题,具体日志见下:

2014-09-24 16:47:38 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-computer7-62/ip:6706... [8]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-computer7-62/ip:6706, [id: 0x0b596170, /ip:34836 => computer7-62/ip:6706]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-computer7-60/ip:6706
2014-09-24 16:47:38 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-computer7-60/ip:6706
java.nio.channels.ClosedChannelException: null
        at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:632) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:611) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:578) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) [netty-3.2.2.Final.jar:na]
        at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
        at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] failed to send requests to computer7-60/ip:6706:
java.nio.channels.ClosedChannelException: null
        at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:632) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:611) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:578) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) ~[netty-3.2.2.Final.jar:na]
        at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
        at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-computer7-60/ip:6706..., timeout: 600000ms, pendings: 0
2014-09-24 16:47:38 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927$fn__5928.invoke(worker.clj:322) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927.invoke(worker.clj:320) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 6 common frames omitted
2014-09-24 16:47:38 b.s.m.n.Client [INFO] New Netty Client, connect to computer7-60, 6706, config: , buffer_size: 52

解决方式:问题比较低级,解决过程也是比较曲折,最终发现hosts文件配置错误(机器名中的小写L写成了数字1),导致worker节点间数据通讯出现问题,影响任务调度。

二、kafka性能瓶颈

kafka与storm整合时数据处理性能不是很好,未达到预期要求。一开始怀疑是kafkaspout代码问题,但是storm external中已经将其收录进来,感觉问题应该不是出在这里。后来看了一下kafkaspout实现,找到了可能的性能瓶颈点。kafka在设计时,为了增加并发访问及处理性能,在topic中加入了partitions属性,也就是将数据打散,提高并发与处理性能。由于队列信息offset是在客户端维护,kafkaspout在解决并发互斥时采用task与partitions一一对应的方式来解决互斥访问。topology在使用时,kafkaspout的并发度可以根据具体topic的partitions属性来设定。这样通过增加topic partitions和并发度(8),达到了预期的处理性能。

由此联想,之前遇到的flume缓存到kafka队列的问题也可能是partitions设定方式问题导致,后续再测试验证一下。

时间: 2024-08-25 18:40:04

storm集群 + kafka单机性能测试的相关文章

Storm 系列(三)Storm 集群部署和配置

Storm 系列(三)Storm 集群部署和配置 本章中主要介绍了 Storm 的部署过程以及相关的配置信息.通过本章内容,帮助读者从零开始搭建一个 Storm 集群.相关的过程和主要的配置选项是 Storm 的运维人员需要重点关注的,对部署和配置选项不感兴趣的读者,可以跳过本章. 在开始 Storm 之旅前,我们先看一下 Storm 部署和配置的相关信息,并提交一个 Topology,了解 Storm 的基本原理.Storm 的部署模式包括单机和集群环境,同时在向 Storm 环境中提交 To

Storm集群组件和编程模型

 Storm工作原理: Storm是一个开源的分布式实时计算系统,常被称为流式计算框架.什么是流式计算呢?通俗来讲,流式计算顾名思义:数据流源源不断的来,一边来,一边计算结果,再进入下一个流. 比如一般金融系统一直不断的执行,金融交易.用户全部行为都记录进日志里,日志分析出站点运维.猎户信息.海量数据使得单节点处理只是来.所以就用到分布式计算机型,storm 是当中的典型代表之中的一个,一般应用场景是:中间使用一个消息队列系统如kafka,先将消息缓存起来,storm 中有非常多的节点,分布

Storm集群安装详解

Storm集群安装详解 storm有两种操作模式: 本地模式和远程模式. 本地模式:你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 远端模式:你提交的topology会在一个集群的机器上执行. 本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出. 1.   Strom集群组件 Storm集群中包含两类节点:主控节点(Master Node)和工

Apache Storm 集群环境搭建

Apache storm 是一个由twitter开源的大数据处理系统,与其他系统不同的是,storm旨在用于分布式实时处理并且与语言无关.笔者所认知的storm使用场景诸 如 实时日志分析.网站用户行为实时分析.实时计算等,目前很多公司也都把storm作为自己的大数据架构的一部分,来实现一些实时业务的处理. 相信大家都和我有一样的认知,那就是现在的技术都是项目驱动模式,没有最好的技术,只有最适合自己项目的技术.下面先跟大家分享一下我对storm的一些简单了解: storm的优点: 1.简单的编程

Storm 系列(四)—— Storm 集群环境搭建

一.集群规划 这里搭建一个 3 节点的 Storm 集群:三台主机上均部署 Supervisor 和 LogViewer 服务.同时为了保证高可用,除了在 hadoop001 上部署主 Nimbus 服务外,还在 hadoop002 上部署备用的 Nimbus 服务.Nimbus 服务由 Zookeeper 集群进行协调管理,如果主 Nimbus 不可用,则备用 Nimbus 会成为新的主 Nimbus. 二.前置条件 Storm 运行依赖于 Java 7+ 和 Python 2.6.6 +,所

Storm集群上的开发 ,Topology任务的编写 之 WordCount程序的编写(六)

由之前的学习,Storm的程序构成有Topology,Spout.Blot组成. 构建工程第一步 :引入jar,把storm集群中的/usr/local/apps/apache-storm-1.0.3/lib目录jar包全部引入 程序项目架构 :

安装storm集群

手工安装Storm集群 注:最新的Storm已不再必须依赖ZeroMQ,各种依赖的库和软件也已经有更新的版本. 要手工安装Storm,需要先安装以下软件 Zookeeper集群(安装方法详见管理向导) Java6.0 Python2.6.6 Unzip命令 NOTE: Nimbus和管理进程将要依赖Java.Python和unzip命令 安装本地库: 安装ZeroMQ: 1 wget http://download.zeromq.org/historic/zeromq-2.1.7.tar.gz

Storm集群安装部署步骤【详细版】

作者: 大圆那些事 | 文章可以转载,请以超链接形式标明文章原始出处和作者信息 网址: http://www.cnblogs.com/panfeng412/archive/2012/11/30/how-to-install-and-deploy-storm-cluster.html 本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出. 1. Storm集群组件 Storm集群中包含两

1.1 Storm集群安装部署步骤

安装storm集群,需要依赖以下组件: Zookeeper Python Zeromq Storm JDK JZMQ 故安装过程根据上面的组件分为以下几步: 安装JDK 安装Zookeeper集群 安装Python及依赖 安装Storm 另外,操作系统环境为:Centos6.4,安装用户为:root. 1. 安装JDK 安装jdk有很多方法,可以参考文博客使用yum安装CDH Hadoop集群中的jdk安装步骤,需要说明的是下面的zookeeper集群安装方法也可以参考此文. 不管你用什么方法,