关于RDD

RDD, Resilient Distributed Dataset,弹性分布式数据集, 是Spark的核心概念。

对于RDD的原理性的知识,可以参阅Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster ComputingAn Architecture for Fast and General Data Processing on Large Clusters 这两篇论文。

这篇文章用来记录一部分Spark对RDD实现的细节。

首先翻译一下RDD这个虚类的注释

RDD是一个分布式弹性数据集, RDD是Spark的基本抽象,代表了一个不可变的、分区的、可以用于并行计算的数据集。这个类包括了所有RDD共有的
基本操作,比如map,
filter, persist。另外

  • org.apache.spark.rdd.PairRDDFunctions包括了只能用于key-value对类型的RDD的操作,
    比如groupByKey和join。
  • org.apache.spark.rdd.DoubleRDDFunctions包括了只能用于Double类型RDD的操作,
  • org.apache.spark.rdd.SequenceFileRDDFunctions包括了能被保存为SequenceFile的RDD支持的操作。
    通过隐式转换,只要RDD的类型正确,相关的操作就自动可用。

在内部,每个RDD都由五个主要属性来表征:

  • 分区表(A list of partitions)
  • 一个用于计算每个split的函数
  • 对其它RDD的依赖
  • 可选: 用于键值对类型的RDD使用的Partitioner
  • 可选: 计算每个split时优先使用的location(+ 数据本地化, preferred locations)
    (比如一个HDFS文件的block的位置)。

Spark里所有的调度和执行都是依据这些方法,以此来允许每个RDD实现自己的方式来计算自己。用户可以覆盖这些方法来实现自己的RDD(比如,从一个新的存储系统中读取数据)。
新参考Spark
paper
来查看关于RDD内部机制的更多细节。

RDD的5个主要属性对应的代码主要为:

    • 分区   protected def getPartitions: Array[Partition] 以及 final def partitions: Array[Partition]
    • 计算每个partition   def compute(split: Partition, context: TaskContext): Iterator[T]
    • 对其它RDD的依赖   构造函数中的 deps: Seq[Dependency[_]] 以及 protected def getDependencies: Seq[Dependency[_]] = deps 以及 final def dependencies: Seq[Dependency[T]]
    • kv类型RDD的partitioner   @transient val partitioner: Option[Partitioner] = None
    • preferred location   protected def getPreferredLoations(split: Partition): Seeq[String] = Nil 以及 final def preferredLocations(split: Partition): Seq[String]

其中的这些final方法: partitions, dependencies, preferedLocations都是考虑了checkpoint的结果。可见,checkpoint机制会对这些属性有所改变。


以下是对于这个注释的内容的思考:

1. RDD把定语去掉了,就是数据集;但是Spark作为一个分布式计算的框架,“数据集的转换”与“数据集”都是不可缺少的。Spark并没有把transformation这个概念抽象成一个基类,在我们写rdd.filter(func1).map(func2)这样的语句的时候,得到的最终结果是一个RDD,而scheduler使用的也只是这个RDD,因此,func1和func2这样的转换操作,作为一种元信息,肯定被RDD记录,作为RDD的属性。具体的讲,转换操作的信息会被记录在RDD的第二个属性“一个用于计算每个split的函数”中。所以,RDD不仅是弹性分布式数据集,也包括了数据集之间进行转换所需要的函数。

2. RDD的第三个属性“对其它RDD的依赖”,提供了以下信息:

    a. 对这个RDD的父RDD的引用

    b. 这个RDD的每个partition跟父RDD的partition的映射关系。

假设有RDD X和RDD YX可以转换为Y, 即 X -> Y。这是一个链式的构造,要获得Y,需要X和->。 ->即是转换操作,被记录于第二个属性,那么X在何处呢?X即是Dependency, 是RDD的第三个属性。也就是说第二和第三个属性,使得RDD成为一个链式结构, X -> Y -> Z,知道Z,就可以上溯到作为源头的X,就能从X计算出Z来。这个就是为什么我们在最后一个RDD上调用action, Spark就可以开始执行,而不再需要提供其它的RDD。

