Spark RDD到底是个什么东西

前言

  用Spark有一段时间了,但是感觉还是停留在表面,对于Spark的RDD的理解还是停留在概念上,即只知道它是个弹性分布式数据集,其他的一概不知

有点略显惭愧。下面记录下我对RDD的新的理解。

官方介绍

   弹性分布式数据集。 RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。

问题

只要你敢问度娘RDD是什么,包你看到一大片一模一样的答案,都是说这样的概念性的东西,没有任何的价值。

我只想知道 RDD为什么是弹性 而不是 不弹性, RDD到底是怎么存数据,在执行任务的过程中是咋哪个阶段读取数据。

什么是弹性

我的理解如下(若有误或不足,烦请指出更正):

1. RDD可以在内存和磁盘之间手动或自动切换

2. RDD可以通过转换成其他的RDD,即血统

3. RDD可以存储任意类型的数据

存储的内容是什么

根据编写Spark任务的代码来看,很直观的感觉是RDD就是一个只读的数据,例如  rdd.foreach(println)

但是不是, RDD其实不存储真是的数据,只存储数据的获取的方法,以及分区的方法,还有就是数据的类型。

百闻不如一见, 下面看看RDD的源码:

//其他的代码删除了,主要保留了它的两个抽象方法abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
   //计算某个分区数据的方法 ,将某个分区的数据读成一个 Iterator  def compute(split: Partition, context: TaskContext): Iterator[T]
  //计算分区信息 只会被调用一次
  protected def getPartitions: Array[Partition]

}

  通过RDD的这两个抽象方法,我们可以看出 :

RDD其实是不存储真是数据的,存储的的只是 真实数据的分区信息getPartitions,还有就是针对单个分区的读取方法 compute

到这里可能就有点疑惑,要是RDD只存储这分区信息和读取方法,那么RDD的依赖信息是怎么保存的?

其实RDD是有保存的,只是我粘贴出的只是RDD顶层抽象类,还要一点需要注意 ,RDD只能向上依赖,而真正实现这两个方法的RDD都是整个任务的输入端,即处于RDD血统的顶层,初代RDD

举个例子:val rdd = sc.textFile(...); val rdd1 = rdd.map(f)  .  这里的 rdd是初代RDD, 是没有任何依赖的RDD的,所以没就没有保存依赖信息, 而 rdd1是子代RDD,那么它就必须得记录下自己是来源于谁,也就是血统,

下面展示的是HadoopRDD和  MapPartitionsRDD

//负责记录数据的分区信息  和 读取方法

class HadoopRDD[K, V](
  @transient sc: SparkContext,
  broadcastedConf: Broadcast[SerializableConfiguration],
  initLocalJobConfFuncOpt: Option[JobConf => Unit],
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

override def getPartitions: Array[Partition] = { ***篇幅所限  自己查看**}

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {***篇幅所限  自己查看**}

}

//子代RDD的作用起始很简单  就是记录初代RDD到底在干了什么才得到了自己

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](

    prev: RDD[T],  //上一代RDD
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)  //初代RDD生成自己的方法
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}

  到这里,我们就大概了解了RDD到底存储了什么东西,

初代RDD: 处于血统的顶层,存储的是任务所需的数据的分区信息,还有单个分区数据读取的方法,没有依赖的RDD, 因为它就是依赖的开始。

子代RDD: 处于血统的下层, 存储的东西就是 初代RDD到底干了什么才会产生自己,还有就是初代RDD的引用

现在我们基本了解了RDD里面到底存储了些什么东西,那么问题就来了,到底读取数据发生在什么时候。

数据读取发生在什么时候

直接开门见山的说, 数据读取是发生在运行的Task中,也就是说,数据是在任务分发的executor上运行的时候读取的,上源码:

private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int,
    internalAccumulators: Seq[Accumulator[Long]])
  extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
  with Serializable {

  @transient private[this] val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))  //这里调用了 rdd.iterator , 下面看看RDD的这个方法
  }

  // This is only callable on the driver side.
  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
}

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {

//先判断是否有缓存 ,有则直接从缓存中取 , 没有就从磁盘中取出来, 然后再执行缓存操作 
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) 
  } else {

//直接从磁盘中读取 或 从 检查点中读取 
    computeOrReadCheckpoint(split, context)
  }
}

  在spark中的任务 最终是会被分解成多个TaskSet到executor上运行,TaskSet的划分是根据是否需要shuffle来的。

