Spark核心RDD:combineByKey函数详解

https://blog.csdn.net/jiangpeng59/article/details/52538254

为什么单独讲解combineByKey?

因为combineByKey是Spark中一个比较核心的高级函数,其他一些高阶键值对函数底层都是用它实现的。诸如 groupByKey,reduceByKey等等

如下给出combineByKey的定义,其他的细节暂时忽略(1.6.0版的函数名更新为combineByKeyWithClassTag)

  1. def combineByKey[C](

  2.  

    createCombiner: V => C,

  3.  

    mergeValue: (C, V) => C,

  4.  

    mergeCombiners: (C, C) => C,

  5.  

    partitioner: Partitioner,

  6.  

    mapSideCombine: Boolean = true,

  7.  

    serializer: Serializer = null)

如下解释下3个重要的函数参数:

  • createCombiner: V => C ,这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)
  • mergeValue: (C, V) => C,该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)
  • mergeCombiners: (C, C) => C,该函数把2个元素C合并 (这个操作在不同分区间进行)

如下看一个使用combineByKey来求解平均数的例子

  1. val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))

  2.  

    val d1 = sc.parallelize(initialScores)

  3.  

    type MVType = (Int, Double) //定义一个元组类型(科目计数器,分数)

  4.  

    d1.combineByKey(

  5.  

    score => (1, score),

  6.  

    (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),

  7.  

    (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)

  8.  

    ).map { case (name, (num, socre)) => (name, socre / num) }.collect

参数含义的解释
a 、score => (1, score),我们把分数作为参数,并返回了附加的元组类型。 以"Fred"为列,当前其分数为88.0 =>(1,88.0)  1表示当前科目的计数器,此时只有一个科目

b、(c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意这里的c1就是createCombiner初始化得到的(1,88.0)。在一个分区内,我们又碰到了"Fred"的一个新的分数91.0。当然我们要把之前的科目分数和当前的分数加起来即c1._2 + newScore,然后把科目计算器加1即c1._1 + 1

c、 (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是个学霸,他选修的科目可能过多而分散在不同的分区中。所有的分区都进行mergeValue后,接下来就是对分区间进行合并了,分区间科目数和科目数相加分数和分数相加就得到了总分和总科目数

执行结果如下:

res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))

例子来源:http://codingjunkie.net/spark-combine-by-key/

原文地址:https://www.cnblogs.com/bnuvincent/p/9354878.html

时间: 2024-10-10 00:44:38

Spark核心RDD:combineByKey函数详解的相关文章

spark wordcont Spark: sortBy和sortByKey函数详解

val res = sc.textFile("D:\\test\\spark\\urlCount").flatMap(_.split("\\s")) .map((_,1)).reduceByKey(_+_).map(t=>(t._2,t._1)).sortByKey().top(10) 在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外.在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数.sortBy是对标

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解二

package com.twq.javaapi.java7; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.funct

spark2.x由浅入深深到底系列六之RDD java api详解四

学习spark任何的知识点之前,先对spark要有一个正确的理解,可以参考:正确理解spark 本文对join相关的api做了一个解释 SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Integer, Integer> javaPa

Spark Streaming性能调优详解(转)

原文链接:Spark Streaming性能调优详解 Spark Streaming提供了高效便捷的流式处理模式,但是在有些场景下,使用默认的配置达不到最优,甚至无法实时处理来自外部的数据,这时候我们就需要对默认的配置进行相关的修改.由于现实中场景和数据量不一样,所以我们无法设置一些通用的配置(要不然Spark Streaming开发者就不会弄那么多参数,直接写死不得了),我们需要根据数据量,场景的不同设置不一样的配置,这里只是给出建议,这些调优不一定试用于你的程序,一个好的配置是需要慢慢地尝试

Spark核心—RDD初探

本文目的 ? 最近在使用Spark进行数据清理的相关工作,初次使用Spark时,遇到了一些挑(da)战(ken).感觉需要记录点什么,才对得起自己.下面的内容主要是关于Spark核心-RDD的相关的使用经验和原理介绍,作为个人备忘,也希望对读者有用. ? 为什么选择Spark ? 原因如下 代码复用:使用Scala高级语言操作Spark,灵活方便,面向对象,函数编程的语言特性可以全部拿来.Scala基本上可以无缝集成java及其相关库.最重要的是,可以封装组件,沉淀工作,提高工作效率.之前用hi

ThinkPHP内置函数详解D、F、S、C、L、A、I

ThinkPHP内置函数详解D.F.S.C.L.A.I 单字母函数D.F.S.C.L.A.I 他们都在ThinkPHP核心的ThinkPHP/Mode/Api/functions.php这个文件中定义. 下面我分别说明一下他们的功能: D() 加载Model类 M() 加载Model类 A() 加载Action类 L() 获取语言定义 C() 获取配置值    用法就是   C("这里填写在配置文件里数组的下标") S() 全局缓存配置 用法S(“这里相当于一个唯一的标识”) F()

Linux系统调用--getrlimit()与setrlimit()函数详解

http://www.cnblogs.com/niocai/archive/2012/04/01/2428128.html 功能描述:获取或设定资源使用限制.每种资源都有相关的软硬限制,软限制是内核强加给相应资源的限制值,硬限制是软限制的最大值.非授权调 用进程只可以将其软限制指定为0~硬限制范围中的某个值,同时能不可逆转地降低其硬限制.授权进程可以任意改变其软硬限制.RLIM_INFINITY的 值表示不对资源限制. 用法: #include <sys/resource.h>int getr

scandir函数详解

scandir函数详解2009-10-30 10:51scandir函数:读取特定的目录数据表头文件:#include <dirent.h>定义函数:int scandir(const char *dir, struct dirent **namelist, nt (*select) (const struct dirent *), nt                     (*compar) (const struct dirent **, const struct dirent**))