Trident中 FixedBatchSpout分析

FixedBatchSpout 继承自 IBatchSpout

IBatchSpout 方法

public interface IBatchSpout extends Serializable {
    void open(Map conf, TopologyContext context);
    void emitBatch(long batchId, TridentCollector collector);
    void ack(long batchId);
    void close();
    Map getComponentConfiguration();
    Fields getOutputFields();
}
 FixedBatchSpout代码

package storm.trident.testing;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class FixedBatchSpout implements IBatchSpout {

    Fields fields;
    List<Object>[] outputs;
    int maxBatchSize;

    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
        this.fields = fields; // 输出字段
        this.outputs = outputs;  // 保存至本地, 每个对象都是一个List<Object>
        this.maxBatchSize = maxBatchSize; //  该批次最大发射次数,但是不是唯一决定元素
    }

    int index = 0;
    boolean cycle = false;

    public void setCycle(boolean cycle) {
        this.cycle = cycle;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
        index = 0;
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        //Utils.sleep(2000);
        if(index>=outputs.length && cycle) {
            index = 0;  // 超过下标后,让index归零, 继续循环发送
        }

       //  在不超过outputs大小的情况下,每次发射一个List<Object>
        for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
            collector.emit(outputs[index]);
        }
    }

    @Override
    public void ack(long batchId) {

    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1); // 最大并行度,默认是1. 好像没提供接口来修改, 很奇怪。
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return fields ;  // 输出字段
    }
}

外部使用

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1,
                new Values("ab ab ab ab ab ab ab ab ab ab"));  // 这里设置为1,表示每批只发送一个List<Value>,但是设置更大,也不会出错,参见上面的代码注释,它要同时满足不超过数组大小,所以不会越界。
         spout.setCycle(true);  // 设置则表示会一直发送,如果不用它一直发射, 可以注释掉。

其他就是trident内部调用。

如分析有误,请指出,谢谢。。

时间: 2024-10-12 15:41:13

Trident中 FixedBatchSpout分析的相关文章

Trident中的解析包含的函数操作与投影操作

一:函数操作 1.介绍 Tuple本身是不可变的 Function只是在原有的基础上追加新的tuple 2.说明 如果原来的字段是log,flag 新增之后的tuple可以访问这些字段,log,flag,orderId,orderAmt,memberId 3.先写驱动类 增加了解析 然后再将解析的日志进行打印 1 package com.jun.trident; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalClus

如何看TCO在虚拟化解决方案中的分析与对比

如何看TCO在虚拟化解决方案中的分析与对比 所谓TCO (Total cost of ownership) 即总体拥有成本,是一种经常采用的技术评价标准,它的目标是分析和对比在一定时间范围内所拥有的包括首次购置成本TCA (Total cost of acquisition) 和每年运维成本在内的总体成本.在某些情况下,这一总体成本是一个为获得可比较的现行开支而对3到5年生命周期范围内的成本进行平均的值. TCO的对比应该明确一个前提,就是IT系统的实现功能.性能.可靠性等方面基本相同,或者说满

使用crash提取vmcore中预分析信息

一.介绍 在linux系统内核发生崩溃或者服务器hang住时,Kdump(kernel crash dump:内核崩溃转储设备)生成vmcore文件,通过分析vmcore信息判断原因,而 crash是一个被广泛应用的内核奔溃转储文件分析工具,前提系统必须安装crash工具和内核调试工具kernel-debuginfo. 二.工具的安装与调试 1.安装包的版本,要与linux内核一致,查看linux内核版本: #uname -a 2.安装.配置.启动kdump:       安装kdump:  

or1200中IMMU分析(续)

以下内容摘自<步步惊芯--软核处理器内部设计分析>一书 2 IMMU中的特殊寄存器 OR1200处理器中的IMMU包含第2组特殊寄存器,如表10.1所示. ITLBW0MRx是指令TLB匹配寄存器,其格式如表10.2所示. 表10.2是OpenRISC 1000规范中的定义,实际在OR1200处理器中只实现了其中一部分字段,包括VPN(Virtual Page Number)的一部分.V(Valid标志位).ITLBW0MRx对应图10.7中MR_RAM的表项,每一个表项对应一个ITLBW0M

or1200中IMMU分析

以下内容摘自<步步惊芯--软核处理器内部设计分析>一书 1 IMMU结构 OR1200中实现IMMU的文件有or1200_immu_top.v.or1200_immu_tlb.v.or1200_spram.v,其中使用or1200_immu_top.v实现了IMMU模块,使用or1200_immu_tlb.v实现了ITLB模块,or1200_spram.v是一个单口RAM,使用其实现了ITLB的表项.如图10.5所示.本小节将分别介绍IMMU模块与其余模块的连接关系.ITLB结构. 1.1 I

or1200中IMMU分析(再续)

以下内容摘自<步步惊芯--软核处理器内部设计分析>一书 ITLB代码分析 ITLB是IMMU中的主要模块,其实现也相对独立.简单.本节对ITLB的代码进行分析.ITLB的输入输出接口如图10.10所示,图中左边是输入接口,右边是输出接口. 因为在ITLB中实现了第2组特殊寄存器,所以有spr_cs.spr_write.spr_addr.spr_dat_i.spr_dat_o等接口,这些接口的含义在分析特殊寄存器类指令的时候已经学习过,应该是非常熟悉的.剩下的输入输出接口含义如下: tlb_en

大开测试:性能-如何在Analysis图表中添加分析注释(连载26)

7.26  如何在Analysis图表中添加分析注释 1.问题提出 Analysis提供了十分丰富的图表,我们可以借助这些图表分析系统的性能,为了使图表更加直观,方便专业及其非专业人事的阅读,提供分析注释是十分必要的,那么LoadRunner的Analysis提供这种功能了吗? 2.问题解答 LoadRunner提供了丰富的图表,通过这些图表可以供性能分析人员分析系统瓶颈,为了使自己和他人方便阅读分析结果,LoadRunner提供了在图表上添加注释信息的功能,下面以"Throughput - R

(转)ffmpeg 中 av_read_frame_internal分析

作者: chenwei1983    时间: 2012-3-5 04:21 PM标题: ffmpeg 中 av_read_frame_internal分析                            原出处:http://www.chinavideo.org/viewthread.php?action=printable&tid=13846av_read_frame_internal 在ffmpeg中实现了将format格式的packet,最终转换成一帧帧的es流packet,并解析填

Trident中的DRPC实现

一:介绍 1.说明 Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算.DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流. 2.工作机制 Distributed RPC是由一个"DPRC Server"协调的(storm自带了一个实现)DRPC服务器协调 1) 接收一个RPC请求. 2) 发送请求到storm topology 3) 从storm topology接收结果