Spark API 之 combineByKey(一)

1       前言

combineByKey是使用Spark无法避免的一个方法,总会在有意或无意,直接或间接的调用到它。从它的字面上就可以知道,它有聚合的作用,对于这点不想做过多的解释,原因很简单,因为reduceByKey、aggregateByKey、foldByKey等函数都是使用它来实现的。

combineByKey是一个高度抽象的聚合函数,可以用于数据的聚合和分组,由它牵出的shuffle也是Spark中重中之重,现在就让我们去看看它到底是怎么去实现的。

不足或错误之处, 烦请指出更正。

2       方法源码介绍

这是PairRDDFunctions里面的combineByKey的方法片段,这两个方法放在一块,就是说明了,调用该方法若不填分区函数Partitioner则使用HashPartitioner,默认情况下会使用Map段合并(这个是对shuffle而言的)。

3       方法源码走读

废话不多说,直接贴源码,

有注释,则看注释,注释要表达的意思就是combineByKey是一个范函数,使用一组自定义聚合函数以Key为聚合条件进行聚合,至于其他的就不多说了,往下看代码。

首先就进行了判断,Key是否为数组,假如是数组则不能使用Map段合并和HashPartitioner,原因:

要想进行Map段合并和Hash分区,那么Key就必须可以通过比较内容是否相同来确定Key是否相等以及通过内容计算hash值,进而进行合并和分区,然而数组判断相等和计算hash值并不是根据它里面的内容,而是根据数组在堆栈中的信息来实现的

接着往下,构造了一个Aggregator,这玩意可以说是combineByKey的核心,因为聚合全是交给它来完成的。进去看看下Aggregator。

上面是Aggregator的默认构造器,需要传入三个自定义的方法,现在重点说说这三个方法的意义:

首先紧跟着Aggregator的三个泛型,第一个K,这个是你进行combineByKey也就是聚合的条件Key,可以是任意类型。后面的V,C两个泛型是需要聚合的值的类型,和聚合后的值的类型,两个类型是可以一样,也可以不一样,例如,Spark中用的多的reduceByKey这个方法,若聚合前的值为long,那么聚合后仍为long。再比如groupByKey,若聚合前为String,那么聚合后为Iterable<String>。

再看三个自定义方法:

  1. createCombiner

这个方法会在每个分区上都执行的,而且只要在分区里碰到在本分区里没有处理过的Key,就会执行该方法。执行的结果就是在本分区里得到指定Key的聚合类型C(可以是数组,也可以是一个值,具体还是得看方法的定义了。)

2. mergeValue

这方法也会在每个分区上都执行的,和createCombiner不同,它主要是在分区里碰到在本分区内已经处理过的Key才执行该方法,执行的结果就是将目前碰到的Key的值聚合到已有的聚合类型C中。

其实方法1和2放在一起看,就是一个if判断条件,进来一个Key,就去判断一下若以前没出现过就执行方法1,否则执行方法2.

3. mergeCombiner

前两个方法是实现分区内部的相同Key值的数据合并,而这个方法主要用于分区间的相同Key值的数据合并,形成最终的结果。

接下来就看看Aggregator实现了哪些方法。

从它的方法列表上来看,其实就它只有三个方法,那就依次来看看这三个方法是干嘛的:

  1. combineValuesByKey

看到这个名字,再根据构造器,就可以猜出,这个方法主要实现的就是分区内部的数据合并。看它的代码:

这里根据是否可以刷磁盘分了两条路,其实做的事情都是一样的,区别是在存储数据的时候一个当内存不够是直接oom,一个是可以刷磁盘。代码的实现很简单,就是迭代一个分区的数据,然后不断插入或更新Map里面的数据,这里就不再细说。

2. combineCombinersByKey

这个方法主要是实现分区间的数据合并,也就是合并combineValuesByKey的结果,看它是怎么实现的:

代码就不说了,和combineValuesByKey如出一辙,只是使用的自定义的方法不同而已。

3. updateMetrics

这个方法和刷磁盘有关,

就是记录下,当前是否刷了磁盘,刷了多少。

到这里Aggregator就结束了,接着combineByKey往下。

实例化Aggregator后,接着就是判断,是否需要重新分区(shuffle):

  1. 不需要分区

当self.partitioner == Some(partitioner)时,也就是分区实例是同一个的时候,就不需要分区了,因此只需要对先用的分区进行combineValuesByKey操作就好了,没有分区间的合并了,也不需要shuffle了。

2. 需要分区

两个分区器不一样,需要对现在分区的零散数据按Key重新分区,目的就是在于将相同的Key汇集到同一个分区上,由于数据分布的不确定性,因此有可能现在的每个分区的数据是由重新分区后的所有分区的部分数据构成的(宽依赖),因此需要shuffle,则构建ShuffledRDD,

