RDD之五:Key-Value型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一、聚集、连接操作。

输入分区与输出分区一对一

mapValues

mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理。 

方框代表RDD分区。a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3。

源码:

[plain] view plain copy

  1. /**
  2. * Pass each value in the key-value pair RDD through a map function without changing the keys;
  3. * this also retains the original RDD‘s partitioning.
  4. */
  5. def mapValues[U](f: V => U): RDD[(K, U)] = {
  6. val cleanF = self.context.clean(f)
  7. new MapPartitionsRDD[(K, U), (K, V)](self,
  8. (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
  9. preservesPartitioning = true)
  10. }

单个RDD或两个RDD聚集

(1)combineByKey

combineByKey是对单个Rdd的聚合。相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的RDD。 
定义combineByKey算子的说明如下:

  • createCombiner: V => C, 在C不存在的情况下,如通过V创建seq C。
  • mergeValue:(C, V) => C, 当C已经存在的情况下,需要merge,如把item V加到seq 
    C中,或者叠加。
  • mergeCombiners:(C,C) => C,合并两个C。
  • partitioner: Partitioner(分区器),Shuffle时需要通过Partitioner的分区策略进行分区。
  • mapSideCombine: Boolean=true, 为了减小传输量,很多combine可以在map端先做。例如, 叠加可以先在一个partition中把所有相同的Key的Value叠加, 再shuffle。
  • serializerClass:String=null,传输需要序列化,用户可以自定义序列化类。


方框代表RDD分区。 通过combineByKey,将(V1,2)、 (V1,1)数据合并为(V1,Seq(2,1))。

源码:

[plain] view plain copy

  1. /**
  2. * Generic function to combine the elements for each key using a custom set of aggregation
  3. * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
  4. * Note that V and C can be different -- for example, one might group an RDD of type
  5. * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
  6. *
  7. * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
  8. * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
  9. * - `mergeCombiners`, to combine two C‘s into a single one.
  10. *
  11. * In addition, users can control the partitioning of the output RDD, and whether to perform
  12. * map-side aggregation (if a mapper can produce multiple items with the same key).
  13. */
  14. def combineByKey[C](createCombiner: V => C,
  15. mergeValue: (C, V) => C,
  16. mergeCombiners: (C, C) => C,
  17. partitioner: Partitioner,
  18. mapSideCombine: Boolean = true,
  19. serializer: Serializer = null): RDD[(K, C)] = {
  20. require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  21. if (keyClass.isArray) {
  22. if (mapSideCombine) {
  23. throw new SparkException("Cannot use map-side combining with array keys.")
  24. }
  25. if (partitioner.isInstanceOf[HashPartitioner]) {
  26. throw new SparkException("Default partitioner cannot partition array keys.")
  27. }
  28. }
  29. val aggregator = new Aggregator[K, V, C](
  30. self.context.clean(createCombiner),
  31. self.context.clean(mergeValue),
  32. self.context.clean(mergeCombiners))
  33. if (self.partitioner == Some(partitioner)) {
  34. self.mapPartitions(iter => {
  35. val context = TaskContext.get()
  36. new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  37. }, preservesPartitioning = true)
  38. } else {
  39. new ShuffledRDD[K, V, C](self, partitioner)
  40. .setSerializer(serializer)
  41. .setAggregator(aggregator)
  42. .setMapSideCombine(mapSideCombine)
  43. }
  44. }
  45. /**
  46. * Simplified version of combineByKey that hash-partitions the output RDD.
  47. */
  48. def combineByKey[C](createCombiner: V => C,
  49. mergeValue: (C, V) => C,
  50. mergeCombiners: (C, C) => C,
  51. numPartitions: Int): RDD[(K, C)] = {
  52. combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
  53. }

(2)reduceByKey

reduceByKey是更简单的一种情况,只是两个值合并成一个值,所以createCombiner很简单,就是直接返回v,而mergeValue和mergeCombiners的逻辑相同,没有区别。

方框代表RDD分区。 通过用户自定义函数(A,B)=>(A+B),将相同Key的数据(V1,2)、(V1,1)的value相加,结果为(V1,3)。

源码:

[plain] view plain copy

  1. /**
  2. * Merge the values for each key using an associative reduce function. This will also perform
  3. * the merging locally on each mapper before sending results to a reducer, similarly to a
  4. * "combiner" in MapReduce.
  5. */
  6. def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  7. combineByKey[V]((v: V) => v, func, func, partitioner)
  8. }
  9. /**
  10. * Merge the values for each key using an associative reduce function. This will also perform
  11. * the merging locally on each mapper before sending results to a reducer, similarly to a
  12. * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
  13. */
  14. def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
  15. reduceByKey(new HashPartitioner(numPartitions), func)
  16. }
  17. /**
  18. * Merge the values for each key using an associative reduce function. This will also perform
  19. * the merging locally on each mapper before sending results to a reducer, similarly to a
  20. * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
  21. * parallelism level.
  22. */
  23. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  24. reduceByKey(defaultPartitioner(self), func)
  25. }

(3)partitionBy

partitionBy函数对RDD进行分区操作。 
如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。 

方框代表RDD分区。 通过新的分区策略将原来在不同分区的V1、 V2数据都合并到了一个分区。

源码:

[plain] view plain copy

  1. /**
  2. * Return a copy of the RDD partitioned using the specified partitioner.
  3. */
  4. def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
  5. if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
  6. throw new SparkException("Default partitioner cannot partition array keys.")
  7. }
  8. if (self.partitioner == Some(partitioner)) {
  9. self
  10. } else {
  11. new ShuffledRDD[K, V, V](self, partitioner)
  12. }
  13. }

(4)cogroup

cogroup函数将两个RDD进行协同划分。对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器(K, (Iterable[V], Iterable[w]))。其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。

大方框代表RDD,大方框内的小方框代表RDD中的分区。 将RDD1中的数据(U1,1)、(U1,2)和RDD2中的数据(U1,2)合并为(U1,((1,2),(2)))。

源码:

[plain] view plain copy

  1. /**
  2. * For each key k in `this` or `other1` or `other2` or `other3`,
  3. * return a resulting RDD that contains a tuple with the list of values
  4. * for that key in `this`, `other1`, `other2` and `other3`.
  5. */
  6. def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
  7. other2: RDD[(K, W2)],
  8. other3: RDD[(K, W3)],
  9. partitioner: Partitioner)
  10. : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
  11. if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
  12. throw new SparkException("Default partitioner cannot partition array keys.")
  13. }
  14. val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
  15. cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
  16. (vs.asInstanceOf[Iterable[V]],
  17. w1s.asInstanceOf[Iterable[W1]],
  18. w2s.asInstanceOf[Iterable[W2]],
  19. w3s.asInstanceOf[Iterable[W3]])
  20. }
  21. }
  22. /**
  23. * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
  24. * list of values for that key in `this` as well as `other`.
  25. */
  26. def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
  27. : RDD[(K, (Iterable[V], Iterable[W]))]  = {
  28. if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
  29. throw new SparkException("Default partitioner cannot partition array keys.")
  30. }
  31. val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  32. cg.mapValues { case Array(vs, w1s) =>
  33. (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  34. }
  35. }
  36. /**
  37. * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
  38. * tuple with the list of values for that key in `this`, `other1` and `other2`.
  39. */
  40. def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
  41. : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
  42. if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
  43. throw new SparkException("Default partitioner cannot partition array keys.")
  44. }
  45. val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
  46. cg.mapValues { case Array(vs, w1s, w2s) =>
  47. (vs.asInstanceOf[Iterable[V]],
  48. w1s.asInstanceOf[Iterable[W1]],
  49. w2s.asInstanceOf[Iterable[W2]])
  50. }
  51. }
  52. /**
  53. * For each key k in `this` or `other1` or `other2` or `other3`,
  54. * return a resulting RDD that contains a tuple with the list of values
  55. * for that key in `this`, `other1`, `other2` and `other3`.
  56. */
  57. def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
  58. : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
  59. cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3))
  60. }
  61. /**
  62. * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
  63. * list of values for that key in `this` as well as `other`.
  64. */
  65. def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
  66. cogroup(other, defaultPartitioner(self, other))
  67. }
  68. /**
  69. * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
  70. * tuple with the list of values for that key in `this`, `other1` and `other2`.
  71. */
  72. def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
  73. : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
  74. cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  75. }
  76. /**
  77. * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
  78. * list of values for that key in `this` as well as `other`.
  79. */
  80. def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
  81. cogroup(other, new HashPartitioner(numPartitions))
  82. }
  83. /**
  84. * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
  85. * tuple with the list of values for that key in `this`, `other1` and `other2`.
  86. */
  87. def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
  88. : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
  89. cogroup(other1, other2, new HashPartitioner(numPartitions))
  90. }
  91. /**
  92. * For each key k in `this` or `other1` or `other2` or `other3`,
  93. * return a resulting RDD that contains a tuple with the list of values
  94. * for that key in `this`, `other1`, `other2` and `other3`.
  95. */
  96. def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
  97. other2: RDD[(K, W2)],
  98. other3: RDD[(K, W3)],
  99. numPartitions: Int)
  100. : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = {
  101. cogroup(other1, other2, other3, new HashPartitioner(numPartitions))
  102. }

