Spark-Dependency

1、Spark中采用依赖关系(Dependency)表示rdd之间的生成关系。Spark可利用Dependency计算出失效的RDD。在每个RDD中都存在一个依赖关系的列表

  private var dependencies_ : Seq[Dependency[_]] = null

用以记录各rdd中各partition的parent partition。

2、Spark中存在两类Dependency:

1)NarrowDependency表示的是一个父partition仅对应于一个子partition。这样的依赖关系是不需要shuffle的。在这类依赖中,可以根据getParents方法获取某个partition的父partitions:

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the parent RDD is used by at most one
 * partition of the child RDD.  Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
  /**
   * 唯一的接口,获得该partition的所有parent partition
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]
}

这类又可分为:

a、OneToOneDependency:表示一一对应的依赖关系,由于在这种依赖中父partition与子partition Id是一致的,所以getParents直接原样返回。对应的转换操作有map和filter

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  /**
   * 其实partitionId就是partition在RDD中的序号, 所以如果是一一对应, 那么parent和child中的partition的序号应该是一样的
   */
  override def getParents(partitionId: Int) = List(partitionId)//原样返回
}

b、PruneDependency(org.apache.spark.rdd.PartitionPruningRDDPartition):未详

/**
 * Represents a dependency between the PartitionPruningRDD and its parent. In this
 * case, the child RDD contains a subset of partitions of the parents'.
 */
private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
  extends NarrowDependency[T](rdd) {

  @transient
  val partitions: Array[Partition] = rdd.partitions
    .filter(s => partitionFilterFunc(s.index)).zipWithIndex
    .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

  override def getParents(partitionId: Int) = {
    List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
  }
}

c、RangeDependency:这种是父rdd的连续多个partitions对应子rdd中的连续多个partitions,对应的转换有union

/**Union
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD parent RDD中区间的起始点
 * @param outStart the start of the range in the child RDD child RDD中区间的起始点
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int) = {
    if (partitionId >= outStart && partitionId < outStart + length) {//判断partitionId的合理性,必须在child RDD的合理partition范围
      List(partitionId - outStart + inStart)//算出parent RDD中对应的partition id
    } else {
      Nil
    }
  }
}

2)WideDependency:这种依赖是指一个父partition可以对应子rdd中多个partitions。由于需要对父partition进行划分,故需要用到shuffle,而shuffle一般是采用键值对的。

这里为每个shuffle分配了一个全局唯一的shuffleId。为了进行shuffle,需要指定如何进行shuffle,这对应于参数partitioner;由于shuffle是需要网络传输的,故需要进行序列化Serializer。在宽依赖中并无法获得partition对应的parent partitions?

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage.
 * @param rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 */
@DeveloperApi
class ShuffleDependency[K, V](
    @transient rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,//需要给出partitioner, 指示如何完成shuffle
    val serializer: Serializer = null)//shuffle不象map可以在local进行, 往往需要网络传输或存储, 所以需要serializerClass
  extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

  val shuffleId: Int = rdd.context.newShuffleId()//每个shuffle需要分配一个全局的id, context.newShuffleId()的实现就是把全局id累加

  rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

Spark-Dependency

时间: 2024-10-10 23:53:05

Spark-Dependency的相关文章

分别用Eclipse和IDEA搭建Scala+Spark开发环境

开发机器上安装jdk1.7.0_60和scala2.10.4,配置好相关环境变量.网上资料很多,安装过程忽略.此外,Eclipse使用Luna4.4.1,IDEA使用14.0.2版本. 1. Eclipse开发环境搭建 1.1. 安装scala插件 安装eclipse-scala-plugin插件,下载地址http://scala-ide.org/download/prev-stable.html 解压缩以后把plugins和features复制到eclipse目录,重启eclipse以后即可.

Spark编程环境搭建(基于Intellij IDEA的Ultimate版本)

为什么,我要在这里提出要用Ultimate版本. IDEA Community(社区版)再谈之无奈之下还是去安装旗舰版 IntelliJ IDEA的黑白色背景切换(Ultimate和Community版本皆通用) 使用 IntelliJ IDEA 导入 Spark 最新源码及编译 Spark 源代码 IDEA里如何多种方式打jar包,然后上传到集群 IntelliJ IDEA(Community版本)的下载.安装和WordCount的初步使用(本地模式和集群模式) IntelliJ IDEA(U

Spark Streaming编程示例

近期也有开始研究使用spark streaming来实现流式处理.本文以流式计算word count为例,简单描述如何进行spark streaming编程. 1. 依赖的jar包 参考<分别用Eclipse和IDEA搭建Scala+Spark开发环境>一文,pom.xml中指定依赖库spark-streaming_2.10.jar. <dependency> <groupId>org.scala-lang</groupId> <artifactId&

在 Azure HDInsight 中安装和使用 Spark

Spark本身用Scala语言编写,运行于Java虚拟机(JVM).只要在安装了Java 6以上版本的便携式计算机或者集群上都可以运行spark.如果您想使用Python API需要安装Python解释器(2.6或者更高版本),请注意Spark暂不支持Python 3. 下载Spark 首先下载Spark并解压,我们从下载预编译版本的Spark开始.在浏览器中访问 http://spark.apache.org/down loads.html 选择"Pre-built for Hadoop 2.

Spark学习笔记5:Spark集群架构

Spark的一大好处就是可以通过增加机器数量并使用集群模式运行,来扩展计算能力.Spark可以在各种各样的集群管理器(Hadoop YARN , Apache Mesos , 还有Spark自带的独立集群管理器)上运行,所以Spark应用既能够适应专用集群,又能用于共享的云计算环境. Spark运行时架构 Spark在分布式环境中的架构如下图: 在分布式环境下,Spark集群采用的是主/从结构.在Spark集群,驱动器节点负责中央协调,调度各个分布式工作节点.执行器节点是工作节点,作为独立的Ja

Apache Spark RDD之RDD的转换

RDD的转换 Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG.接下来以“Word Count”为例,详细描述这个DAG生成的实现过程. Spark Scala版本的Word Count程序如下: 1: val file = spark.textFile("hdfs://...") 2: val counts = file.flatMap(line => line.split(" "))

spark mllib配置pom.xml错误 Multiple markers at this line Could not transfer artifact net.sf.opencsv:opencsv:jar:2.3 from/to central (https://repo.maven.apache.org/maven2): repo.maven.apache.org

刚刚spark mllib,在maven repository网站http://mvnrepository.com/中查询mllib后得到相关库的最新dependence为: <dependency>        <groupId>org.apache.spark</groupId>        <artifactId>spark-mllib-local_2.11</artifactId>        <version>2.1.

Apache Spark RDD初谈3

RDD的转换和DAG的生成 Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG.接下来以“Word Count”为例,详细描述这个DAG生成的实现过程. Spark Scala版本的Word Count程序如下: 1: val file = spark.textFile("hdfs://...") 2: val counts = file.flatMap(line => line.split(" &

Spark RDD Action 简单用例(一)

collectAsMap(): Map[K, V] 返回key-value对,key是唯一的,如果rdd元素中同一个key对应多个value,则只会保留一个./** * Return the key-value pairs in this RDD to the master as a Map. * * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only * on

【甘道夫】Java Hello World on Spark

引言 通过Java编写Spark应用程序的HelloWorld,虽然有点寒碜,没用Scala简洁明了,但还是得尝试和记录下. 环境 Windows7 Eclipse+Maven Jdk1.7 Ubuntu 14.04 步骤一:在eclipse中创建maven工程,过程很简单,不详述. pom文件为: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/X