窄依赖与宽依赖&stage的划分依据

RDD根据对父RDD的依赖关系,可分为窄依赖与宽依赖2种。 
主要的区分之处在于父RDD的分区被多少个子RDD分区所依赖,如果一个就为窄依赖,多个则为宽依赖。更好的定义应该是: 
窄依赖的定义是子RDD的每一个分区都依赖于父RDD的一个或者少量几个分区(不依赖于全部分区)

与依赖相关的以下5个类:

Dependency
<--NarrowDependency
    <--OneToOneDependency
    <--RangeDependency
<--ShuffleDependency

它们全部在同一个Scala文件中,Dependency是一个abstract class, NarrowDependency(abstract class)与ShuffleDependency直接继承与它,OneToOneDependency与RangeDependency继承自NarrowDependency,大致如上图所示。

因此,关于Dependency的真正实现有三个,2个窄依赖:OneToOneDependency与RangeDependency,一个宽依赖:ShuffleDependency。

(一)Dependency

Dependency是一个抽象类,所有的依赖相关的类都必须继承自它。Dependency只有一个成员变量,表示的是父RDD。

/**
 * :: DeveloperApi ::
 * Base class for dependencies.
 */
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

(一)窄依赖

1、NarrowDependency

看看代码中对NarrowDependency的说明:

Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution。 
即窄依赖的定义应该是子RDD的每一个分区都依赖于父RDD的一个或者少量几个分区(不依赖于全部分区)。

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

getParents根据子RDD的分区ID返回父RDD的分区ID。

主构建函数中的rdd是父RDD,下同。

2、OneToOneDependency

一对一依赖,即每个子RDD的分区的与父RDD的分区一一对应。

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

重写了NarrowDependency的getParents方法,返回一个List,这个List只有一个元素,且与子RDD的分区ID相同。即子分区的ID与父分区的ID一一对应且相等。

3、RangeDependency

子RDD中的每个分区依赖于父RDD的几个分区,而父RDD的每个分区仅补一个子RDD分区所依赖,即多对一的关系。它仅仅被UnionRDD所使用。

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

(二)宽依赖

宽依赖只有一种:shuffleDependency,即子RDD依赖于父RDD的所有分区,父RDD的分每个区被所有子RDD的分区所依赖。

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don‘t need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD‘s shuffles
 * @param aggregator map/reduce-side aggregator for RDD‘s shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It‘s possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

(三)stage的划分

DAG根据宽依赖来划分stage,每个宽依赖的处理均会是一个stage的划分点。同一个stage中的多个操作会在一个task中完成。因为子RDD的分区仅依赖于父RDD的一个分区,因此这些步骤可以串行执行。

时间: 2025-01-11 06:46:53

窄依赖与宽依赖&stage的划分依据的相关文章

RDD Join中宽依赖与窄依赖的判断

1.规律 如果JoinAPI之前被调用的RDD API是宽依赖(存在shuffle), 而且两个join的RDD的分区数量一致,join结果的rdd分区数量也一样,这个时候join api是窄依赖 除此之外的,rdd 的join api是宽依赖 2.测试程序 1 package com.ibeifeng.senior.join 2 3 import org.apache.spark.{SparkConf, SparkContext} 4 5 /** 6 * RDD数据Join相关API讲解 7

Spark RDD 的宽依赖和窄依赖 -- (视频笔记)

窄依赖 narrow dependency map,filter,union , join(co-partitioned)制定了父RDD中的分片具体交给哪个唯一的子RDD 并行的,RDD分片是独立的. 只依赖相同ID的分片 range分片 one to dependency range dependency 内部可以previously computed partition 可以将计算合并,可以极大的提升效率,编写的时候可能是多个函数,执行的时候合并成一个函数,极大的减少了零碎内存或磁盘资源.

Maven依赖传递、依赖传递排除、依赖冲突

转载请注明原文地址:http://www.cnblogs.com/ygj0930/p/6628429.html  一:Maven依赖传递 假如有Maven项目A,项目B依赖A,项目C依赖B.那么我们可以说 C依赖A.也就是说,依赖的关系为:C->B->A. 那么我们执行项目C时,会自动把B.A都下载导入到C项目的jar包文件夹中. 这就是依赖的传递性. 二:依赖传递的排除 如上,C->B->A.加入现在不想执行C时把A下载进来,那么我们可以用 <exclusions>标

【1】Dll依赖与被依赖关系查看工具

[1]工具主界面 [2]工具的功能介绍 2.1 查看Dll依赖的Dll列表(一级关系,如果依赖的Dll不在该目录下,则不显示到右侧) 如: 选择Dll的目录,软件自动获取该目录的Dll列表,并显示到左侧.选择左侧某个特定的Dll,右侧会显示该Dll依赖或被依赖的Dll列表. 2.2 查看Dll被依赖的Dll列表(即依赖此Dll的Dll列表.一级关系,如果被依赖的Dll不在该目录下,则不显示到右侧) [2]下载地址 1.0版本: 

依赖注入和依赖注入容器

http://www.digpage.com/di.html#di 为了降低代码耦合程度,提高项目的可维护性,Yii采用多许多当下最流行又相对成熟的设计模式,包括了依赖注入(Denpdency Injection, DI)和服务定位器(Service Locator)两种模式. 关于依赖注入与服务定位器, Inversion of Control Containers and the Dependency Injection pattern 给出了很详细的讲解,这里结合Web应用和Yii具体实现

TestNg依赖高级用法之强制依赖与顺序依赖------TestNg依赖详解(二)

TestNg使用dependsOnGroups属性来进行依赖测试, 测试方法依赖于某个或某些方法,这个/这些方法作为前置依赖条件 强制依赖:如果被依赖的某一个方法发生了异常,那么之后的方法都不会被执行(默认) 顺序依赖:无论被依赖的方法是否出现异常,后续的方法都会被执行,通过alwaysRun="true"来配置 [java] view plain copy /** * * <p> * Title: TestngDependOnGroups * </p> * *

[Maven实战](8)依赖配置与依赖范围

 1. 依赖配置 依赖基本配置: <project> <dependencies> <dependency> <groupId>...</groupId> <artifactId>...</artifactId> <version>...</version> <exclusions> <exclusion> <groupId>...</groupId>

MVC缓存依赖:文件依赖

<1>所谓缓存依赖,文件依赖就是指,我不我没有设置缓存的过期时间,当缓存依赖的文件内容发生改变的时候,就通知framework清空缓存.然后重数据库中取数据(或者文件中取数据)然后又把取到是数据缓存起来,用户请求的时候,直接从缓存中取数据,如果缓存依赖的文件内容又发生改变,就又清空,周而复始.. using System; using System.Collections.Generic; using System.Linq; using System.Web; using System.We

TestNg依赖详解(一)------简单的依赖(单一方法依赖)

TestNg依赖测试之简单方法依赖,通过dependsOnMethods属性来配置依赖方法 Java code: /** * * <p> * Title: TestngDependencies * </p> * * <p> * Description: Testng提供了两种依赖实现 * * 1.强制依赖:某个测试用例之前需要执行的依赖链中如果有一个失败,那么接下来所有的测试都不会被执行 * 2.顺序依赖(软依赖):顺序依赖的用处更多是用来检测一个测试链是否按照正确的顺