storm数据流分流和合并

数据流分流和合并

数据流经常需要分流与合并操作,如下图所示:

分流

分流有2钟情况,第一种是,相同的tuple发往下一级不同的bolt, 第二种,分别发送不同的tuple到不同的下级bolt上。

发送相同tuple

其实和普通1v1 发送一模一样,就是有2个或多个bolt接收同一个spout或bolt的数据 举例来说:

SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
                new SequenceSpout(), spoutParal);

builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
                        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);

builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
                        .shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);

发送不同的tuple

当发送不同的tuple到不同的下级bolt时, 这个时候,就需要引入stream概念,发送方发送a 消息到接收方A‘时使用stream A, 发送b 消息到接收方B‘时,使用stream B

在topology提交时:

               builder.setBolt(SequenceTopologyDef.SPLIT_BOLT_NAME, new SplitRecord(), 2).shuffleGrouping(
                        SequenceTopologyDef.SEQUENCE_SPOUT_NAME);

                builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
                        SequenceTopologyDef.SPLIT_BOLT_NAME,  // --- 发送方名字
                        SequenceTopologyDef.TRADE_STREAM_ID); // --- 接收发送方该stream 的tuple

                builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
                        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME, // --- 发送方名字
                                SequenceTopologyDef.CUSTOMER_STREAM_ID);      // --- 接收发送方该stream 的tuple

在发送消息时:

public void execute(Tuple tuple, BasicOutputCollector collector) {
     tpsCounter.count();

     Long tupleId = tuple.getLong(0);
     Object obj = tuple.getValue(1);

     if (obj instanceof TradeCustomer) {

         TradeCustomer tradeCustomer = (TradeCustomer)obj;

         Pair trade = tradeCustomer.getTrade();
         Pair customer = tradeCustomer.getCustomer();

            collector.emit(SequenceTopologyDef.TRADE_STREAM_ID,
                    new Values(tupleId, trade));

            collector.emit(SequenceTopologyDef.CUSTOMER_STREAM_ID,
                    new Values(tupleId, customer));
     }else if (obj != null){
         LOG.info("Unknow type " + obj.getClass().getName());
     }else {
         LOG.info("Nullpointer " );
     }

 }

定义输出流格式:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declareStream(SequenceTopologyDef.TRADE_STREAM_ID, new Fields("ID", "TRADE"));
  declarer.declareStream(SequenceTopologyDef.CUSTOMER_STREAM_ID, new Fields("ID", "CUSTOMER"));
 }
 //接受消息时,需要判断数据流

 if (input.getSourceStreamId().equals(SequenceTopologyDef.TRADE_STREAM_ID) ) {
            customer = pair;
            customerTuple = input;

            tradeTuple = tradeMap.get(tupleId);
            if (tradeTuple == null) {
                customerMap.put(tupleId, input);
                return;
            }

            trade = (Pair) tradeTuple.getValue(1);

        }

数据流合并

生成topology时

在下面例子中, MergeRecord 同时接收SequenceTopologyDef.TRADE_BOLT_NAME 和SequenceTopologyDef.CUSTOMER_BOLT_NAME 的数据

               builder.setBolt(SequenceTopologyDef.TRADE_BOLT_NAME, new PairCount(), 1).shuffleGrouping(
                        SequenceTopologyDef.SPLIT_BOLT_NAME,
                        SequenceTopologyDef.TRADE_STREAM_ID);

                builder.setBolt(SequenceTopologyDef.CUSTOMER_BOLT_NAME, new PairCount(), 1)
                        .shuffleGrouping(SequenceTopologyDef.SPLIT_BOLT_NAME,
                                SequenceTopologyDef.CUSTOMER_STREAM_ID);

                builder.setBolt(SequenceTopologyDef.MERGE_BOLT_NAME, new MergeRecord(), 1)
                        .shuffleGrouping(SequenceTopologyDef.TRADE_BOLT_NAME)
                        .shuffleGrouping(SequenceTopologyDef.CUSTOMER_BOLT_NAME);

发送方

发送的bolt和普通一样,无需特殊处理

接收方

