spark 源码分析之一 -- RDD的四种依赖关系

 RDD的四种依赖关系

  RDD四种依赖关系,分别是 ShuffleDependency、PrunDependency、RangeDependency和OneToOneDependency四种依赖关系。如下图所示:org.apache.spark.Dependency有两个一级子类,分别是 ShuffleDependency 和 NarrowDependency。其中,NarrowDependency 是一个抽象类,它有三个实现类,分别是OneToOneDependency、RangeDependency和 PruneDependency。

  

 RDD的窄依赖

  我们先来看窄RDD是如何确定依赖的父RDD的分区的呢?NarrowDependency 定义了一个抽象方法,如下:

/**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  其输入参数是子RDD 的 分区Id,输出是子RDD 分区依赖的父RDD 的 partition 的 id 序列。

  下面,分别看三种子类的实现:

  OneToOneDependency

  首先,OneToOneDependency的getParent实现如下:

override def getParents(partitionId: Int): List[Int] = List(partitionId)

  就一行代码,实现比较简单,子RDD对应的partition index 跟父 RDD 的partition 的 index 一样。相当于父RDD 的 每一个partition 复制到 子RDD 的对应分区中,分区的关系是一对一的。RDD的关系也是一对一的。

  RangeDependency

  其次,RangeDependency的 getParent 实现如下:

  

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

  首先解释三个变量:inStart:父RDD  range 的起始位置;outStart:子RDD range 的起始位置;length:range 的长度。

  获取 父RDD 的partition index 的规则是:如果子RDD 的 partition index 在父RDD 的range 内,则返回的 父RDD partition是 子RDD partition index - 父 RDD 分区range 起始 + 子RDD 分区range 起始。其中,(- 父 RDD 分区range 起始 + 子RDD 分区range 起始)即 子RDD 的分区的 range 起始位置和 父RDD 的分区的 range 的起始位置 的相对距离。子RDD 的 parttion index 加上这个相对距离就是 对应父的RDD partition。否则是无依赖的父 RDD 的partition index。父子RDD的分区关系是一对一的。RDD 的关系可能是一对一(length 是1 ,就是特殊的 OneToOneDependency),也可能是多对一,也可能是一对多。

  PruneDependency

最后,PruneDependency的 getParent 实现如下:

 1 /**
 2  * Represents a dependency between the PartitionPruningRDD and its parent. In this
 3  * case, the child RDD contains a subset of partitions of the parents‘.
 4  */
 5 private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
 6   extends NarrowDependency[T](rdd) {
 7
 8   @transient
 9   val partitions: Array[Partition] = rdd.partitions
10     .filter(s => partitionFilterFunc(s.index)).zipWithIndex
11     .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
12
13   override def getParents(partitionId: Int): List[Int] = {
14     List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
15   }
16 }

  首先,解释三个变量: rdd 是指向父RDD 的实例引用;partitionFilterFunc 是一个回调函数,作用是过滤出符合条件的父 RDD 的 partition 集合;PartitionPruningRDDPartition类声明如下:

private[spark] class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition)
  extends Partition {
  override val index = idx
}

  partitions的生成过程如下: 先根据父RDD 引用获取父RDD 对应的 partition集合,然后根据过滤函数和partition index ,过滤出想要的父RDD 的 partition 集合并且从0 开始编号,最后,根据父RDD 的 partition 和 新编号实例化新的PartitionPruningRDDPartition实例,并放入到 partitions 集合中,相当于是先对parent RDD 的分区做Filter 剪枝操作。

  在getParent 方法中, 先根据子RDD 的 partition index 获取 到对应的 parent RDD 的对应分区,然后获取Partition 的成员函数 index,该index 就是 父RDD 的 partition 在父RDD 的所有分区中的 index。 子RDD partition 和 父RDD partition的关系是 一对一的, 父RDD 和子RDD 的关系是 多对一,也可能是一对多,也可能是一对一。

  简言之,在窄依赖中,子RDD 的partition 和 父RDD 的 partition 的关系是 一对一的。

 RDD 的宽依赖

  下面重点看 ShuffleDependency,ShuffleDependency代表的是 一个 shuffle stage 的输出。先来看其构造方法,即其依赖的变量或实例:

1 @DeveloperApi
2 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
3     @transient private val _rdd: RDD[_ <: Product2[K, V]],
4     val partitioner: Partitioner,
5     val serializer: Serializer = SparkEnv.get.serializer,
6     val keyOrdering: Option[Ordering[K]] = None,
7     val aggregator: Option[Aggregator[K, V, C]] = None,
8     val mapSideCombine: Boolean = false)
9   extends Dependency[Product2[K, V]]

  其中,_rdd 代指父RDD实例;partitioner是用于给shuffle的输出分区的分区器;serializer,主要用于序列化,默认是org.apache.spark.serializer.JavaSerializer,可以通过`spark.serializer` 参数指定;keyOrdering RDD shuffle的key 的顺序。aggregator,map或reduce 端用于RDD shuffle的combine聚合器;mapSideCombine 是否执行部分的聚合(即 map端的预聚合,可以提高网络传输效率和reduce 端的执行效率),默认是false。因为并不是所有的都适合这样做。比如求全局平均值,均值,平方差等,但像全局最大值,最小值等是适合用mapSideCombine 的。注意,当mapSideCombine 为 true时, 必须设置combine聚合器,因为 shuffle 前需要使用聚合器做 map-combine 操作。

  partitioner的7种实现

  partitioner 定义了 RDD 里的key-value 对 是如何按 key 来分区的。映射每一个 key 到一个分区 id,从 0 到 分区数 - 1; 注意,分区器必须是确定性的,即给定同一个 key,必须返回同一个分区,便于任务失败时,追溯分区数据,确保了每一个要参与计算的分区数据的一致性。即 partition 确定了 shuffle 过程中 数据是要流向哪个具体的分区的。

  org.apache.spark.Partition的 7 个实现类如下:

  

  我们先来看Partitioner 的方法定义:

