Storm之trident聚合操作介绍

Trident主要有5类操作:

1、作用在本地的操作,不产生网络传输。

2、对数据流的重分布,不改变流的内容,但是产生网络传输。

3、聚合操作,有可能产生网络传输。

4、作用在分组流(grouped streams)上的操作。

5、Merge和join

这里主要介绍一下3和4,希望对大家有所帮助,如有错误请指正!

首先说几个名词:

Partition在Storm中并发的最小执行单元是task;在trident中partition相当于task的角色。

Grouped streams对数据流做groupBy操作后,将key相同的流组织在一起,形成若干组流。

Global aggregation没有groupBy的聚合,即全局聚合。

AggregatorTrident中定义的用于实现聚合方法的接口。

下面开始介绍:

作用在trident的Stream对象上的与聚合相关的主要方法:

aggregate

partitionAggregate

persistentAggregate

groupBy

partitionBy

partitionPersist

parallelismHint

作为聚合操作的一个参数,实现聚合功能的主要接口有:

Aggregator<T>

CombinerAggregator<T>

ReducerAggregator<T>

----------------------华丽丽的分割线----------------------------------

使用过Hive的人都知道,不含group by的聚合SQL在转化成Hadoop作业后,在编译时就确定了只能有1个reduce,因为全局聚合在汇总阶段只能由1个计算单元完成。同样的道理,当aggregate方法用于无groupBy的global aggregation时,每个批次(batch)的流也只能在1个partition中执行(使用Aggregator或ReducerAggregator接口,而CombinerAggregator例外,后面会讲到)。当我们使用aggregate计算global aggregation时,如果通过parallelismHint设置了并发数为n,trident的做法是通过轮循的方式让不同的批次依次在n个partition中执行,实际上还是在串行执行,意义不大。因此使用aggregate做global aggregation时,并不能实现并发的功能,只适用于数据量不大的场景,这时候最好把并发设成1,否则对资源是一种浪费。

值得一提的是,如果实现了自定义分组的CustomStreamGrouping接口,后面再跟global aggregation,例如:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .partition(new  MyCustomStreamGrouping())
       .aggregate(new MyAggregator(), new  Fields(“out1”))
       .parallelismHint(10);

这时候实际上我们的自定义分组是不起作用的,因为上面已经说明,此时并发并没有真正开启,而是采取的轮循策略。只有将aggregate换成partitionAggregate,自定义的分组才会起作用。

使用aggregate做分组聚合是它的强项,此时可以充分发挥并发的特性。但是需要注意,假设并发度设置为10,而我们groupBy的key的不同值实际上只有2个,那势必有很多partition在空跑,造成资源浪费。

partitionAggregate通常用于global aggregation时的本地化聚合,类似于Hadoop中的map阶段。partitionAggregate是在每一个partition内独立调用自己的聚合操作,互不干涉。最后还需要把局部聚合值emit出来,通过网络传输供后面的aggregate做全局聚合。通过这种策略,可以实现global aggregation的并发。partitionAggregate的前面不能跟groupBy方法,因为groupBy方法返回的GroupedStream对象没有partitionAggregate方法。

Aggregator<T>接口是三种实现聚合功能的接口中最通用的一种。Aggregator<T>要实现5个方法:

prepare只在启动拓扑时调用1次。如果设置了并发度,则每一个partition调用1次。
cleanup只在正常关闭拓扑时调用1次。如果设置了并发度,则每一个partition调用1次。
init对于global aggregation,每个批次调用1次。如果使用的是partitionAggregate,则每一个批次的每一个partition调用1次。对于Grouped Streams,每个相同的key组成的数据流调用1次。需要注意的是,如果使用的是事务型spout,同时某个批次处理失败后导致该批次消息重发,则在接下来处理时init有可能会调用多次。因此init里面的代码逻辑应该要支持同一批的重复调用。
aggregate每1个tuple调用1次。
complete对于global aggregation,每个批次调用1次。如果使用的是partitionAggregate,则每一个批次的每一个partition调用1次。对于Grouped Streams,每个相同的key组成的数据流调用1次。

再说一下CombinerAggregator<T>,它比较有趣,前面提到使用aggregate做global aggregation无法开启并发。但是当CombinerAggregator<T>与aggregate配合使用时,例如:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .parallelismHint(10)
       .aggregate(new MyCombinerAggregator(), new Fields(“out1”));

Trident会把拓扑自动拆分成2个bolt,第一个bolt做局部聚合,类似于Hadoop中的map;第二个bolt通过接收网络传输过来的局部聚合值最后做一个全局聚合,类似于Hadoop中的reduce。在上面的例子中,局部聚合开启了10个并发,这就实现了使用aggregate做global aggregation时真正开启并发。当然,使用partitionAggregate可以实现同样的功能。类似于:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .partitionAggregate(new  MyAggregator(), new  Fields(“out1”))
       .parallelismHint(10)
       .aggregate(new Fields(“out1”), new MyAggregator(), new  Fields(“out2”));
有三点需要注意:
1、自动优化后的第一个bolt是本地化操作,因此它可以和它前面或者后面挨着的所有each合并在同一个bolt里面。
2、parallelismHint(n)要写在aggregate的前面,如果写在aggregate后面,将导致本地化操作的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。
3、综合1和2,把parallelismHint(n)写在aggregate的前面会导致spout同时开启n的并发度,因此要注意自己实现的spout类是否支持并发发送。

CombinerAggregator<T>需要实现3个方法:
init每条tuple调用1次,对tuple做预处理。
combine每条tuple调用1次,和之前的聚合值做combine。如果是第一条tuple则和zero返回的值做combine。
zero当没有数据流时的处理逻辑。
整个CombinerAggregator<T>会在每批次结束时将combine的结果做一次emit。

persistentAggregate是实现聚合的另外一种方式。前面介绍的聚合可以看成是对每个批次的数据做本批次内的聚合计算,至于批次之间如何merge需要自己处理。而persistentAggregate可以看成是对源源不断发送过来数据流做一个总的聚合,每个批次的聚合值只是一个中间状态,通过与trident新提出的state概念结合,实现中间状态的持久化,同时支持事务性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者ReducerAggregator<T>。

关于state接口,它的使用场景非常多,这里先不做详细介绍。它可以作为Stream.stateQuery的参数按批次对持久化的数据做查询;也可以配合Stream.partitionPersist按批次做持久化操作,类似于IBatchBolt<T>.finishBatch所能实现的功能。

EOF

时间: 2024-11-02 20:51:47

Storm之trident聚合操作介绍的相关文章

第16课-数据库开发及ado.net-数据库SQl,创建数据库和表,增删改语句,约束,top和Distinct,聚合函数介绍

第16课-数据库开发及ado.net 数据库SQl,创建数据库和表,增删改语句,约束,top和Distinct,聚合函数介绍 SQL语句入门(脚本.命令) SQL全名是结构化查询语言(Structured Query Language) SOL语句是和DBMS“交谈”专用的语言,不同的DBMS都认SQL语法. Sql中字符串使用单引号:通过写俩个单引号来转义一个单引号. Sql中的注释“——” 单行注释比较好 判断俩个数据是否相等使用=(单等号) 在sql语句中sql代码不区分大小写 SQL主要

MongoDB中的聚合操作

根据MongoDB的文档描述,在MongoDB的聚合操作中,有以下五个聚合命令. 其中,count.distinct和group会提供很基本的功能,至于其他的高级聚合功能(sum.average.max.min),就需要通过mapReduce来实现了. 在MongoDB2.2版本以后,引入了新的聚合框架(聚合管道,aggregation pipeline ,使用aggregate命令),是一种基于管道概念的数据聚合操作. Name Description count Counts the num

Ruby操作MongoDB(进阶八)-聚合操作Aggregation

上篇博文讲述了排序规则collations的操作和设置方式.顺带介绍了一部分聚合aggregation的设置方式.本文继续介绍聚合操作. 聚合框架的操作处理完数据记录后在返回计算结果.集合操作将来源于多个文档的值归类到一起,这样就可疑在被归类的数据上进行多种操作,然后返回一个单独的结果 1 聚合管道 聚合管道是用于数据聚合的一个框架,是以数据处理管道概念为原型.将文档输入一个多级管道后,可疑将文档转换为聚合的结果.下面以restaurants作为数据集,通过将餐馆类归类,我们就可以使用聚合管道在

Java8中聚合操作collect、reduce方法详解

Stream的基本概念 Stream和集合的区别: Stream不会自己存储元素.元素储存在底层集合或者根据需要产生.Stream操作符不会改变源对象.相反,它会返回一个持有结果的新的Stream.3.Stream操作可能是延迟执行的,这意味着它们会等到需要结果的时候才执行.Stream操作的基本过程,可以归结为3个部分: 创建一个Stream.在一个或者多个操作中,将指定的Stream转换为另一个Stream的中间操作.通过终止(terminal)方法来产生一个结果.该操作会强制它之前的延时操

OpenStack/Gnocchi简介——时间序列数据聚合操作提前计算并存储起来,先算后取的理念

先看下 http://www.cnblogs.com/bonelee/p/6236962.html 这里对于环形数据库的介绍,便于理解归档这个操作! 转自:http://blog.sina.com.cn/s/blog_6de3aa8a0102wk0y.html 早期的OpenStack监控(遥测)项目ceilometer被一分为四(Ceilometer.Gnocchi.Aodh.Panko),各司其职!其中Ceilometer负责采集计量数据并加工预处理:Gnocchi主要用来提供资源索引和存储

基于Morphia实现MongoDB按小时、按天聚合操作

MongoDB按照天数或小时聚合 需求 最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图. 实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询. 涉及到的技术栈分别为:Spring Boot,MongoDB,Morphia. 数据模型 @Data @Builder @Entity(value = "rawDevStatus", noClassnameStored = true) //

Update:sparksql:第3节 Dataset (DataFrame) 的基础操作 &amp; 第4节 SparkSQL_聚合操作_连接操作

8. Dataset (DataFrame) 的基础操作 8.1. 有类型操作 8.2. 无类型转换 8.5. Column 对象 9. 缺失值处理 10. 聚合 11. 连接 8. Dataset (DataFrame) 的基础操作 导读 这一章节主要目的是介绍 Dataset 的基础操作, 当然, DataFrame 就是 Dataset, 所以这些操作大部分也适用于 DataFrame 有类型的转换操作 无类型的转换操作 基础 Action 空值如何处理 统计操作 8.1. 有类型操作 分

聚合操作

聚合操作 Aggregate Performs a custom aggregation operation on the values in the collection. IList<String> strList =newList<String>(){"One","Two","Three","Four","Five"}; var commaSeperatedString = s

医疗微信营销方案之操作介绍

在家里自己操作项目以有1个有时间了,感觉没有上班时的压力,不用早起,也不用看老板的脸色工作.这可能就是自己在家里上班的好处.马上又是要到端午节了,又要到回家的日子了,离2016年又去了一半时间了,想想自己又挣了多少钱了!当初,定下的目标又实现 了多少了. 在上周我去一家医院面试了一个新媒体主管职位,叫我回来写一下方案书,以下是我整理出来的.给大家参考一下: 关于新媒体营销方案 一:营销方向 前期我的主要营销方向为微信+QQ主要的营销渠道(陌陌次要) 二:团队组建+投入成本 >前期准备招5个人工资