RDD的源码

RDD是一个抽象类定义了所有RDD共有的一些属性和方法,下面介绍了主要的属性和方法。

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

RDD有5个主要的属性

 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)

(一)子类

CoGroupedRDD, EdgeRDD, EdgeRDDImpl, HadoopRDD, JdbcRDD, NewHadoopRDD, PartitionPruningRDD, ShuffledRDD, UnionRDD, VertexRDD, VertexRDDImpl

(二)属性

1、SpackContext

@transient private var _sc: SparkContext

在主构建函数中定义,表示RDD所在运行环境,可用于获取配置,清理环境等。

2、Seq[Dependency[_]]

@transient private var deps: Seq[Dependency[_]]

定义了这个RDD对父RDD的依赖关系。

(三)方法

1、tranformation与action

RDD中定义了所有RDD所共用的tranformation与action,如map, filter, reduce, first等,举个filter的例子:

 def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

2、缓存

包括pesist的多个实现及cache等,举个例子

 /**
   * Mark this RDD for persisting using the specified level.
   *
   * @param newLevel the target storage level
   * @param allowOverride whether to override any existing level with the new one
   */
  private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    // If this is the first time this RDD is marked for persisting, register it
    // with the SparkContext for cleanups and accounting. Do this only once.
    if (storageLevel == StorageLevel.NONE) {
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      sc.persistRDD(this)
    }
    storageLevel = newLevel
    this
  }
时间: 2024-11-03 22:22:29

RDD的源码的相关文章

Spark RDD类源码学习(未完)

每天进步一点点~开搞~ abstract class RDD[T: ClassTag]( //@transient 注解表示将字段标记为瞬态的 @transient private var _sc: SparkContext, // Seq是序列,元素有插入的先后顺序,可以有重复的元素. @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { if (classOf[RDD[_]]

第2课 Scala面向对象彻底精通及Spark源码SparkContext,RDD阅读总结

第2课:Scala面向对象彻底精通及Spark源码阅读本期内容:1 Scala中的类.object实战详解 2 Scala中的抽象类.接口实战详解 3 综合案例及Spark源码解析 一:定义类class HiScala{private var name = "Spark" def sayName(){println(name)}def getName = name} Scala中,变量与类中的方法是同等级的,可以直接赋值给方法. scala中的get与set与Java中的get,set

(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考

hu本期内容: 1.Kafka解密 背景: 目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性.No Receivers是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是RDD类型的. 所以Spark Streaming就产生了自定义RDD –> KafkaRDD. 源码分析: 1.KafkaRDD源码 private[kafka]class KafkaRDD[K: ClassTag,V: ClassTag,U <: Decode

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

Spark版本定制八:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

本期内容: 1.DStream与RDD关系彻底研究 2.Streaming中RDD的生成彻底研究 一.DStream与RDD关系彻底研究 课前思考: RDD是怎么生成的? RDD依靠什么生成?根据DStream来的 RDD生成的依据是什么? Spark Streaming中RDD的执行是否和Spark Core中的RDD执行有所不同? 运行之后我们对RDD怎么处理? ForEachDStream不一定会触发Job的执行,但是它一定会触发job的产生,和Job是否执行没有关系: 对于DStream

Spark源码系列(五)RDD是如何被分布式缓存?

这一章想讲一下Spark的缓存是如何实现的.这个persist方法是在RDD里面的,所以我们直接打开RDD这个类. def persist(newLevel: StorageLevel): this.type = { // StorageLevel不能随意更改 if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException("C

第七篇:Spark SQL 源码分析之Physical Plan 到 RDD的具体实现

/** Spark SQL源码分析系列文章*/ 接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. [java] view plain copy lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作

RDD.scala(源码)

---- map. --- flatMap.fliter.distinct.repartition.coalesce.sample.randomSplit.randomSampleWithRange.takeSample.union.++.sortBy.intersection map源码 /** * Return a new RDD by applying a function to all elements of this RDD. */def map[U: ClassTag](f: T =

Spark 定制版:008~Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

本讲内容: a. DStream与RDD关系的彻底的研究 b. Streaming中RDD的生成彻底研究 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上节课,我们重点给大家揭秘了JobScheduler内幕:可以说JobScheduler是整个Spark Streming的调度的核心,其地位相当于Spark Core中的DAGScheduler. JobScheduler是SparkStreaming 所有Job调度的中心,内部有两个重