Storm常见模式——流聚合

转自:http://www.cnblogs.com/panfeng412/archive/2012/06/04/storm-common-patterns-of-stream-join.html

流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程。

从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join的输入是有限的,并且join的语义是非常明确的;而流聚合的语义是不明确的并且输入流是无限的。

数据流的聚合类型跟具体的应用有关。一些应用把两个流发出的所有的tuple都聚合起来——不管多长时间;而另外一些应用则只会聚合一些特定的tuple。而另外一些应用的聚合逻辑又可能完全不一样。而这些聚合类型里面最常见的类型是把所有的输入流进行一样的划分,这个在storm里面用fields grouping在相同字段上进行grouping就可以实现。

下面是对storm-starter(代码见:https://github.com/nathanmarz/storm-starter)中有关两个流的聚合的示例代码剖析:

先看一下入口类SingleJoinExample

(1)这里首先创建了两个发射源spout,分别是genderSpout和ageSpout:

        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("gender", genderSpout);
        builder.setSpout("age", ageSpout);

其中genderSpout包含两个tuple字段:id和gender,ageSpout包含两个tuple字段:id和age(这里流聚合就是通过将相同id的tuple进行聚合,得到一个新的输出流,包含id、gender和age字段)。

(2)为了不同的数据流中的同一个id的tuple能够落到同一个task中进行处理,这里使用了storm中的fileds grouping在id字段上进行分组划分:

        builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
                .fieldsGrouping("gender", new Fields("id"))
                .fieldsGrouping("age", new Fields("id"));

从中可以看到,SingleJoinBolt就是真正进行流聚合的地方。下面我们来看看:

(1)SingleJoinBolt构造时接收一个Fileds对象,其中传进的是聚合后将要被输出的字段(这里就是gender和age字段),保存到变量_outFileds中。

(2)接下来看看完成SingleJoinBolt的构造后,SingleJoinBolt在真正开始接收处理tuple之前所做的准备工作(代码见prepare方法):

a)首先,将保存OutputCollector对象,创建TimeCacheMap对象,设置超时回调接口,用于tuple处理失败时fail消息;紧接着记录数据源的个数:

        _collector = collector;
        int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
        _numSources = context.getThisSources().size();

b)遍历TopologyContext中不同数据源,得到所有数据源(这里就是genderSpout和ageSpout)中公共的Filed字段,保存到变量_idFields中(例子中就是id字段),同时将_outFileds中字段所在数据源记录下来,保存到一张HashMap中_fieldLocations,以便聚合后获取对应的字段值。

        Set<String> idFields = null;
        for(GlobalStreamId source: context.getThisSources().keySet()) {
            Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
            Set<String> setFields = new HashSet<String>(fields.toList());
            if(idFields==null) idFields = setFields;
            else idFields.retainAll(setFields);

            for(String outfield: _outFields) {
                for(String sourcefield: fields) {
                    if(outfield.equals(sourcefield)) {
                        _fieldLocations.put(outfield, source);
                    }
                }
            }
        }
        _idFields = new Fields(new ArrayList<String>(idFields));

        if(_fieldLocations.size()!=_outFields.size()) {
            throw new RuntimeException("Cannot find all outfields among sources");
        }

(3)好了,下面开始两个spout流的聚合过程了(代码见execute方法):

首先,从tuple中获取_idFields字段,如果不存在于等待被处理的队列_pending中,则加入一行,其中key是获取到的_idFields字段,value是一个空的HashMap<GlobalStreamId, Tuple>对象,记录GlobalStreamId到Tuple的映射。

        List<Object> id = tuple.select(_idFields);
        GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
        if(!_pending.containsKey(id)) {
            _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
        }

从_pending队列中,获取当前GlobalStreamId streamId对应的HashMap对象parts中:

        Map<GlobalStreamId, Tuple> parts = _pending.get(id);

如果streamId已经包含其中,则抛出异常,接收到同一个spout中的两条一样id的tuple,否则将该streamid加入parts中:

        if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice");
        parts.put(streamId, tuple);

如果parts已经包含了聚合数据源的个数_numSources时,从_pending队列中移除这条记录,然后开始构造聚合后的结果字段:依次遍历_outFields中各个字段,从_fieldLocations中取到这些outFiled字段对应的GlobalStreamId,紧接着从parts中取出GlobalStreamId对应的outFiled,放入聚合后的结果中。

        if(parts.size()==_numSources) {
            _pending.remove(id);
            List<Object> joinResult = new ArrayList<Object>();
            for(String outField: _outFields) {
                GlobalStreamId loc = _fieldLocations.get(outField);
                joinResult.add(parts.get(loc).getValueByField(outField));
            }

最后通过_collector将parts中存放的tuple和聚合后的输出结果发射出去,并ack这些tuple已经处理成功。

            _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

            for(Tuple part: parts.values()) {
                _collector.ack(part);
            }    }

否则,继续等待两个spout流中这个streamid都到齐后再进行聚合处理。

(4)最后,声明一下输出字段(代码见declareOutputFields方法):

    declarer.declare(_outFields);
