storm spout的速度抑制问题

转发请注明原文地址:http://www.cnblogs.com/dongxiao-yang/p/6031398.html

最近协助同事优化一个并发消费kafka数据用来计算的任务,压测过程中发现有两个spout对应的topic消费速度明显低于其他topic的指标,每个spout分配10个并发消费速度到了1w左右完全就上不去了,通过监控埋点分析出spout以及下游的bolt代码块里面的业务代码执行耗时完全不高于其余可以正常消费的topic对应的spout组件。

最后只能摘出有问题的代码新做一个demo进行测试,发现把nextTuple中 collector.emit()这个方法的调用注销,只保留读取kafka的逻辑后demo程序的消费kafka速度也同样卡在了一个很低的速度,查看问题程序代码nextTuple的调用逻辑大概如下

if(booleanfunction)

{

collector.emit(....)

}

其中booleanfunction指代一个执行了业务代码并返回boolean值的方法,推测这个方法在实际线上并没有每次都返回true进入调用emit方法的环节,

修改代码如下

if(booleanfunction)

{

collector.emit(....)

}

else

{

collector.emit(....)

}

相当于每次nextTuple调用都会运行emit方法,任务重新上线后10个spout消费轻松突破30W+。

产生问题的原因是由于storm的spout在nextTuple代码执行的时候,emit方法每次执行后会在内存里更新一个emitted-count的变量值,如果spout的发现emitted-count跟上次调用完毕后的值一致,表明nextTuple函数没有发送出去消息,此时会调用spout-wait-strategy的的emitEmpty方法,默认这个方法会sleep一毫秒。所以在没有emit的情况下nextTuple理论上最大的调用频率就是1000/s

参考资料

storm spout emit 问题

2 《Storm 源码分析》 第10章 10.3.5 消息循环

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 11.0px Monaco; color: #0326cc }
span.s1 { color: #000000 }

时间: 2024-10-07 19:10:00

storm spout的速度抑制问题的相关文章

storm-kafka(storm spout作为kafka的消费端)

storm是grovvy写的 kafka是scala写的 storm-kafka  storm连接kafka consumer的插件 下载地址: https://github.com/wurstmeister/storm-kafka-0.8-plus 除了需要storm和kafka相关jar包还需要google-collections-1.0.jar 以及zookeeper相关包 curator-framework-1.3.3.jar和curator-client-1.3.3.jar 以前由co

Storm构建分布式实时处理应用初探(转)

最近利用闲暇时间,又重新研读了一下Storm.认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算.对于Hadoop,本身不擅长实时的数据分析处理.两者的共同点都是分布式的架构,而且,都类似有主/从关系的概念.本文中我就不具体阐述Strom集群和Zookeeper集群如何部署的问题,我想通过一个实际的案例切入,分析一下如何利用Storm,完成实时分析处理数据的. Storm本身是Apache托管的开源的分布式实时计

storm 经常使用类

弄 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.5</version> </dependency> 经常使用类 backtype.storm.topology.TopologyBuilder 用来建立topology. SpoutDeclarer backtyp

Storm项目:流数据监控1《设计文档…

该文档为实实在在的原创文档,转载请注明作者及出处. 类型 详细 备注 2 该文档为原创模拟项目:流数据监控<1>文档<流数据监控设计文档>,相继会给出流数据监控<2>文档<流数据监控代码解析>及其他文档 2  该部分有源码(熬夜写出来的哦) CSDN中相应项目CODE链接:戳这里     相关描述 2  有任何其他想法,可以邮件[email protected] 2 文档及相关资料下载请到个人360云盘http://yunpan.cn/QGf2GDaRFpc

Storm构建分布式实时处理应用初探

最近利用闲暇时间,又重新研读了一下Storm.认真对比了一下Hadoop,前者更擅长的是,实时流式数据处理,后者更擅长的是基于HDFS,通过MapReduce方式的离线数据分析计算.对于Hadoop,本身不擅长实时的数据分析处理.两者的共同点都是分布式的架构,而且,都类似有主/从关系的概念.本文中我就不具体阐述Strom集群和Zookeeper集群如何部署的问题,我想通过一个实际的案例切入,分析一下如何利用Storm,完成实时分析处理数据的. Storm本身是Apache托管的开源的分布式实时计

Flume+Kafka+Storm+Redis实时分析系统基本架构

PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型.当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要

Storm可靠性实例解析——ack机制

对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性 很显然,要做到这个特性,必须要track每个data的去向和结果.Storm是如何做到的呢——acker机制. 先概括下acker所参与的工作流程: Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪: Bolt在处理Tuple成功或失败后,也会发一个消息通知acker: acker会找到发射该Tuple的Spout,回调其ack或fail方法. 我们说RichBolt和Basi

storm 常用类

获得 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.5</version> </dependency> 常用类 backtype.storm.topology.TopologyBuilder 用来建立topology. SpoutDeclarer backtype

Storm系列二: Storm拓扑设计

Storm系列二: Storm拓扑设计 在本篇中,我们就来根据一个案例,看看如何去设计一个拓扑, 如何分解问题以适应Storm架构,同时对Storm拓扑内部的并行机制会有一个基本的了解. 本章代码都在: [email protected]:zyzdisciple/storm_study.git 项目下的 user_behavior包下. 问题案例 有这样一种场景,在前端存在会话,我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信