Spark 的combineByKey函数

在Spark中有许多聚类操作是基于combineByKey的,例如group那个家族的操作等。所以combineByKey这个函数也是比较重要,所以下午花了点时间看来下这个函数。也参考了http://www.tuicool.com/articles/miueaqv这篇博客。

先看下combineByKey定义:

/**

* Generic function to combine the elements for each key using a custom set of aggregation

* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C

* Note that V and C can be different -- for example, one might group an RDD of type

* (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:

*

* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)

* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)

* - `mergeCombiners`, to combine two C‘s into a single one.

*

* In addition, users can control the partitioning of the output RDD, and whether to perform

* map-side aggregation (if a mapper can produce multiple items with the same key).

*/

def combineByKey[C](createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean = true,

serializer: Serializer = null): RDD[(K, C)] = {

//实现略

}

这个函数主要是将键值对[(K,V)]转换为[(K,C)],并且这里的V,C类型可以不同。

对于里面的三个函数的作用,上述的博客的例子讲得很通俗,所以就拿过来直接讲下。

假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:

1 定义我们需要什么样的果汁

2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁

3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。

那么有了这三步,我们就只需要往这个果汁机中仍水果,那么这个果汁机就会产生果汁,并且果汁经过果汁混合器就能将相同品种的水果给聚在一块了。

那么对比上述三步,combineByKey的三个函数也就是这三个功能

1 createCombiner就是定义了v如何转换为c

2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C

3 就是定义了如何将相同key下的C给合并成一个C

下面以一个例子来说明,例如有

data=sc.parallelize([("a",2),("a",3),("b",4)])这个数据,后面的数字表示该字符权重,我们现在想求每个字符出现的平均权重。

data.combineByKey((lambda v:(v,1)),

(lambda c,v:(c[0]+v,c[1]+1)),

(lambda x,y:(x[0]+y[0],x[1]+y[1])))

那么第一个函数

lambda v:(v,1)中的参数v为键值对的value,我们定义C为(v,1)后面的1用与计数

第二个函数

(lambda c,v:(c[0]+v,c[1]+1)) 这里的参数c可以理解为已经榨好的果汁,v为新加进去的水果,在这里假设此时c为(2,1)v为3,那么应该对应的权重相加,并且计数加1,c[0]+v就是权重相加,c[0]是2,接着就是对其计数加1.(其实这里已经是对相同的key进行归类了,否则c和v的key不同,在这里就会混乱)

第三个函数

(lambda x,y:(x[0]+y[0],x[1]+y[1])),这里是将相同品种的水果果汁给混合起来,所以这里的参数x,y其实都是c类型的。所以这里需要做的就是相应的的权重相加,并且计数相加。这里的x[0]+y[0]就是权重相加,x[1]+y[1]就是计数相加。

总结:

虽然对于细节了解不够深,但是猜测第二个函数像是Hadoop中的local combiner就是对本地中的相同的key的水果进行榨汁混合,第三个函数像是在全局中对相同的key的水果进行混合(此时不需要榨汁了)。

时间: 2024-10-10 03:35:47

Spark 的combineByKey函数的相关文章

spark中flatMap函数用法--spark学习(基础)

spark中flatMap函数用法--spark学习(基础) 在spark中map函数和flatMap函数是两个比较常用的函数.其中 map:对集合中每个元素进行操作. flatMap:对集合中每个元素进行操作然后再扁平化. 理解扁平化可以举个简单例子 val arr=sc.parallelize(Array(("A",1),("B",2),("C",3))) arr.flatmap(x=>(x._1+x._2)).foreach(prin

Spark核心RDD:combineByKey函数详解

https://blog.csdn.net/jiangpeng59/article/details/52538254 为什么单独讲解combineByKey? 因为combineByKey是Spark中一个比较核心的高级函数,其他一些高阶键值对函数底层都是用它实现的.诸如 groupByKey,reduceByKey等等 如下给出combineByKey的定义,其他的细节暂时忽略(1.6.0版的函数名更新为combineByKeyWithClassTag) def combineByKey[C]

spark的runJob函数2

上一篇我们讲到了spark的runJob方法提交job运行,runJob在提交时,需要RDD和一个函数,那么运行机制是什么呢?函数如何运行的呢?首先job被提交后,需要切分stage,然后每个stage会划分成一组task提交executor运行.如何切分stage和task,需要另写一篇来解读.那么我们下面来分析如何运行task.我们看下面代码 private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: Rpc

详解Spark sql用户自定义函数:UDF与UDAF

UDAF = USER DEFINED AGGREGATION FUNCTION Spark sql提供了丰富的内置函数供猿友们使用,辣为何还要用户自定义函数呢?实际的业务场景可能很复杂,内置函数hold不住,所以Spark sql提供了可扩展的内置函数接口:哥们,你的业务太变态了,我满足不了你,自己按照我的规范去定义一个sql函数,该怎么折腾就怎么折腾! 例如,MySQL数据库中有一张task表,共两个字段taskid (任务ID)与taskParam(JSON格式的任务请求参数).简单起见,

spark之combineByKey

combineByKey def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitio

spark通过combineByKey算子实现条件性聚合的方法

实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,满足条件的记录进行聚合,不满足条件的则不聚合. 使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer.最后调用flatMap将ArrayBuffer中的元

Spark 系列(十一)—— Spark SQL 聚合函数 Aggregations

一.简单聚合 1.1 数据准备 // 需要导入 spark sql 内置的函数包 import org.apache.spark.sql.functions._ val spark = SparkSession.builder().appName("aggregations").master("local[2]").getOrCreate() val empDF = spark.read.json("/usr/file/json/emp.json"

Spark常用函数讲解--键值RDD转换

摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集RDD有两种操作算子:         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算 本系列主要讲解Spark中常用的函数操作:   

Spark计算均值

作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 用spark来快速计算分组的平均值,写法很便捷,话不多说上代码 object ColumnValueAvg extends App { /** * ID,Name,ADDRESS,AGE * 001,zhangsan,chaoyang,20 * 002,zhangsa,chaoyang,27 * 003,zhangjie,chaoyang,35 * 004,lisi,haidian,24 *