RDDs弹性分布式数据集
spark就是实现了RDDs编程模型的集群计算平台。有很多RDDs的介绍,这里就不仔细说了,这儿主要看源码。
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging {
相关类
Dependency
宽依赖和窄依赖两种。Denpendency类中主要保存父RDD,根据partition id获得所依赖的父RDD partitions列表。
abstract class Dependency[T] extends Serializable { def rdd: RDD[T] } /** * :: DeveloperApi :: * Base class for dependencies where each partition of the child RDD depends on a small number * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution. */ @DeveloperApi abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { /** * Get the parent partitions for a child partition. * @param partitionId a partition of the child RDD * @return the partitions of the parent RDD that the child partition depends upon */ def getParents(partitionId: Int): Seq[Int] override def rdd: RDD[T] = _rdd } /** * :: DeveloperApi :: * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, * the RDD is transient since we don‘t need it on the executor side. * * @param _rdd the parent RDD * @param partitioner partitioner used to partition the shuffle output * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None, * the default serializer, as specified by `spark.serializer` config option, will * be used. * @param keyOrdering key ordering for RDD‘s shuffles * @param aggregator map/reduce-side aggregator for RDD‘s shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) */ @DeveloperApi class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Option[Serializer] = None, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false) extends Dependency[Product2[K, V]] { override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]] private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName // Note: It‘s possible that the combiner class tag is null, if the combineByKey // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag. private[spark] val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName) val shuffleId: Int = _rdd.context.newShuffleId() val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.size, this) _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) } /** * :: DeveloperApi :: * Represents a one-to-one dependency between partitions of the parent and child RDDs. */ @DeveloperApi class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = List(partitionId) } /** * :: DeveloperApi :: * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. * @param rdd the parent RDD * @param inStart the start of the range in the parent RDD * @param outStart the start of the range in the child RDD * @param length the length of the range */ @DeveloperApi class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = { if (partitionId >= outStart && partitionId < outStart + length) { List(partitionId - outStart + inStart) } else { Nil } } }
主要成员
/** sparkContext */ def sparkContext: SparkContext = sc /** 唯一标识的RDD id */ val id: Int = sc.newRddId() /** name */ @transient var name: String = null /** 依赖列表 */ protected def getDependencies: Seq[Dependency[_]] = deps /** 分区列表 */ protected def getPartitions: Array[Partition] /** 子类可选重写,优先存储的位置 */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil /** 分区器 */ @transient val partitioner: Option[Partitioner] = None /** 指定怎么计算的到RDD的分区 */ def compute(split: Partition, context: TaskContext): Iterator[T]
主要方法
时间: 2024-09-29 04:37:00