在spark中就只有两种Task,一种是ResultTask ,一种是ShuffleTask, 两种Task都是以相同的方式读取RDD的数据。

时间: 2024-12-22 04:17:24

Spark RDD到底是个什么东西的相关文章

【spark 深入学习 03】Spark RDD的蛮荒世界

RDD真的是一个很晦涩的词汇,他就是伯克利大学的博士们在论文中提出的一个概念,很抽象,很难懂:但是这是spark的核心概念,因此有必要spark rdd的知识点,用最简单.浅显易懂的词汇描述.不想用学术话的语言来阐述RDD是什么,用简单.容易理解的方式来描述. 一.什么是RDD,RDD出现的背景 Mapreduce计算模型的出现解决了分布式计算的诸多难题,但是由于MR对数据共享的解决方案比较低效,导致MR编程模型效率不高,将数据写到一个稳定的外部存储系统,如HDFS,这个会引起数据复写.磁盘IO

Spark RDD揭秘

 Spark计算中提供的各种库例如:spark sql,spark machine learning等的底层都是封装的RDD RDD本身提供了通用的抽象,在现有的spark sql, spark streaming, machine learning,图计算以及sqpark R中,可以根据具体的领域的内容为基础扩充和私有化与自己业务相关的库,而它们的通用接口和基石就是Spark RDD. RDD:基于工作集的分布式函数编程的应用抽象,MapReduce是基于数据集的.他们的共同特征是位置感知

Spark RDD使用详解1--RDD原理

RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操

Java8函数式编程(二):类比Spark RDD算子的Stream流操作

1 Stream流 对集合进行迭代时,可调用其iterator方法,返回一个iterator对象,之后便可以通过该iterator对象遍历集合中的元素,这被称为外部迭代(for循环本身正是封装了其的语法糖),其示意图如下: 除此之外,还有内部迭代方法,这正是这里要说明的集合的stream()方法返回的Stream对象的一系列操作,比如,要统计一个数字列表的偶数元素个数,当使用Stream对象的操作时,如下: List<Integer> list = new ArrayList<Integ

Spark RDD、DataFrame和DataSet的区别

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 转载请标明出处:小帆的帆的专栏 RDD 优点: 编译时类型安全 编译时就能检查出类型错误 面向对象的编程风格 直接通过类名点的方式来操作数据 缺点: 序列化和反序列化的性能开销 无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化. GC的性能开销 频繁的创建和销毁对象, 势必会增加GC import org.apache.spark.sql.SQLContext import org.apache

Spark RDD解密

1.  基于数据集的处理: 从物理存储上加载数据,然后操作数据,然后写入数据到物理设备; 基于数据集的操作不适应的场景: 不适合于大量的迭代: 不适合交互式查询:每次查询都需要对磁盘进行交互. 基于数据流的方式不能够复用曾经的结果或者中间的结果; 2. RDD弹性数据集 特点: A)自动的进行内存和磁盘数据的存储切换: B) 基于lineage的高效容错: C) Task如果失败会自动进行重试 D) Stage如果失败会自动进行重试,而且只会计算失败的分片; E) Checkpoint和pers

Spark3000门徒第14课spark RDD解密总结

今晚听了王家林老师的第14课spark RDD解密,课堂笔记如下: Spark是基于工作集的应用抽象,RDD:Resillient Distributed Dataset是基于工作集的,spark可以对结果重用. 位置感知:spark比hadoop更精致. RDD是lazy的,是分布式函数式编程的抽象,RDD可以看做一个只读的List或者Array.产生的中间结果怎么办? 不能让 他立即计算,采用Lazy级别,只对数据处理做标记.所以RDD操作是有向的,链式的,所以Stage有1000个步骤,不

Spark RDD Transformation 简单用例(一)

map(func) /** * Return a new RDD by applying a function to all elements of this RDD. */ def map[U: ClassTag](f: T => U): RDD[U]  map(func) Return a new distributed dataset formed by passing each element of the source through a function func.  将原RDD中的

Spark RDD整理

参考资料: Spark和RDD模型研究:http://itindex.net/detail/51871-spark-rdd-模型 理解Spark的核心RDD:http://www.infoq.com/cn/articles/spark-core-rdd/ Spark RDD详解:http://f.dataguru.cn/thread-475874-1-1.html http://developer.51cto.com/art/201309/410276_1.htm