1 abstract class Partitioner extends Serializable {
2   def numPartitions: Int
3   def getPartition(key: Any): Int
4 }

  其中,numPartitions 是返回 子RDD 的 partition 数量;getPartition 会根据指定的 key 返回 子RDD 的 partition index。

  HashPartitioner 的 getPartition 的 实现如下,思路是 key.hashcode() mod 子RDD的 partition 数量:

1 def getPartition(key: Any): Int = key match {
2     case null => 0
3     case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
4   }

  RangePartitioner 的 getPartition 的实现如下:

 1 def getPartition(key: Any): Int = {
 2     val k = key.asInstanceOf[K]
 3     var partition = 0
 4     if (rangeBounds.length <= 128) { // 不大于 128 分区
 5       // If we have less than 128 partitions naive search
 6       while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
 7         partition += 1
 8       }
 9     } else { // 大于 128 个分区数量
10       // Determine which binary search method to use only once.
11       partition = binarySearch(rangeBounds, k) // 二分查找
12       // binarySearch either returns the match location or -[insertion point]-1
13       if (partition < 0) {
14         partition = -partition-1
15       }
16       if (partition > rangeBounds.length) {
17         partition = rangeBounds.length
18       }
19     }
20     if (ascending) {
21       partition
22     } else {
23       rangeBounds.length - partition
24     }
25   }

  PythonPartitioner 的 getPartition 如下,跟hash 很相似:

1 override def getPartition(key: Any): Int = key match {
2     case null => 0
3     // we don‘t trust the Python partition function to return valid partition ID‘s so
4     // let‘s do a modulo numPartitions in any case
5     case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions)
6     case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
7   }

  PartitionIdPassthrough 的 getPartition 如下:

1 override def getPartition(key: Any): Int = key.asInstanceOf[Int]

  GridPartitioner 的 getPartition 如下,思想,二元组定位到网格的partition:

 1 override val numPartitions: Int = rowPartitions * colPartitions
 2
 3   /**
 4    * Returns the index of the partition the input coordinate belongs to.
 5    *
 6    * @param key The partition id i (calculated through this method for coordinate (i, j) in
 7    *            `simulateMultiply`, the coordinate (i, j) or a tuple (i, j, k), where k is
 8    *            the inner index used in multiplication. k is ignored in computing partitions.
 9    * @return The index of the partition, which the coordinate belongs to.
10    */
11   override def getPartition(key: Any): Int = {
12     key match {
13       case i: Int => i
14       case (i: Int, j: Int) =>
15         getPartitionId(i, j)
16       case (i: Int, j: Int, _: Int) =>
17         getPartitionId(i, j)
18       case _ =>
19         throw new IllegalArgumentException(s"Unrecognized key: $key.")
20     }
21   }
22
23   /** Partitions sub-matrices as blocks with neighboring sub-matrices. */
24   private def getPartitionId(i: Int, j: Int): Int = {
25     require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
26     require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
27     i / rowsPerPart + j / colsPerPart * rowPartitions
28   }

  包括匿名类,还有好多种,就不一一介绍了。总而言之,宽依赖是根据partitioner 确定 分区内的数据具体到哪个分区。

  至此,RDD 的窄依赖和宽依赖都介绍清楚了。

原文地址:https://www.cnblogs.com/johnny666888/p/11111957.html

时间: 2024-10-10 16:12:26

spark 源码分析之一 -- RDD的四种依赖关系的相关文章

Spark 源码分析系列

如下,是 spark 源码分析系列的一些文章汇总,持续更新中...... Spark RPC spark 源码分析之五--Spark RPC剖析之创建NettyRpcEnv spark 源码分析之六--Spark RPC剖析之Dispatcher和Inbox.Outbox剖析 spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析 spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClie

Spark源码分析之二:Job的调度模型与运行反馈

在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. 今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈. 首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行.入口方法为DAGScheduler的runJon()方法.代码如下: [jav

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas

Accuracy(准确率), Precision(精确率), 和F1-Measure, 结合Spark源码分析

例子 某大学一个系,总共100人,其中男90人,女10人,现在根据每个人的特征,预测性别 Accuracy(准确率) Accuracy=预测正确的数量需要预测的总数 计算 由于我知道男生远多于女生,所以我完全无视特征,直接预测所有人都是男生 我预测所的人都是男生,而实际有90个男生,所以 预测正确的数量 = 90 需要预测的总数 = 100 Accuracy = 90 / 100 = 90% 问题 在男女比例严重不均匀的情况下,我只要预测全是男生,就能获得极高的Accuracy. 所以在正负样本

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers