转:storm中一个Bolt发emit多次相同类型消息

在storm中的Bolt中可以处理完成逻辑后,向后面的Blot继续发送消息。

可以发送多个不同的消息,如:

collector.emit("update-delivered-status",new Values(emailDeliverStatus));  

collector.emit("save-request",new Values(udsn));  

也可以同一个类型的消息发送多个不同内容如;

for (int i = 0; i < emailParamVo.getReceiverNum(); i++)
            {
                EmailDeliverStatus emailDeliverStatus = new EmailDeliverStatus();
                emailDeliverStatus.setCategoryId(emailParamVo.getCategoryId());
                emailDeliverStatus.setUpdateTime(emailParamVo.getUpdateTime());
                emailDeliverStatus.setStatus(emailParamVo.getEventType());
                emailDeliverStatus.setUserId(emailParamVo.getUserId());
                emailDeliverStatus.setMessageDetail(emailParamVo.getMessageDetail());

                StringBuilder receiverBuilder = new StringBuilder(emailParamVo.getReceivers());
                receiverBuilder = receiverBuilder.deleteCharAt(0);
                receiverBuilder = receiverBuilder.deleteCharAt(receiverBuilder.length()-1);
                String[] receivers = receiverBuilder.toString().split(" ");
                String receiver = receivers[i];
                emailDeliverStatus.setEmailId(emailParamVo.getEmailIdPre() + i + "$" + receiver);
                emailDeliverStatus.setReceiver(receiver);
                collector.emit("update-delivered-status",new Values(emailDeliverStatus));
            }

上面的写法是没有问题的,因为for循环里面每次发送的对象都是一个新的实例,但是如果把创建实例的动作放到外面,如:

EmailDeliverStatus emailDeliverStatus = new EmailDeliverStatus();
            emailDeliverStatus.setCategoryId(emailParamVo.getCategoryId());
            emailDeliverStatus.setUpdateTime(emailParamVo.getUpdateTime());
            emailDeliverStatus.setStatus(emailParamVo.getEventType());
            emailDeliverStatus.setUserId(emailParamVo.getUserId());
            emailDeliverStatus.setMessageDetail(emailParamVo.getMessageDetail());

            for (int i = 0; i < emailParamVo.getReceiverNum(); i++)
            {
                StringBuilder receiverBuilder = new StringBuilder(emailParamVo.getReceivers());
                receiverBuilder = receiverBuilder.deleteCharAt(0);
                receiverBuilder = receiverBuilder.deleteCharAt(receiverBuilder.length()-1);
                String[] receivers = receiverBuilder.toString().split(" ");
                String receiver = receivers[i];
                emailDeliverStatus.setEmailId(emailParamVo.getEmailIdPre() + i + "$" + receiver);
                emailDeliverStatus.setReceiver(receiver);
                collector.emit("update-delivered-status",new Values(emailDeliverStatus));
            }

这样就有问题,按照逻辑,我们是想让其emit多个不同emailDeliverStatus对象的消息,但是实际上这样不不行的,因为storm的emit操作并不是立即执行的,

上面的代码就是建立在假设调用emit后,storm就会立即去发送消息。如果按照上面写法,会发现接收消息的bolt收到的for循环中的多个消息都是最后一个消息的重复多次。

因为storm并不是立即执行emit,而是在这个bolt执行的一个固定时间去emit的,所以emailDeliverStatus实例的初始化必须放到for循环的外面执行。

原文地址:http://blog.csdn.net/jsjwk/article/details/8495915

时间: 2024-09-28 17:04:57

转:storm中一个Bolt发emit多次相同类型消息的相关文章

storm 中的Python bolt的注意事项