接收方是,区分一下来源component即可识别出数据的来源

        if (input.getSourceComponent().equals(SequenceTopologyDef.CUSTOMER_BOLT_NAME) ) {
            customer = pair;
            customerTuple = input;

            tradeTuple = tradeMap.get(tupleId);
            if (tradeTuple == null) {
                customerMap.put(tupleId, input);
                return;
            }

            trade = (Pair) tradeTuple.getValue(1);

        } else if (input.getSourceComponent().equals(SequenceTopologyDef.TRADE_BOLT_NAME)) {
            trade = pair;
            tradeTuple = input;

            customerTuple = customerMap.get(tupleId);
            if (customerTuple == null) {
                tradeMap.put(tupleId, input);
                return;
            }

            customer = (Pair) customerTuple.getValue(1);
        } 
				
时间: 2024-12-26 07:38:44

storm数据流分流和合并的相关文章

ubuntu16.04安装Storm数据流实时处理系统 集群

[email protected]:~# wget http://mirror.bit.edu.cn/apache/storm/apache-storm-1.1.1/apache-storm-1.1.1.tar.gz [email protected]:/usr/local/apache-storm-1.1.1# vim conf/storm.yaml storm.zookeeper.servers: - "master" - "slave1" - "sl

Storm中Trident流合并的例子demo

流的合并操作,是指根据两个流的关联条件将两个流合并成一个流,然后在进行后面的处理操作,如果使用Spout和Bolt这种编程模型的话写起来会比较困难和繁琐,因为要设置缓冲区来保存第一次过来的数据,后续还要进行两者的比较,使用Trident应用起来比较方便,对原来的编程模型进行了一定的抽象.代码实例: 需求: 两个spout: spout1:里面的数据是 name ,id 和tel, spout2是sex 和id: 首先对spout1进行过滤,过滤掉不是186的电话号码,然后显示 然后根据将过滤后的

数据流模型、Storm数据流模型

大数据流式计算:关键技术及系统实例

孙大为1, 张广艳1,2, 郑纬民1 摘要:大数据计算主要有批量计算和流式计算两种形态,目前,关于大数据批量计算系统的研究和讨论相对充分,而如何构建低延迟.高吞吐且持续可靠运行的大数据流式计算系统是当前亟待解决的问题且研究成果和实践经验相对较少.总结了典型应用领域中流式大数据所呈现出的实时性.易失性.突发性.无序性.无限性等特征,给出了理想的大数据流式计算系统在系统结构.数据传输.应用接口.高可用技术等方面应该具有的关键技术特征,论述并对比了已有的大数据流式计算系统的典型实例,最后阐述了大数据流

storm 文档(3)----入门指导

转载请注明出处:http://blog.csdn.net/beitiandijun/article/details/41517897 源地址:http://storm.apache.org/documentation/Tutorial.html 本文主要讲述了如何创建Storm topologies以及如何将它们部署在Storm集群中.Java是主要使用的语言,但是依然使用少量Python例子证明了Storm的多语言特性. 初步配置: 本文使用的例子源自storm-start项目.建议你复制这个

SSISDB6:使用数据分流

数据分流就是类似于数据流Path的Data Viewer,数据分流能够将数据导入一个file中,便于查看数据流中的数据.数据流分流必须通过代码来实现. To add data taps, the instance of the execution must be in the created state (a value of 1 in the status column of the catalog.operations (SSISDB Database)view) . The state v

_00019 Storm的体系结构介绍以及Storm入门案例(官网上的简单Java案例)

博文作者:妳那伊抹微笑 博客地址:http://blog.csdn.net/u012185296 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! qq交流群:214293307  (期待与你一起学习,共同进步) # Storm

Storm基础

Storm基本概念 Storm是一个开源的实时计算系统,它提供了一系列的基本元素用于进行计算:Topology.Stream.Spout.Bolt等等. 在Storm中,一个实时应用的计算任务被打包作为Topology发布,这同Hadoop的MapReduce任务相似.但是有一点不同的是:在Hadoop中,MapReduce任务最终会执行完成后结束:而在Storm中,Topology任务一旦提交后永远不会结束,除非你显示去停止任务. 计算任务Topology是由不同的Spouts和Bolts,通

Storm框架使用详解

开篇:实时计算是针对海量数据计算,主要是弥补hadoop等框架只能进行离线批处理的不足.实时计算不一定要精确到秒级,个人理解是相对于离线的一种范称吧.主要应用场景有: 1)  数据源是不断产生的,服务端要不断处理接收的数据,同时回馈给客户端. Storm是基于流的处理框架.以将发送的tuple序列化,进行分发到相应处理端中.数据流在时间和数量上是无限的,这种数据时不断产生的,比如用户的访问历史,点击历史,搜索信息等等. 2)  处理器是循环等待消息的,消息一来即处理数据,进而得出结果.当上传to