Trident主要有5类操作:
1、作用在本地的操作,不产生网络传输。
2、对数据流的重分布,不改变流的内容,但是产生网络传输。
3、聚合操作,有可能产生网络传输。
4、作用在分组流(grouped streams)上的操作。
5、Merge和join
这里主要介绍一下3和4,希望对大家有所帮助,如有错误请指正!
首先说几个名词:
Partition:在Storm中并发的最小执行单元是task;在trident中partition相当于task的角色。
Grouped streams:对数据流做groupBy操作后,将key相同的流组织在一起,形成若干组流。
Global aggregation:没有groupBy的聚合,即全局聚合。
Aggregator:Trident中定义的用于实现聚合方法的接口。
下面开始介绍:
作用在trident的Stream对象上的与聚合相关的主要方法:
aggregate
partitionAggregate
persistentAggregate
groupBy
partitionBy
partitionPersist
parallelismHint
作为聚合操作的一个参数,实现聚合功能的主要接口有:
Aggregator<T>
CombinerAggregator<T>
ReducerAggregator<T>
----------------------华丽丽的分割线----------------------------------
使用过Hive的人都知道,不含group by的聚合SQL在转化成Hadoop作业后,在编译时就确定了只能有1个reduce,因为全局聚合在汇总阶段只能由1个计算单元完成。同样的道理,当aggregate方法用于无groupBy的global aggregation时,每个批次(batch)的流也只能在1个partition中执行(使用Aggregator或ReducerAggregator接口,而CombinerAggregator例外,后面会讲到)。当我们使用aggregate计算global aggregation时,如果通过parallelismHint设置了并发数为n,trident的做法是通过轮循的方式让不同的批次依次在n个partition中执行,实际上还是在串行执行,意义不大。因此使用aggregate做global aggregation时,并不能实现并发的功能,只适用于数据量不大的场景,这时候最好把并发设成1,否则对资源是一种浪费。
值得一提的是,如果实现了自定义分组的CustomStreamGrouping接口,后面再跟global aggregation,例如:
trident.newStream(“TRIDENT_SPOUT”, new MySpout())
.partition(new MyCustomStreamGrouping())
.aggregate(new MyAggregator(), new Fields(“out1”))
.parallelismHint(10);
这时候实际上我们的自定义分组是不起作用的,因为上面已经说明,此时并发并没有真正开启,而是采取的轮循策略。只有将aggregate换成partitionAggregate,自定义的分组才会起作用。
使用aggregate做分组聚合是它的强项,此时可以充分发挥并发的特性。但是需要注意,假设并发度设置为10,而我们groupBy的key的不同值实际上只有2个,那势必有很多partition在空跑,造成资源浪费。
partitionAggregate通常用于global aggregation时的本地化聚合,类似于Hadoop中的map阶段。partitionAggregate是在每一个partition内独立调用自己的聚合操作,互不干涉。最后还需要把局部聚合值emit出来,通过网络传输供后面的aggregate做全局聚合。通过这种策略,可以实现global aggregation的并发。partitionAggregate的前面不能跟groupBy方法,因为groupBy方法返回的GroupedStream对象没有partitionAggregate方法。
Aggregator<T>接口是三种实现聚合功能的接口中最通用的一种。Aggregator<T>要实现5个方法:
prepare:只在启动拓扑时调用1次。如果设置了并发度,则每一个partition调用1次。
cleanup:只在正常关闭拓扑时调用1次。如果设置了并发度,则每一个partition调用1次。
init:对于global aggregation,每个批次调用1次。如果使用的是partitionAggregate,则每一个批次的每一个partition调用1次。对于Grouped Streams,每个相同的key组成的数据流调用1次。需要注意的是,如果使用的是事务型spout,同时某个批次处理失败后导致该批次消息重发,则在接下来处理时init有可能会调用多次。因此init里面的代码逻辑应该要支持同一批的重复调用。
aggregate:每1个tuple调用1次。
complete:对于global aggregation,每个批次调用1次。如果使用的是partitionAggregate,则每一个批次的每一个partition调用1次。对于Grouped Streams,每个相同的key组成的数据流调用1次。
再说一下CombinerAggregator<T>,它比较有趣,前面提到使用aggregate做global aggregation无法开启并发。但是当CombinerAggregator<T>与aggregate配合使用时,例如:
trident.newStream(“TRIDENT_SPOUT”, new MySpout())
.parallelismHint(10)
.aggregate(new MyCombinerAggregator(), new Fields(“out1”));
Trident会把拓扑自动拆分成2个bolt,第一个bolt做局部聚合,类似于Hadoop中的map;第二个bolt通过接收网络传输过来的局部聚合值最后做一个全局聚合,类似于Hadoop中的reduce。在上面的例子中,局部聚合开启了10个并发,这就实现了使用aggregate做global aggregation时真正开启并发。当然,使用partitionAggregate可以实现同样的功能。类似于:
trident.newStream(“TRIDENT_SPOUT”, new MySpout())
.partitionAggregate(new MyAggregator(), new Fields(“out1”))
.parallelismHint(10)
.aggregate(new Fields(“out1”), new MyAggregator(), new Fields(“out2”));
有三点需要注意:
1、自动优化后的第一个bolt是本地化操作,因此它可以和它前面或者后面挨着的所有each合并在同一个bolt里面。
2、parallelismHint(n)要写在aggregate的前面,如果写在aggregate后面,将导致本地化操作的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。
3、综合1和2,把parallelismHint(n)写在aggregate的前面会导致spout同时开启n的并发度,因此要注意自己实现的spout类是否支持并发发送。
CombinerAggregator<T>需要实现3个方法:
init:每条tuple调用1次,对tuple做预处理。
combine:每条tuple调用1次,和之前的聚合值做combine。如果是第一条tuple则和zero返回的值做combine。
zero:当没有数据流时的处理逻辑。
整个CombinerAggregator<T>会在每批次结束时将combine的结果做一次emit。
persistentAggregate是实现聚合的另外一种方式。前面介绍的聚合可以看成是对每个批次的数据做本批次内的聚合计算,至于批次之间如何merge需要自己处理。而persistentAggregate可以看成是对源源不断发送过来数据流做一个总的聚合,每个批次的聚合值只是一个中间状态,通过与trident新提出的state概念结合,实现中间状态的持久化,同时支持事务性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者ReducerAggregator<T>。
关于state接口,它的使用场景非常多,这里先不做详细介绍。它可以作为Stream.stateQuery的参数按批次对持久化的数据做查询;也可以配合Stream.partitionPersist按批次做持久化操作,类似于IBatchBolt<T>.finishBatch所能实现的功能。
EOF