storm starter学习(二) - 流聚合

SingleJoinExample示例说明了storm中流聚合的应用,将具有相同tuple属性的数据流整合成一个新的数据流。来看一下Topology。先定义两个数据源genderSpout和ageSpout,Fields分别为("id", "gender")、("id", "age"),最终聚合后的数据流按id进行分组,输出为("gender", "age")。具体Topology如下:

// 定义数据源
FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("gender", genderSpout);
builder.setSpout("age", ageSpout);
builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
        .fieldsGrouping("age", new Fields("id")); //数据流聚合

拓扑中流聚合的主要功能在SingleJoinBolt下,先来看一下prepare方法。

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
      Set<String> setFields = new HashSet<String>(fields.toList());
      if (idFields == null)
        idFields = setFields;
      else
        idFields.retainAll(setFields);

      for (String outfield : _outFields) {
        for (String sourcefield : fields) {
          if (outfield.equals(sourcefield)) {
            _fieldLocations.put(outfield, source);
          }
        }
      }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));

    if (_fieldLocations.size() != _outFields.size()) {
      throw new RuntimeException("Cannot find all outfields among sources");
    }
  }

首先在处理开始的地方,使用了TimeCacheMap。使用它的目的是,由于bolt在接收两个数据源的流数据时,同一id的两个数据流很可能不会在一个时间点内同时发出,因此需要对数据流先进行缓存,直到所有id相同的数据源都收到后再进行聚合处理,做完聚合处理后再将对应的tuple信息从缓存中删除。在处理过程中,有可能会出现某些id的tuple丢失,导致缓存中对应该id的其他tuples一直缓存在内存中,解决此问题的方法是设置timeout时间,定期清理过期tuples发送fail信息给spout。超时时间的大小设置需要结合具体应用的进行判断,尽量保证相同id的tuples会在较短的时间间隔内发送给bolt,避免重复timeout事件的放生。

TimeCacheMap中ExpireCallback方法如下:

private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
    @Override
    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
      for (Tuple tuple : tuples.values()) {
        _collector.fail(tuple);
      }
    }
}

接下来在prepare中遍历TopologyContext中不同数据源,得到所有数据源(genderSpout和ageSpout),使用retainAll方法提取Set中公共的Filed信息,保存到变量_idFields中(id),将_outFileds中字段所在数据源记录下来,保存到_fieldLocations,以便在聚合时获取对应的字段值。

excute方法是执行最后的流聚合功能,代码如下:

public void execute(Tuple tuple) {
    List<Object> id = tuple.select(_idFields);
    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
    if (!_pending.containsKey(id)) {
      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
    }
    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
    if (parts.containsKey(streamId))
      throw new RuntimeException("Received same side of single join twice");
    parts.put(streamId, tuple);
    if (parts.size() == _numSources) {
      _pending.remove(id);
      List<Object> joinResult = new ArrayList<Object>();
      for (String outField : _outFields) {
        GlobalStreamId loc = _fieldLocations.get(outField);
        joinResult.add(parts.get(loc).getValueByField(outField));
      }
      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

      for (Tuple part : parts.values()) {
        _collector.ack(part);
      }
    }
}

从tuple中取出id Fields信息和GlobalStreamId,判断当前id是否在_pending中存在,如不存在将当前数据放入到
_pending中。然后根据id来获取parts中对应的信息,如存在相同流id信息时,抛出异常:接收到来自同一Spout中id一致的tuple信息。不存在则放入到parts里。

如果parts已经包含了聚合数据源的个数_numSources时,本例中为2,表示相同id从genderSpout和ageSpout中发出的tuple都已经收到,可以进行聚合处理。从_pending队列中移除这条记录,然后开始构造聚合后的结果字段:依次遍历_outFields中各个字段,从_fieldLocations中取到这些outFiled字段对应的GlobalStreamId,紧接着从parts中取出GlobalStreamId对应的outFiled,放入聚合后的结果中。ack所有tuple。否则,继续等待两spout的流数据,直到缓存的数据源个数达到聚合要求。

最后,声明聚合后的输出字段,见declareOutputFields:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_outFields);
}

总结:

在此示例中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。

storm starter学习(二) - 流聚合

时间: 2024-08-04 22:24:56

storm starter学习(二) - 流聚合的相关文章

storm starter学习(一)