其实到这里,我们就应该意识到,combineByKey的关键在于分区器partitioner,它是针对分区的一个操作,分区器的选择就决定了执行combineByKey后的结果,如果所给的分区器不能保证相同的Key值被分区到同一个分区,那么最终的合并的结果可能存在多个分区里有相同的Key

Shuffle的目的就是将零散于所有分区的数据按Key分区并集中。

需要shuffle的部分下部分再细说。

时间: 2024-08-09 22:02:08

Spark API 之 combineByKey(一)的相关文章

Spark API编程动手实战-01-以本地模式进行Spark API实战map、filter和co

首先以spark的本地模式测试spark API,以local的方式运行spark-shell: 先从parallelize入手吧: map操作后结果: 下面看下 filter操作: filter执行结果: 我们用最正宗的scala函数式编程的风格: 执行结果: 从结果 可以看出来,与之前那种分步奏方式结果是一样的 但采用这种方式,即是复合scala风格的写法,也是符合spark的应用程序风格的写法,在spark的编程中,大多数功能的实现都是只要一行代码即可完成.

Spark API编程动手实战-01-以本地模式进行Spark API实战map、filter和collect

首先以spark的本地模式测试spark API,以local的方式运行spark-shell: 先从parallelize入手吧: map操作后结果: 下面看下 filter操作: filter执行结果: 我们用最正宗的scala函数式编程的风格: 执行结果: 从结果 可以看出来,与之前那种分步奏方式结果是一样的 但采用这种方式,即是复合scala风格的写法,也是符合spark的应用程序风格的写法,在spark的编程中,大多数功能的实现都是只要一行代码即可完成.

图解Spark API

初识spark,需要对其API有熟悉的了解才能方便开发上层应用.本文用图形的方式直观表达相关API的工作特点,并提供了解新的API接口使用的方法.例子代码全部使用python实现. 1. 数据源准备 准备输入文件: $ cat /tmp/in apple bag bag cat cat cat 启动pyspark: $ ./spark/bin/pyspark 使用textFile创建RDD: >>> txt = sc.textFile("file:///tmp/in"

Spark调研笔记第5篇 - Spark API简单介绍

因为Spark是用Scala实现的,所以Spark天生支持Scala API.此外,还支持Java和Python API. 以Spark 1.3版本号的Python API为例.其模块层级关系例如以下图所看到的: 从上图可知,pyspark是Python API的顶层package,它包括了几个重要的subpackages.当中: 1) pyspark.SparkContext 它抽象了指向spark集群的一条连接,可用来创建RDD对象,它是API的主入口. 2) pyspark.SparkCo

Spark调研笔记第5篇 - Spark API简介

由于Spark是用Scala实现的,所以Spark天生支持Scala API,此外,还支持Java和Python API.以Spark 1.3版本的Python API为例,其模块层级关系如下图所示: 从上图可知,pyspark是Python API的顶层package,它包含了几个重要的subpackages,其中: 1) pyspark.SparkContext 它抽象了指向spark集群的一条连接,可用来创建RDD对象,它是API的主入口. 2) pyspark.SparkConf 通过它

spark算子:combineByKey

假设我们有一组个人信息,我们针对人的性别进行分组统计,并进行统计每个分组中的记录数. scala> val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "A

基于IDEA使用Spark API开发Spark程序

清明假期折腾了两天,总结了两种方式使用IDE进行spark程序,记录一下: 第一种方法比较简单,两种方式都是采用SBT进行编译的. 注意:本地不需要安装Scala程序,否则在编译程序时有版本兼容性问题. 一.基于Non-SBT方式 创建一个Scala IDEA工程 我们使用Non-SBT的方式,点击"Next" 命名工程,其他按照默认 点击"Finish"完成工程的创建 修改项目的属性 首先修改Modules选项 在src下创建两个文件夹,并把其属性改为source

Spark API综合实战:动手实战和调试Spark文件操作、动手实战操作搜狗日志文件、搜狗日志文件深入实战

这里,我以指定executor-memory参数的方式,启动spark-shell. 启动hadoop集群 [email protected]:/usr/local/hadoop/hadoop-2.6.0$ jps8457 Jps[email protected]:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh 启动spark集群 [email protected]:/usr/local/spark/spark-1.5.2-bin-hadoop

Spark API编程动手实战-03-以在Spark 1.2版本实现对Job输出结果进行排序

从前一篇文章中的wordcount的输出结果可以看出来结果是未经排序的,如何对spark的输出结果进行排序呢? 先对reduceByKey的结果进行key,value位置置换(数字,字符),然后再进行数字排序,再将key,value位置置换后就是排序后的结果了,最终将结果存储到HDFS中 可以发现我们成功对输出结果进行排序!