SingleJoinExample示例说明了storm中流聚合的应用,将具有相同tuple属性的数据流整合成一个新的数据流。来看一下Topology。先定义两个数据源genderSpout和ageSpout,Fields分别为("id", "gender")、("id", "age"),最终聚合后的数据流按id进行分组,输出为("gender", "age")。具体Topology如下:
// 定义数据源 FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender")); FeederSpout ageSpout = new FeederSpout(new Fields("id", "age")); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("gender", genderSpout); builder.setSpout("age", ageSpout); builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id")) .fieldsGrouping("age", new Fields("id")); //数据流聚合
拓扑中流聚合的主要功能在SingleJoinBolt下,先来看一下prepare方法。
public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _fieldLocations = new HashMap<String, GlobalStreamId>(); _collector = collector; int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue(); _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback()); _numSources = context.getThisSources().size(); Set<String> idFields = null; for (GlobalStreamId source : context.getThisSources().keySet()) { Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId()); Set<String> setFields = new HashSet<String>(fields.toList()); if (idFields == null) idFields = setFields; else idFields.retainAll(setFields); for (String outfield : _outFields) { for (String sourcefield : fields) { if (outfield.equals(sourcefield)) { _fieldLocations.put(outfield, source); } } } } _idFields = new Fields(new ArrayList<String>(idFields)); if (_fieldLocations.size() != _outFields.size()) { throw new RuntimeException("Cannot find all outfields among sources"); } }
首先在处理开始的地方,使用了TimeCacheMap。使用它的目的是,由于bolt在接收两个数据源的流数据时,同一id的两个数据流很可能不会在一个时间点内同时发出,因此需要对数据流先进行缓存,直到所有id相同的数据源都收到后再进行聚合处理,做完聚合处理后再将对应的tuple信息从缓存中删除。在处理过程中,有可能会出现某些id的tuple丢失,导致缓存中对应该id的其他tuples一直缓存在内存中,解决此问题的方法是设置timeout时间,定期清理过期tuples发送fail信息给spout。超时时间的大小设置需要结合具体应用的进行判断,尽量保证相同id的tuples会在较短的时间间隔内发送给bolt,避免重复timeout事件的放生。
TimeCacheMap中ExpireCallback方法如下:
private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> { @Override public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) { for (Tuple tuple : tuples.values()) { _collector.fail(tuple); } } }
接下来在prepare中遍历TopologyContext中不同数据源,得到所有数据源(genderSpout和ageSpout),使用retainAll方法提取Set中公共的Filed信息,保存到变量_idFields中(id),将_outFileds中字段所在数据源记录下来,保存到_fieldLocations,以便在聚合时获取对应的字段值。
excute方法是执行最后的流聚合功能,代码如下:
public void execute(Tuple tuple) { List<Object> id = tuple.select(_idFields); GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()); if (!_pending.containsKey(id)) { _pending.put(id, new HashMap<GlobalStreamId, Tuple>()); } Map<GlobalStreamId, Tuple> parts = _pending.get(id); if (parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice"); parts.put(streamId, tuple); if (parts.size() == _numSources) { _pending.remove(id); List<Object> joinResult = new ArrayList<Object>(); for (String outField : _outFields) { GlobalStreamId loc = _fieldLocations.get(outField); joinResult.add(parts.get(loc).getValueByField(outField)); } _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult); for (Tuple part : parts.values()) { _collector.ack(part); } } }
从tuple中取出id Fields信息和GlobalStreamId,判断当前id是否在_pending中存在,如不存在将当前数据放入到
_pending中。然后根据id来获取parts中对应的信息,如存在相同流id信息时,抛出异常:接收到来自同一Spout中id一致的tuple信息。不存在则放入到parts里。
如果parts已经包含了聚合数据源的个数_numSources时,本例中为2,表示相同id从genderSpout和ageSpout中发出的tuple都已经收到,可以进行聚合处理。从_pending队列中移除这条记录,然后开始构造聚合后的结果字段:依次遍历_outFields中各个字段,从_fieldLocations中取到这些outFiled字段对应的GlobalStreamId,紧接着从parts中取出GlobalStreamId对应的outFiled,放入聚合后的结果中。ack所有tuple。否则,继续等待两spout的流数据,直到缓存的数据源个数达到聚合要求。
最后,声明聚合后的输出字段,见declareOutputFields:
public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(_outFields); }
总结:
在此示例中使用一种叫做TimeCacheMap的数据结构,用于在内存中保存近期活跃的对象,它的实现非常地高效,而且可以自动删除过期不再活跃的对象。
storm starter学习(二) - 流聚合