官方提供的storm starter示例中,有很多应用的例子,对storm的应用场景理解很有帮助.本文结合源码来进行功能分解,记录一下,作为记忆索引吧. 先来看一个比较简单的示例:WordCountTopology,原版代码该示例是为了说明多语言适配而做的应用场景,主要功能是随机生成一些String,将这些String划分分组,统计各单词出现数量.后来修改了一下,去掉了py调用的地方.使用java来进行词组划分. 先来看一下Topology: public static void main(St

Storm入门(九)Storm常见模式之流聚合

流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join的输入是有限的,并且join的语义是非常明确的:而流聚合的语义是不明确的并且输入流是无限的. 数据流的聚合类型跟具体的应用有关.一些应用把两个流发出的所有的tuple都聚合起来--不管多长时间:而另外一些应用则只会聚合一些特定的tuple.而另外一些应用的聚合逻辑又可

Storm常见模式——流聚合

转自:http://www.cnblogs.com/panfeng412/archive/2012/06/04/storm-common-patterns-of-stream-join.html 流聚合(stream join)是指将具有共同元组(tuple)字段的数据流(两个或者多个)聚合形成一个新的数据流的过程. 从定义上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明显的区别:table join的输入是有限的,并且join的语义是非常明确的:而流聚合的语义是不明确的

Storm入门学习随记

推荐慕课网视频:http://www.imooc.com/video/10055 ====Storm的起源. Storm是开源的.分布式.流式计算系统 什么是分布式呢?就是将一个任务拆解给多个计算机去执行,让许多机器共通完成同一个任务, 把这个多机的细节给屏蔽,对外提供同一个接口.同一个服务,这样的系统就是分布式系统. 在多年以前并没有非常范用的分布式系统,即使存在,也都是限定在指定的领域, 当然,也有人尝试从中提取出共通的部分,发明一个通用的分布式系统,但是都没有很好的结果. 后来,Googl

Storm实时计算:流操作入门编程实践

转自:http://shiyanjun.cn/archives/977.html Storm实时计算:流操作入门编程实践 Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易.下面,简单介绍编程实践过程中需要理解的Storm中的几个概念: Topology Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排.容纳一组计算逻辑组件(Spout.Bolt)的对象(Hadoop MapReduce中一

java基础学习总结——流

永不放弃,一切皆有可能!!! 只为成功找方法,不为失败找借口! java基础学习总结——流 一.JAVA流式输入/输出原理 流是用来读写数据的,java有一个类叫File,它封装的是文件的文件名,只是内存里面的一个对象,真正的文件是在硬盘上的一块空间,在这个文件里面存放着各种各样的数据,我们想读文件里面的数据怎么办呢?是通过一个流的方式来读,咱们要想从程序读数据,对于计算机来说,无论读什么类型的数据都是以010101101010这样的形式读取的.怎么把文件里面的数据读出来呢?你可以把文件想象成一

Redis学习二

Redis学习二 标签(空格分隔): Redis 一,link 链表结构 1,lpush key value (rpush插入到链表尾部) 作用: 把值插入到链接头部 2,rpop key(lpop key 返回并删除链表的头元素) 作用: 返回并删除链表尾元素 3,lrange key start stop 作用: 返回链表中[start ,stop]中的元素 规律: 左数从0开始,右数从-1开始 lrange link 0 2 lrange link 0 -1 4,lrem key coun

[Python 学习] 二、在Linux平台上使用Python

这一节,主要介绍在Linux平台上如何使用Python 1. Python安装. 现在大部分的发行版本都是自带Python的,所以可以不用安装.如果要安装的话,可以使用对应的系统安装指令. Fedora系统:先以root登入,运行 yum install python Ubuntu系统:在root组的用户, 运行 sudo apt-get install python 2. 使用的Python的脚本 Linux是一个以文件为单位的系统,那么我们使用的Python是哪一个文件呢? 这个可以通过指令

OpenCV for Python 学习 (二 事件与回调函数)

今天主要看了OpenCV中的事件以及回调函数,这么说可能不准确,主要是下面这两个函数(OpenCV中还有很多这些函数,可以在 http://docs.opencv.org/trunk/modules/highgui/doc/user_interface.html 找到,就不一一列举了),然后自己做了一个简单的绘图程序 函数如下: cv2.setMouseCallback(windowName, onMouse[, param]) cv2.createTrackbar(trackbarName,