下面看一下Spark对于以上两点具体的实现。

转换逻辑的存储

以常用的map操作为例  X -> Y, -> 在这里就是map。

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

作为map参数的f和map的语义一起指明了从当前RDD到MapPartitionsRDD转换的逻辑。而这个逻辑,作为参数被传递给MapPartitionsRDD,即 (context, pid, iter) => iter.map(cleanF))。下面看一下MapPartitionsRDD是如何储存这个逻辑的。

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}

注意它的compute方法,首先,它调用了f, f就是我们在RDD的map方法中传给MapPartitionsRDD构造器的函数。也就是说MapPartitionsRDD储存了从父RDD转换的逻辑, 即 ->

另外,注意compute方法中的 firstParent[T].iterator(split, context)。firstParent即是在map函数中传进来的this, 也就是MapPartitionsRDD的父RDD, 即X

-> 和 X就这样被储存在了Y, 即MapPartitionsRDD中。

关于Iterator

当compute方法被调用时,实际上会调用firstParent.iterator.map(cleanF)。那么此时,父RDD的迭代器会进行迭代和map计算吗?

答案是否,而且,可以看出Spark的RDD间的转换和Scala的迭代器间的转换是类似的,它们都可以认为是惰性的,即在x -> y中,储存了x和->,只有在需要计算时才会计算。

下面是scala.collection.Iterator的map方法的代码

  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
  }

在这里被返回的Iterator相当于y, 而调用map的Iterator相当于x。y持有对x的引用"self", 也持有转换的函数f,这就使得x -> y的链是完备的,因此Iterator上的map, filter等操作也构成了一个链式结构。

由于Iterator的这种特性,使得RDD的计算过程构成一个由函数组成的管道,在不对中间RDD进行persist的操作时,初始RDD的每个元素经过所有转换函数的处理后,再开始处理第二个元素;而不是所有元素都经过第一个函数处理后,形成一个数据集,这个数据集再进行转换。

比如,有三个RDD, X -> Y -> Z,都是使用的map进行转换,所使用的函数依次为f和g。

那么Z的compute方法的调用过程就成为了X.iterator.map(f).map(g)。

依据Iterator的特点, Z的迭代器的hasNext方法会返回X.iterator.hasNext.hasNext, Z的迭代器的next方法会返回g(f(X.iterator.next))。

因此,在一系列转过程中的中间的RDD如果没有被persist, 是不会作为一个数据集存在的。

另外,需要注意

trait Iterator[+A] extends TraversableOnce[A]

注意这个TraversableOnce的含义。所以,在自己实现RDD时,需要确保compute方法被调用时,它所使用的父RDD的迭代器没有在其它地方被使用过,不然一个已经被迭代过的迭代器再次被使用时,可能不会返回所有元素,或者干脆就不能继续迭代了(俺就曾经在compute里加了条日志,记了下iteartor.size(), 就悲剧了)。

父子关系的存储

先看下RDD的主构造器

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

RDD的这个构造器展示了Dependency对于RDD定义的重要作用。 Dependency包含了这个RDD对其父RDD的依赖,这个依赖不仅包括其父RDD是什么,还包括子RDD的分区和父RDD的
分区之间的对应关系。

需要注意到,deps是一个Seq,
这说明单个的Dependency可能不足以描述父子RDD之间的依赖关系,得通过一系列的Dependency才能描述此关系。结合Dependency的定义,每个
Dependency只包含了一个父RDD的信息,但是一个RDD可能依赖多个RDD,所以这里用Seq[Dependency[_]]是有必要的 。

如果使用

class NarrowDependency[T](parent: RDD[T], deps: List[List[Int]]) extends Dependency[T]{
  override def _rdd: RDD[T] = parent

  def getDependency(partition: Int) = deps(partition)
}