Storm可支持多种语言,其中就有python . 首先需要创建一个类, public static class BasieCalculateBolt extends ShellBolt implements IRichBolt { public BasieCalculateBolt() { super("python", "bolt_base_calculate.py"); } @Override public void declareOutputFields(O

storm中的一些概念

1.topology 一个topolgy是spouts和bolts组成的图,通过stream groupings将图中的spout和bolts连接起来:如图所示: 一个topology会一直运行知道你手动kill掉,Storm自动重新分配执行失败的任务,并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话).如果一些机器意外停机它上面的所有任务会被转移到其他机器上: 运行一个toplogy很简单,首先,把你所有的代码以及所依赖的jar打进一个jar中.然后运行类似下面的命令: stor

Storm中Spout使用注意事项小结

Storm中Spout用于读取并向计算拓扑中发送数据源,最近在调试一个topology时遇到了系统qps低,处理速度达不到要求的问题,经过排查后发现是由于对Spout的使用模式不当导致的多线程同步等待.这里罗列几点个人觉得编写Spout代码时需要特别注意的地方: 1. 最常用的模式是使用一个线程安全的queue,如BlockingQueue,spout主线程从queue中读取数据:另外的一个或多个线程负责从数据源(如各种消息中间件.db等)读取数据并放入queue中. 2. 如果不关心数据是否丢

Storm中的可靠性

我们知道Storm有一个很重要的特性,那就是Storm API能够保证它的一个Tuple能够被完全处理,这一点尤为重要,其实storm中的可靠性是由spout和bolt组件共同完成的,下面就从spout和bolt两个方便给大家介绍一下storm中的可靠性,最后会给出一个实现了可靠性的例子. 1.Spout的可靠性保证 在Storm中,消息处理可靠性从Spout开始的.storm为了保证数据能正确的被处理, 对于spout产生的每一个tuple,storm都能够进行跟踪,这里面涉及到了ack/fa

三:Storm设计一个Topology用来统计单词的TopN的实例

Storm的单词统计设计 一:Storm的wordCount和Hadoop的wordCount实例对比 二:Storm的wordCount的方案实例设计 三:建立maven项目,添加maven相关依赖包(1)输入:search.maven.org网址,在其中找到storm的核心依赖(2)将核心依赖添加到pom.xml文件中 <dependency>            <groupId>com.github.aloomaio</groupId>            

Storm设计一个Topology用来统计单词的TopN的实例

Storm的单词统计设计 一:Storm的wordCount和Hadoop的wordCount实例对比 二:Storm的wordCount的方案实例设计 三:建立maven项目,添加maven相关依赖包(1)输入:search.maven.org网址,在其中找到storm的核心依赖(2)将核心依赖添加到pom.xml文件中 <dependency>            <groupId>com.github.aloomaio</groupId>            

storm源码之理解Storm中Worker、Executor、Task关系【转】

[原]storm源码之理解Storm中Worker.Executor.Task关系 Storm在集群上运行一个Topology时,主要通过以下3个实体来完成Topology的执行工作:1. Worker(进程)2. Executor(线程)3. Task 下图简要描述了这3者之间的关系:                                                    1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服

storm中的基本概念

Storm是一个流计算框架,处理的数据是实时消息队列中的,所以需要我们写好一个topology逻辑放在那,接收进来的数据来处理,所以是通过移动数据平均分配到机器资源来获得高效率. Storm的优点是全内存计算,因为内存寻址速度是硬盘的百万倍以上,所以Storm的速度相比较Hadoop非常快(瓶颈是内存,cpu).其缺点就是不够灵活:必须要先写好topology结构来等数据进来分析. Storm 关注的是数据多次处理一次写入,而 Hadoop 关注的是数据一次写入,多次查询使用.Storm系统运行

Storm 中什么是-acker,acker工作流程介绍

概述 我们知道storm一个很重要的特性是它能够保证你发出的每条消息都会被完整处理, 完整处理的意思是指: 一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理.而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理. 也就是说对于任何一个spout-tuple以及它的所有子孙到底处理成功失败与否我们都会得到通知.关于如果做到这一点的原理,可以看看Twitter Storm如何保证消息不丢失这篇文章.从那篇文