spark源码阅读 RDDs

RDDs弹性分布式数据集

spark就是实现了RDDs编程模型的集群计算平台。有很多RDDs的介绍,这里就不仔细说了,这儿主要看源码。

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

相关类

Dependency

宽依赖和窄依赖两种。Denpendency类中主要保存父RDD,根据partition id获得所依赖的父RDD partitions列表。

abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don‘t need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 * @param keyOrdering key ordering for RDD‘s shuffles
 * @param aggregator map/reduce-side aggregator for RDD‘s shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It‘s possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

主要成员

/** sparkContext */
def sparkContext: SparkContext = sc
/** 唯一标识的RDD id */
val id: Int = sc.newRddId()
/** name */
@transient var name: String = null
/** 依赖列表 */
protected def getDependencies: Seq[Dependency[_]] = deps
/** 分区列表 */
protected def getPartitions: Array[Partition]
/** 子类可选重写,优先存储的位置 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** 分区器 */
@transient val partitioner: Option[Partitioner] = None
/** 指定怎么计算的到RDD的分区 */
def compute(split: Partition, context: TaskContext): Iterator[T]

主要方法

时间: 2024-09-29 04:37:00

spark源码阅读 RDDs的相关文章

[Apache Spark源码阅读]天堂之门——SparkContext解析

稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读.这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex. SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs Sp

【Spark】配置Spark源码阅读环境

Scala构建工具(SBT)的使用 SBT介绍 SBT是Simple Build Tool的简称,如果读者使用过Maven,那么可以简单将SBT看做是Scala世界的Maven,虽然二者各有优劣,但完成的工作基本是类似的. 虽然Maven同样可以管理Scala项目的依赖并进行构建,但SBT的某些特性却让人如此着迷,比如: 使用Scala作为DSL来定义build文件(one language rules them all); 通过触发执行(trigger execution)特性支持持续的编译与

Spark源码阅读笔记之Broadcast(一)

Spark源码阅读笔记之Broadcast(一) Spark会序列化在各个任务上使用到的变量,然后传递到Executor中,由于Executor中得到的只是变量的拷贝,因此对变量的改变只在该Executor有效.序列化后的任务的大小是有限制的(由spark.akka.frameSize决定,值为其减去200K,默认为10M-200K),spark会进行检查,超出该限制的任务会被抛弃.因此,对于需要共享比较大的数据时,需要使用Broadcast. Spark实现了两种传输Broadcast的机制:

第3课 Scala函数式编程彻底精通及Spark源码阅读笔记

本课内容: 1:scala中函数式编程彻底详解 2:Spark源码中的scala函数式编程 3:案例和作业 函数式编程开始: def fun1(name: String){ println(name) } //将函数名赋值给一个变量,那么这个变量就是一个函数了. val fun1_v = fun1_ 访问 fun1_v("Scala") 结果:Scala 匿名函数:参数名称用 => 指向函数体 val fun2=(content: String) => println(co

Spark源码阅读(1): Stage划分

Spark中job由action动作生成,那么stage是如何划分的呢?一般的解答是根据宽窄依赖划分.那么我们深入源码看看吧 一个action 例如count,会在多次runJob中传递,最终会到一个函数 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)dagScheduler是DAGScheduler的一个实例,因此,后面的工作都交给DAGSchedul

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

作者:摇摆少年梦 微信号: zhouzhihubeyond spark-submit 脚本应用程序提交流程 在运行Spar应用程序时,会将spark应用程序打包后使用spark-submit脚本提交到Spark中运行,执行提交命令如下: root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordC

Spark修炼之道(高级篇)——Spark源码阅读:第十节 Standalone运行模式解析

Spark Standalone采用的是Master/Slave架构,主要涉及到的类包括: 类:org.apache.spark.deploy.master.Master 说明:负责整个集群的资源调度及Application的管理. 消息类型: 接收Worker发送的消息 1. RegisterWorker 2. ExecutorStateChanged 3. WorkerSchedulerStateResponse 4. Heartbeat 向Worker发送的消息 1. Registered

Spark修炼之道(高级篇)——Spark源码阅读:第二节 SparkContext的创建

博文推荐:http://blog.csdn.net/anzhsoft/article/details/39268963,由大神张安站写的Spark架构原理,使用Spark版本为1.2,本文以Spark 1.5.0为蓝本,介绍Spark应用程序的执行流程. 本文及后面的源码分析都以下列代码为样板 import org.apache.spark.{SparkConf, SparkContext} object SparkWordCount{ def main(args: Array[String])

Spark修炼之道(高级篇)——Spark源码阅读:第八节 Task执行

Task执行 在上一节中,我们提到在Driver端CoarseGrainedSchedulerBackend中的launchTasks方法向Worker节点中的Executor发送启动任务命令,该命令的接收者是CoarseGrainedExecutorBackend(Standalone模式),类定义源码如下: private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: St