spark之combineByKey

combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C

mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C

mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C

numPartitions:结果RDD分区数,默认保持原有的分区数

partitioner:分区函数,默认为HashPartitioner

mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true



举例理解:

假设我们要将一堆的各类水果给榨果汁,并且要求果汁只能是纯的,不能有其他品种的水果。那么我们需要一下几步:

1 定义我们需要什么样的果汁。

2 定义一个榨果汁机,即给定水果,就能给出我们定义的果汁。--相当于hadoop中的local combiner

3 定义一个果汁混合器,即能将相同类型的水果果汁给混合起来。--相当于全局进行combiner

那么对比上述三步,combineByKey的三个函数也就是这三个功能

1 createCombiner就是定义了v如何转换为c

2 mergeValue 就是定义了如何给定一个V将其与原来的C合并成新的C

3 就是定义了如何将相同key下的C给合并成一个C

var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))

rdd1.combineByKey(
(v : Int) => List(v),             --将1 转换成 list(1)
(c : List[Int], v : Int) => v :: c,       --将list(1)和2进行组合从而转换成list(1,2)
(c1 : List[Int], c2 : List[Int]) => c1 ::: c2  --将全局相同的key的value进行组合
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

时间: 2024-08-29 14:32:44

spark之combineByKey的相关文章

Spark 的combineByKey函数

在Spark中有许多聚类操作是基于combineByKey的,例如group那个家族的操作等.所以combineByKey这个函数也是比较重要,所以下午花了点时间看来下这个函数.也参考了http://www.tuicool.com/articles/miueaqv这篇博客. 先看下combineByKey定义: /** * Generic function to combine the elements for each key using a custom set of aggregation

spark通过combineByKey算子实现条件性聚合的方法

实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,满足条件的记录进行聚合,不满足条件的则不聚合. 使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer.最后调用flatMap将ArrayBuffer中的元

Job 逻辑执行图

General logical plan 典型的 Job 逻辑执行图如上所示,经过下面四个步骤可以得到最终执行结果: 从数据源(可以是本地 file,内存数据结构, HDFS,HBase 等)读取数据创建最初的 RDD.上一章例子中的 parallelize() 相当于 createRDD(). 对 RDD 进行一系列的 transformation() 操作,每一个 transformation() 会产生一个或多个包含不同类型 T 的 RDD[T].T 可以是 Scala 里面的基本类型或数

Spark API 之 combineByKey(一)

1       前言 combineByKey是使用Spark无法避免的一个方法,总会在有意或无意,直接或间接的调用到它.从它的字面上就可以知道,它有聚合的作用,对于这点不想做过多的解释,原因很简单,因为reduceByKey.aggregateByKey.foldByKey等函数都是使用它来实现的. combineByKey是一个高度抽象的聚合函数,可以用于数据的聚合和分组,由它牵出的shuffle也是Spark中重中之重,现在就让我们去看看它到底是怎么去实现的. 不足或错误之处, 烦请指出更

spark算子:combineByKey

假设我们有一组个人信息,我们针对人的性别进行分组统计,并进行统计每个分组中的记录数. scala> val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "A

Spark核心RDD:combineByKey函数详解

https://blog.csdn.net/jiangpeng59/article/details/52538254 为什么单独讲解combineByKey? 因为combineByKey是Spark中一个比较核心的高级函数,其他一些高阶键值对函数底层都是用它实现的.诸如 groupByKey,reduceByKey等等 如下给出combineByKey的定义,其他的细节暂时忽略(1.6.0版的函数名更新为combineByKeyWithClassTag) def combineByKey[C]

spark

Transformation 和Action本质区别: Transformations是RDD到RDD; Actions是RDD到result. Actions算子触发Spark job. Spark groupbykey和cogroup使用示例 groupByKeygroupByKey([numTasks])是数据分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集.val rdd0 = sc.parallelize(Array((1,1), (1,2) ,

Spark计算均值

作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处 用spark来快速计算分组的平均值,写法很便捷,话不多说上代码 object ColumnValueAvg extends App { /** * ID,Name,ADDRESS,AGE * 001,zhangsan,chaoyang,20 * 002,zhangsa,chaoyang,27 * 003,zhangjie,chaoyang,35 * 004,lisi,haidian,24 *

spark总结——转载

转载自:http://smallx.me/2016/06/07/spark%E4%BD%BF%E7%94%A8%E6%80%BB%E7%BB%93/ 第一个Spark程序 /** * 功能:用spark实现的单词计数程序 * 环境:spark 1.6.1, scala 2.10.4 */ // 导入相关类库import org.apache.spark._ object WordCount { def main(args: Array[String]) { // 建立spark运行上下文 val