连接

(1)join

join对两个需要连接的RDD进行cogroup函数操作。cogroup操作之后形成的新RDD,对每个key下的元素进行笛卡尔积操作,返回的结果再展平,对应Key下的所有元组形成一个集合,最后返回RDD[(K,(V,W))]。
join的本质是通过cogroup算子先进行协同划分,再通过flatMapValues将合并的数据打散。 

对两个RDD的join操作示意图。 大方框代表RDD,小方框代表RDD中的分区。函数对拥有相同Key的元素(例如V1)为Key,以做连接后的数据结果为(V1,(1,1))和(V1,(1,2))。

源码:

[plain] view plain copy

  1. /**
  2. * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
  3. * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
  4. * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
  5. */
  6. def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  7. this.cogroup(other, partitioner).flatMapValues( pair =>
  8. for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  9. )
  10. }

(2)leftOuterJoin和rightOuterJoin

LeftOuterJoin(左外连接)和RightOuterJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。 如果不为空,则将数据进行连接运算,并返回结果。

源码:

[plain] view plain copy

  1. /**
  2. * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
  3. * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
  4. * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
  5. * partition the output RDD.
  6. */
  7. def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
  8. this.cogroup(other, partitioner).flatMapValues { pair =>
  9. if (pair._2.isEmpty) {
  10. pair._1.iterator.map(v => (v, None))
  11. } else {
  12. for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
  13. }
  14. }
  15. }
  16. /**
  17. * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
  18. * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
  19. * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
  20. * partition the output RDD.
  21. */
  22. def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
  23. : RDD[(K, (Option[V], W))] = {
  24. this.cogroup(other, partitioner).flatMapValues { pair =>
  25. if (pair._1.isEmpty) {
  26. pair._2.iterator.map(w => (None, w))
  27. } else {
  28. for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
  29. }
  30. }
  31. }

原文链接:http://blog.csdn.net/jasonding1354

时间: 2025-01-02 00:44:16

RDD之五:Key-Value型Transformation算子的相关文章

【Spark】RDD操作详解2——值型Transformation算子

处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素.源码中的map算子相当于初

【Spark】RDD操作具体解释2——值型Transformation算子

处理数据类型为Value型的Transformation算子能够依据RDD变换算子的输入分区与输出分区关系分为下面几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)另一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每一个数据项通过map中的用户自己定义函数f映射转变为一个新的元素. 源代码中的map算子相

【Spark】RDD操作详解3——键值型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理. 方框代表RDD分区.a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3. 源码: /** * Pass each value in the key-value pair RDD through a m

【Spark】RDD操作详解1——Transformation和Actions概况

Spark算子的作用 下图描述了Spark在运行转换中通过算子对RDD进行转换. 算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作. 输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理. 运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操

【Spark】RDD操作具体解释4——Action算子

本质上在Actions算子中通过SparkContext运行提交作业的runJob操作,触发了RDD DAG的运行. 依据Action算子的输出空间将Action算子进行分类:无输出. HDFS. Scala集合和数据类型. 无输出 foreach 对RDD中的每一个元素都应用f函数操作,不返回RDD和Array,而是返回Uint. 图中.foreach算子通过用户自己定义函数对每一个数据项进行操作. 本例中自己定义函数为println,控制台打印全部数据项. 源代码: /** * Applie

【Spark】RDD操作详解4——Action算子

本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行. 根据Action算子的输出空间将Action算子进行分类:无输出. HDFS. Scala集合和数据类型. 无输出 foreach 对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint. 图中,foreach算子通过用户自定义函数对每个数据项进行操作. 本例中自定义函数为println,控制台打印所有数据项. 源码: /** * Applies a f

大数据笔记(二十九)——RDD简介、特性及常用算子

1.什么是RDD? 最核心 (*)弹性分布式数据集,Resilent distributed DataSet (*)Spark中数据的基本抽象 (*)结合源码,查看RDD的概念 RDD属性 * Internally, each RDD is characterized by five main properties: * * - A list of partitions 一组分区,把数据分成了的不同的分区,每个分区可能运行在不同的worker * - A function for computi

Spark常用的transformation算子

1.map 和 mapPartitions map的输入变换函数应用于RDD中所有元素,而mapPartitions应用于所有分区.区别于mapPartitions主要在于调用粒度不同.mapPartition可以倒过来理解,先partition,再把每个partition进行map函数, 适用场景: 如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多. val numbers: RDD[Int] = sc.parallelize(seqs,3) //ma

Spark Transformation 算子

Java版 package com.huanfion.Spark; import com.sun.tools.internal.ws.processor.model.java.JavaParameter; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.