这种定义。在Dependency中提供子RDD的每个分区所依赖的父RDD的分区,那么NarrowDependency和ShuffleDependency就都可以用这一种方式来定义。
但是,Spark中却把NarrowDependency和ShuffleDependency分开定义,是为了区分什么呢?

  • 或许是在NarrowDependency的定义中是定义的每个父RDD的分区被哪一个子RDD的分区依赖。
  • 或许是在ShuffleDependency中不仅要提供子RDD的每个分区的依赖,还要提供父RDD的每个分区被哪些子RDD的分区依赖,这样进行shuffle时,才好由父RDD
    的分区计算出对于不同子RDD分区的数据。

let us see see.

ShuffleDependency

之所以不像俺想的那样,是因为ShuffleDependency包括了与shuffle有关的更多的信息,这些信息包括:

  1. partitioner 决定父RDD的每个record进入哪个子RDD分区。同时,它包含了reduce的个数的信息。
  2. aggeragator 可选,对value进行聚合
  3. mapSideCombine 是否要在map侧调用aggeragator,这是一个布尔类型值
  4. keyOrdering 可选,决定key的顺序,用来对key排序。
  5. serializer ?可选,或许是用来对key-value做序列化的,现在不能确定

以上是构造函数里的信息,此外ShuffleDependency的方法也提供了一些信息:
*
shuffleId 还不确定有什么用
*
shuffleHandle 提供与shuffle有关的信息。目前只看到它的一个实现:
BaseShuffleHandler,构造器为(shuffleId, numTasks,
Dependency:[ShuffleDependency])
不确定其具体作用

这些信息被shuffle过程使用,具体怎么用,得看shuffle的实现。

NarrowDependency

而NarrowDependency包括的情况更少,因为如果用List[List[Int]]来表示NarrowDependency的话,会把NarrowDependency的范围括大,比如多对多的关系也能用这种形式来表示。
Spark的实现里,NarrowDependency是个abstract
class
,由不同的子类来应对具体的NarrowDependency的情况,每种情况用不同的方法来表示窄依赖。在NarrowDependency同
一个文件里,有两种NarrowDepdency的子类。在其它的RDD实现中,还有会其它的NarrowDependency,比如CoalescedRDD在一个匿名内部类里实现了自己的NarrowDependency。

  • OneToOneDependency 这种情况父RDD的分区跟子RDD的分区是一致的,每个子RDD分区依赖于同样索引号的父RDD的分区
  • RangeDependency 子RDD的一个分区依赖于父RDD的某个连续的分区段,比如0-3, 4-5这种。

其实现为:

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

可见,父RDD的index为partitionId的分区被同样index的子RDD的分区依赖,父子RDD的分区是一对一的关系

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

它述描了子RDD的一些分区对父RDD的一些分区依赖关系,在父子RDD对应的分区间是OneToOne的关系,但这种关系只对父子RDD的一个区间有效。比如,
子RDD从index为2开始的分区,以OneToOne的关系依赖于父RDD从index为8开始的分区,这种依赖关系对于连续的3个分区有效,即(子2依赖父8),
(子3依赖父9),
(子4依赖父10)

在UnionRDD中会使用RangeDependency

总结:

RDD储存了DAG Scheduler进行调度所需的信息(比如可以在RDD链中寻找ShuffleDependency来划分Stage),也储存了生成目标RDD所需要的计算逻辑。也就是说RDD对于Spark这个框架,在某种程度上相当于元数据。可以看到,在driver往executor发送的作为task的字节数组中就包括了RDD。

在ShuffleMapTask中,反序列化后的taskBinary为:

    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  //返回结果是(RDD, ShuffleDependency)
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

在ResultTask中,反序列化后的taskBinary为:

    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

可以看到,RDD始终是作为计算逻辑的主要携带者被传给executor。