时间: 2024-10-19 01:51:22

Storm常见模式——流聚合的相关文章

Storm入门(九)Storm常见模式之流聚合

流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join的输入是有限的,并且join的语义是非常明确的:而流聚合的语义是不明确的并且输入流是无限的. 数据流的聚合类型跟具体的应用有关.一些应用把两个流发出的所有的tuple都聚合起来--不管多长时间:而另外一些应用则只会聚合一些特定的tuple.而另外一些应用的聚合逻辑又可

Storm常见模式——批处理

Storm对流数据进行实时处理时,一种常见场景是批量一起处理一定数量的tuple元组,而不是每接收一个tuple就立刻处理一个tuple,这样可能是性能的考虑,或者是具体业务的需要. 例如,批量查询或者更新数据库,如果每一条tuple生成一条sql执行一次数据库操作,数据量大的时候,效率会比批量处理的低很多,影响系统吞吐量. 当然,如果要使用Storm的可靠数据处理机制的话,应该使用容器将这些tuple的引用缓存到内存中,直到批量处理的时候,ack这些tuple. 下面给出一个简单的代码示例:

Twitter Storm: storm的一些常见模式

这篇文章列举出了storm topology里面的一些常见模式: 流聚合(stream join) 批处理(Batching) BasicBolt 内存内缓存 + fields grouping 组合 计算top N 用TimeCacheMap来高效地保存一个最近被更新的对象的缓存 分布式RPC: CoordinatedBolt和KeyedFairBolt 流聚合(stream join) 流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段.流聚合和SQL里面table

storm starter学习(二) - 流聚合

SingleJoinExample示例说明了storm中流聚合的应用,将具有相同tuple属性的数据流整合成一个新的数据流.来看一下Topology.先定义两个数据源genderSpout和ageSpout,Fields分别为("id", "gender").("id", "age"),最终聚合后的数据流按id进行分组,输出为("gender", "age").具体Topology如下:

Storm分布式实时流计算框架相关技术总结

Storm分布式实时流计算框架相关技术总结 Storm作为一个开源的分布式实时流计算框架,其内部实现使用了一些常用的技术,这里是对这些技术及其在Storm中作用的概括介绍.以此为基础,后续再深入了解Storm的内部实现细节. 1. Zookeeper集群 Zookeeper是一个针对大型分布式系统的可靠协调服务系统,其采用类似Unix文件系统树形层次结构的数据模型(如:/zoo/a,/zoo/b),节点内可存储少量数据(<1M,当节点存储大数据量时,实际应用中可能出现同步问题). Zookeep

HCNA配置手工负载分担模式链路聚合

一.配置手工负载分担模式链路聚合 1.手工负载分担模式链路聚合配置场景 当需要增加两台设备之间的带宽或可靠性,而两台设备中有一台不支持LACP协议时,可在Switch设备上创建手工负载分担模式的Eth-Trunk,并加入多个成员接口增加设备间的带宽及可靠性 2.步骤 2.1 配置Eth-Trunk工作模式为手工负载分担模式 执行命令system-view,进入系统视图. 执行命令interface eth-trunk trunk-id,进入Eth-Trunk接口视图. 执行命令mode manu

配置LACP模式链路聚合

组网需求 在两台Router设备上配置LACP模式链路聚合组,提高两设备之间的带宽与可靠性,具体要求如下: 两条活动链路具有负载分担的能力. 两设备间的链路具有一条冗余备份链路,当活动链路出现故障链路时,备份链路替代故障链路,保持数据传输的可靠性. 配置LACP模式链路聚合组网图 配置思路 采用如下的思路配置LACP模式链路聚合: 1. 在Router设备上创建Eth-Trunk,配置Eth-Trunk为LACP模式,实现链路聚合功能. 2. 将成员接口加入Eth-Trunk. 3. 配置系统优

什么是流,.NET中有哪些常见的流

分析问题 流是一种对于字节流的直接操作.例如在处理一个文件时,本质上需要通过操作系统提供的API来进行文件打开,读取文件中的字节流,再关闭文件等操作,而读取文件的过程就可以看作字节流的一个过程.常见的流类型有文件流.终端操作流.网络Socket等. 在.NET中,System.IO.Stream类型被设计为所有流类型的虚基类,所有常见的流类型都继承自System.IO.Stream类型,当程序员需要自定义一种流类型时,也应该直接或者间接继承自Stream类型..NET提供了丰富的内建的流类型,其

HCNA配置静态LACP模式链路聚合

1.静态LACP模式 静态LACP模式是一种利用LACP协议进行聚合参数协商.确定活动接口和非活动接口的链路聚合方式.该模式下,需手工创建Eth-Trunk,手工加入Eth-Trunk成员接口,由LACP协议协商确定活动接口和非活动接口. 静态LACP模式也称为M∶N模式.这种方式同时可以实现链路负载分担和链路冗余备份的双重功能.在链路聚合组中M条链路处于活动状态,这些链路负责转发数据并进行负载分担,另外N条链路处于非活动状态作为备份链路,不转发数据.当M条链路中有链路出现故障时,系统会从N条备