spark算子:combineByKey

假设我们有一组个人信息,我们针对人的性别进行分组统计,并进行统计每个分组中的记录数。

scala> val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
      people: List[(String, String)] = List((male,Mobin), (male,Kpop), (female,Lucy), (male,Lufei), (female,Amy))

scala> val rdd = sc.parallelize(people)
      rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val combinByKeyRDD = rdd.combineByKey(
           |   (x: String) => (List(x), 1),
           |   (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1),
           |   (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[String], Int))] = ShuffledRDD[1] at combineByKey at <console>:25

scala> combinByKeyRDD.foreach(println)
      (female,(List(Lucy, Amy),2))
      (male,(List(Mobin, Kpop, Lufei),3))
scala>

输出步骤:

Partition1:
K="male"  -->  ("male","Mobin")  --> createCombiner("Mobin") =>  peo1 = (  List("Mobin") , 1 )
K="male"  -->  ("male","Kpop")  --> mergeValue(peo1,"Kpop") =>  peo2 = (  "Kpop"  ::  peo1_1 , 1 + 1 )    //Key相同调用mergeValue函数对值进行合并
K="female"  -->  ("female","Lucy")  --> createCombiner("Lucy") =>  peo3 = (  List("Lucy") , 1 )

Partition2:
K="male"  -->  ("male","Lufei")  --> createCombiner("Lufei") =>  peo4 = (  List("Lufei") , 1 )
K="female"  -->  ("female","Amy")  --> createCombiner("Amy") =>  peo5 = (  List("Amy") , 1 )

Merger Partition:
K="male" --> mergeCombiners(peo2,peo4) => (List(Lufei,Kpop,Mobin))
K="female" --> mergeCombiners(peo3,peo5) => (List(Amy,Lucy))

上边的信息中,个人信息中只有一个值,如果value是元组的话,需要定义出一个type:

scala>       val people = List(("male", ("Mobin",89)),("male", ("Kpop",98)),("female", ("Lucy",99)),("male", ("Lufei",77)),("female", ("Amy",97)))
scala>       val rdd = sc.parallelize(people)
rdd: org.apache.spark.rdd.RDD[(String, (String, Int))] = ParallelCollectionRDD[2] at parallelize at <console>:23

scala>   type MVType = (String, Int)
defined type alias MVType

scala>       val combinByKeyRDD = rdd.combineByKey(
     |         (x: MVType) => (List(x), 1),
     |         (peo: (List[MVType], Int), x:MVType) => (x :: peo._1, peo._2 + 1),
     |         (sex1: (List[MVType], Int), sex2: (List[MVType], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2))
combinByKeyRDD: org.apache.spark.rdd.RDD[(String, (List[(String, Int)], Int))] = ShuffledRDD[3] at combineByKey at <console>:27

scala>    combinByKeyRDD.foreach(println)
(male,(List((Mobin,89), (Kpop,98), (Lufei,77)),3))
(female,(List((Lucy,99), (Amy,97)),2))
时间: 2024-08-29 11:41:44

spark算子:combineByKey的相关文章

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现 测试数据 java代码 1 package com.hzf.spark.study; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaPairRDD; 8 import org.apache.s

Spark算子---实战应用

Spark算子实战应用 数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase 相关数据文件 : users.dat ---UserID::Gender::Age::Occupation::Zip-code movies.dat --- MovieID::Title::Genres ratings.dat ---UserID::MovieID::Rating::Timestamp SogouQ.mini 完成以下业务需求

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

Spark算子:RDD基本转换操作(1)–map.flatMap.distinct 关键字:Spark算子.Spark RDD基本转换.map.flatMap.distinct map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素. 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区. hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive //读取HDFS文件到RDD sca

spark通过combineByKey算子实现条件性聚合的方法

实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,满足条件的记录进行聚合,不满足条件的则不聚合. 使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer.最后调用flatMap将ArrayBuffer中的元

Spark算子选择策略

摘要  1.使用reduceByKey/aggregateByKey替代groupByKey 2.使用mapPartitions替代普通map 3.使用foreachPartitions替代foreach 4.使用filter之后进行coalesce操作 5.使用repartitionAndSortWithinPartitions替代repartition与sort类操作 6.使用broadcast使各task共享同一Executor的集合替代算子函数中各task传送一份集合 7.使用相同分区方

PairRDD中算子combineByKey图解

1.combineByKey combine 为结合意思.    作用: 将RDD[(K,V)] => RDD[(K,C)] 表示V的类型可以转成C两者可以不同类型. def combineByKey[C](createCombiner:V =>C ,mergeValue:(C,V) =>C, mergeCombiners:(C,C) =>C):RDD[(K,C)] def combineByKey[C](createCombiner:V =>C ,mergeValue:(C

(二)spark算子 分为3大类

transgormation的算子对key-value类型的数据有三种: (1)输入 与 输出为一对一关系 mapValue();针对key-value类型的数据并只对其中的value进行操作,不对key进行操作 (2)对单个rdd聚集 combineByKey 相当于将(v1,2 v1,1)转为(v1,Seq(1,2))的rdd reduceByKey 就是将相同的key合并,算出他们的和 partitionBy 对rdd进行分区,如果原有的rdd与现在的rdd一致则不进行分区:如果不一致则根

(三)spark算子 分为3大类

ation算子通过sparkContext执行提交作业的runJob,触发rdd的DAG执行 (foreach) foreach(f) 会对rdd中的每个函数进行f操作,下面的f操作就是打印输出没有元素 saveAsTextFile 将rdd保存到hdfs指定的路径,将rdd中每一个分区保存到hdfs上的block saveAsObjectFile 将rdd中每10个元素组成一个array,然后将这个array序列化,映射为(null,bytesWritable(y)) 写入hdfs为Seque

Spark 的combineByKey函数

在Spark中有许多聚类操作是基于combineByKey的,例如group那个家族的操作等.所以combineByKey这个函数也是比较重要,所以下午花了点时间看来下这个函数.也参考了http://www.tuicool.com/articles/miueaqv这篇博客. 先看下combineByKey定义: /** * Generic function to combine the elements for each key using a custom set of aggregation