而RDD能做到这些,就是因为它储存了所需的信息在自己的定义中, 前边分析了一部分其实现的细节。RDD这个类的实现有很长很长的代码,也有更多有意思的细节需要进一步看一下。

时间: 2024-10-10 08:19:50

关于RDD的相关文章

(3)RDD编程

1.RDD基础 弹性分布式数据集,简称RDD,是一个不可变的分布式对象集合.在Spark中,对数据的所有操作不外乎创建RDD,转化已有RDD以及调用RDD操作进行求值. 每一个RDD都被分为多个分区,这些分区运行在集群中的不同节点上,RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义对象. 用户可以使用两种方法创建RDD:读取一个外部数据集,或者在驱动程序里分发驱动器程序中的对象集合. 创建出来以后,RDD支持两种类型的操作:转化操作和行动操作.转化操作和行

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解四

学习spark任何的知识点之前,先对spark要有一个正确的理解,可以参考:正确理解spark 本文对join相关的api做了一个解释 SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Integer, Integer> javaPa

spark的数据结构 RDD——DataFrame——DataSet区别

转载自:http://blog.csdn.net/wo334499/article/details/51689549 RDD 优点: 编译时类型安全 编译时就能检查出类型错误 面向对象的编程风格 直接通过类名点的方式来操作数据 缺点: 序列化和反序列化的性能开销 无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化. GC的性能开销 频繁的创建和销毁对象, 势必会增加GC import org.apache.spark.sql.SQLContext import org

Spark弹性分布式数据集RDD

RDD(Resilient Distributed Dataset)是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作.这对于迭代运算比

RDD Join中宽依赖与窄依赖的判断

1.规律 如果JoinAPI之前被调用的RDD API是宽依赖(存在shuffle), 而且两个join的RDD的分区数量一致,join结果的rdd分区数量也一样,这个时候join api是窄依赖 除此之外的,rdd 的join api是宽依赖 2.测试程序 1 package com.ibeifeng.senior.join 2 3 import org.apache.spark.{SparkConf, SparkContext} 4 5 /** 6 * RDD数据Join相关API讲解 7

RDD Join相关API,以及程序

1.数据集 A表数据: 1 a 2 b 3 c B表数据: 1 aa1 1 aa2 2 bb1 2 bb2 2 bb3 4 dd1 2.join的分类 inner join left outer join right outer join full outer join left semi join 3.集中join的结果 A inner join B: 1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 A left outer join B:

解析spark RDD

RDD是spark抽象的基石,可以说整个spark编程就是对RDD进行的操作 RDD是弹性的分布式数据集,它是只读的,可分区的,这个数据集的全部或者部分数据可以缓存在内存中,在多次计算间重用.所谓的弹性意思是:内存不够时可以与磁盘进行交换.这是RDD另一个特性:内存计算.就是将数据保存到内存中,同时为了解决内存容量大小的问题,他允许所有的数据我们可以自由的设置cache,和 是否cache RDD的特征: (1)有一个分片列表,就是这个RDD可以被切分,和hadoop一样,能被切分的数据才能并行

RDD之三:RDD创建方式

RDD创建方式 1)从Hadoop文件系统(如HDFS.Hive.HBase)输入创建.2)从父RDD转换得到新RDD.3)通过parallelize或makeRDD将单机数据创建为分布式RDD. 4)基于DB(Mysql).NoSQL(HBase).S3(SC3).数据流创建. 从集合创建RDD parallelize def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: Clas

处理键值对RDD

保存Key/Value对的RDD叫做Pair RDD. 1.创建Pair RDD: 1.1 创建Pair RDD的方式: 很多数据格式在导入RDD时,会直接生成Pair RDD.我们也可以使用map()来将之前讲到的普通RDD转化为Pair RDD. 1.2 Pair RDD转化实例: 下面例子中,把原始RDD,修改成首单词做Key,整行做Value的Pair RDD. Java中没有tuple类型,所以使用scala的scala.Tuple2类来创建tuple.创建tuple:  new Tu