Spark之RDD弹性特性

  RDD作为弹性分布式数据集,它的弹性具体体现在以下七个方面。

1.自动进行内存和磁盘数据存储的切换

  Spark会优先把数据放到内存中,如果内存实在放不下,会放到磁盘里面,不但能计算内存放下的数据,也能计算内存放不下的数据。如果实际数据大于内存,则要考虑数据放置策略和优化算法。当应用程序内存不足时,Spark应用程序将数据自动从内存存储切换到磁盘存储,以保障其高效运行。

2.基于Lineage(血统)的高效容错机制

  Lineage是基于Spark RDD的依赖关系来完成的(依赖分为窄依赖和宽依赖两种形态),每个操作只关联其父操作,各个分片的数据之间互不影响,出现错误时只要恢复单个Split的特定部分即可。常规容错有两种方式:一个是数据检查点;另一个是记录数据的更新。数据检查点的基本工作方式,就是通过数据中心的网络链接不同的机器,然后每次操作的时候都要复制数据集,就相当于每次都有一个复制,复制是要通过网络传输的,网络带宽就是分布式的瓶颈,对存储资源也是很大的消耗。记录数据更新就是每次数据变化了就记录一下,这种方式不需要重新复制一份数据,但是比较复杂,消耗性能。Spark的RDD通过记录数据更新的方式为何很高效?因为① RDD是不可变的且Lazy;② RDD的写操作是粗粒度的。但是,RDD读操作既可以是粗粒度的,也可以是细粒度的。

3.Task如果失败,会自动进行特定次数的重试

  默认重试次数为4次。TaskSchedulerImpl的源码如下所示:

private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
{
  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))

  val conf = sc.conf

  TaskSchedulerImpl是底层的任务调度接口TaskScheduler的实现,这些Schedulers从每一个Stage中的DAGScheduler中获取TaskSet,运行它们,尝试是否有故障。DAGScheduler是高层调度,它计算每个Job的Stage的DAG,然后提交Stage,用TaskSets的形式启动底层TaskScheduler调度在集群中运行。

4.Stage如果失败,会自动进行特定次数的重试

  这样,Stage对象可以跟踪多个StageInfo(存储SparkListeners监听到的Stage的信息,将Stage信息传递给Listeners或web UI)。默认重试次数为4次,且可以直接运行计算失败的阶段,只计算失败的数据分片,Stage的源码如下所示:

private[scheduler] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val parents: List[Stage],
    val firstJobId: Int,
    val callSite: CallSite)
  extends Logging {

  val numPartitions = rdd.partitions.length

  /** Set of jobs that this stage belongs to.属于这个工作集的Stage */
  val jobIds = new HashSet[Int]

  val pendingPartitions = new HashSet[Int]

  /** The ID to use for the next new attempt for this stage.用于此Stage的下一个新attempt的ID */
  private var nextAttemptId: Int = 0

  val name: String = callSite.shortForm
  val details: String = callSite.longForm

  /**
   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
   * here, before any attempts have actually been created, because the DAGScheduler uses this
   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
   * have been created).
   * 最新的[StageInfo] Object指针,需要被初始化,任何attempts都是被创造出来的,因为DAGScheduler使用
   * StageInfo告诉SparkListeners工作何时开始(即发生前的任何阶段已经创建)
   */
  private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)

  /**
   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
   * multiple tasks from the same stage attempt fail (SPARK-5945).
   * 设置Stage attempt IDs当失败时可以读取失败信息,跟踪这些失败,为了避免无休止的重复失败
   * 跟踪每一次attempt,以便避免记录重复故障,如果从同一stage创建多任务失败(SPARK-5945)
   */
  private val fetchFailedAttemptIds = new HashSet[Int]

  private[scheduler] def clearFailures() : Unit = {
    fetchFailedAttemptIds.clear()
  }

  /**
   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
   * 检查是否应该中止由于连续多次读取失败的stage
   * This method updates the running set of failed stage attempts and returns
   * true if the number of failures exceeds the allowable number of failures.
   * 如果失败的次数超过允许的次数,此方法更新失败stage attempts和返回的运行集
   */
  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds.add(stageAttemptId)
    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
  }

  /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
  // 在stage中创建一个新的attempt
  def makeNewStageAttempt(
      numPartitionsToCompute: Int,
      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
    val metrics = new TaskMetrics
    metrics.register(rdd.sparkContext)
    _latestInfo = StageInfo.fromStage(
      this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences)
    nextAttemptId += 1
  }

  /** Returns the StageInfo for the most recent attempt for this stage. */
  // 返回当前stage中最新的stageinfo
  def latestInfo: StageInfo = _latestInfo

  override final def hashCode(): Int = id

  override final def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }

  /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
  // 返回需要重新计算的分区标识的序列
  def findMissingPartitions(): Seq[Int]
}

private[scheduler] object Stage {
  // The number of consecutive failures allowed before a stage is aborted
  // 允许一个stage中止的连续故障数
  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
}

  Stage是Spark Job运行时具有相同逻辑功能和并行计算任务的一个基本单元。Stage中所有的任务都依赖同样的Shuffle,每个DAG任务通过DAGScheduler在Stage的边界处发生Shuffle形成Stage,然后DAGScheduler运行这些阶段的拓扑顺序。每个Stage都可能是ShuffleMapStage,如果是ShuffleMapStage,则跟踪每个输出节点(nodes)上的输出文件分区,它的任务结果是输入其他的Stage(s),或者输入一个ResultStage,若输入一个ResultStage,这个ResultStage的任务直接在这个RDD上运行计算这个Spark Action的函数(如count()、 save()等),并生成shuffleDep等字段描述Stage和生成变量,如outputLocs和numAvailableOutputs,为跟踪map输出做准备。每个Stage会有firstjobid,确定第一个提交Stage的Job,使用FIFO调度时,会使得其前面的Job先行计算或快速恢复(失败时)。 

  ShuffleMapStage是DAG产生数据进行Shuffle的中间阶段,它发生在每次Shuffle操作之前,可能包含多个Pipelined操作,ResultStage阶段捕获函数在RDD的分区上运行Action算子计算结果,有些Stage不是运行在RDD的所有的分区上,例如,first()、lookup()等。SparkListener是Spark调度器的事件监听接口。注意,这个接口随着Spark版本的不同会发生变化。

5.checkpoint和persist(检查点和持久化),可主动或被动触发

  checkpoint是对RDD进行的标记,会产生一系列的文件,且所有父依赖都会被删除,是整个依赖(Lineage)的终点。checkpoint也是Lazy级别的。persist后RDD工作时每个工作节点都会把计算的分片结果保存在内存或磁盘中,下一次如果对相同的RDD进行其他的Action计算,就可以重用。

  因为用户只与Driver Program交互,因此只能用RDD中的cache()方法去cache用户能看到的RDD。所谓能看到,是指经过Transformation算子处理后生成的RDD,而某些在Transformation算子中Spark自己生成的RDD是不能被用户直接cache的。例如,reduceByKey()中会生成的ShuffleRDD、MapPartitionsRDD是不能被用户直接cache的。在Driver Program中设定RDD.cache()后,系统怎样进行cache?首先,在计算RDD的Partition之前就去判断Partition要不要被cache,如果要被cache,先将Partition计算出来,然后cache到内存。cache可使用memory,如果写到HDFS磁盘的话,就要检查checkpoint。调用RDD.cache()后,RDD就变成persistRDD了,其StorageLevel为MEMORY_ONLY,persistRDD会告知Driver说自己是需要被persist的。此时会调用RDD.iterator()。 RDD.scala的iterator()的源码如下:

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ‘‘not‘‘ be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   * RDD的内部方法,将从合适的缓存中读取,否则计算它。这不应该被用户直接使用,但可用于实现自定义的子RDD
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  当RDD.iterator()被调用的时候,也就是要计算该RDD中某个Partition的时候,会先去cacheManager那里获取一个blockId,然后去BlockManager里匹配该Partition是否被checkpoint了,如果是,那就不用计算该Partition了,直接从checkpoint中读取该Partition的所有records放入ArrayBuffer里面。如果没有被checkpoint过,先将Partition计算出来,然后将其所有records放到cache中。总体来说,当RDD会被重复使用(不能太大)时,RDD需要cache。Spark自动监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果想手动删除RDD,可以使用RDD.unpersist()方法。  

  此外,可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许持久化集合到磁盘上,将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到Alluxio中。可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用默认的存储级别-StorageLevel.MEMORY_ONLY。RDD根据useDisk、useMemory、 useOffHeap、deserialized、replication 5个参数的组合提供了常用的12种基本存储,完整的存储级别介绍如下。StorageLevel.scala的源码如下:

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

  StorageLevel是控制存储RDD的标志,每个StorageLevel记录RDD是否使用memory,或使用ExternalBlockStore存储,如果RDD脱离了memory或ExternalBlockStore,是否扔掉RDD,是否保留数据在内存中的序列化格式,以及是否复制多个节点的RDD分区。另外,org.apache.spark.storage.StorageLevel是单实例(singleton)对象,包含了一些静态常量和常用的存储级别,且可用singleton对象工厂方法StorageLevel(...)创建定制化的存储级别。

  Spark的多个存储级别意味着在内存利用率和CPU利用率间的不同权衡。推荐通过下面的过程选择一个合适的存储级别:①如果RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是CPU利用率最高的选项,会使RDD上的操作尽可能地快。②如果不适合用默认级别,就选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快地访问。③除非算子计算RDD花费较大或者需要过滤大量的数据,不要将RDD存储到磁盘上,否则重复计算一个分区,就会和从磁盘上读取数据一样慢。④如果希望更快地恢复错误,可以利用replicated存储机制,所有的存储级别都可以通过replicated计算丢失的数据来支持完整的容错。另外,replicated的数据能在RDD上继续运行任务,而不需要重复计算丢失的数据。在拥有大量内存的环境中或者多应用程序的环境中,Off_Heap(将对象从堆中脱离出来序列化,然后存储在一大块内存中,这就像它存储到磁盘上一样,但它仍然在RAM内存中。Off_Heap对象在这种状态下不能直接使用,须进行序列化及反序列化。序列化和反序列化可能会影响性能,Off_Heap堆外内存不需要进行GC)。Off_Heap具有如下优势:Off_Heap运行多个执行者共享的Alluxio中相同的内存池,显著地减少GC。如果单个的Executor崩溃,缓存的数据也不会丢失。

6.数据调度弹性,DAGScheduler、TASKScheduler和资源管理无关

  Spark将执行模型抽象为通用的有向无环图计划(DAG),这可以将多Stage的任务串联或并行执行,从而不需要将Stage中间结果输出到HDFS中,当发生节点运行故障时,可有其他可用节点代替该故障节点运行。

7.数据分片的高度弹性(coalesce)

  Spark进行数据分片时,默认将数据放在内存中,如果内存放不下,一部分会放在磁盘上进行保存。

  RDD.scala的coalesce算子代码如下:

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   *
   * However, if you‘re doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      // 从随机分区开始,将元素均匀分布在输出分区上
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          // key的哈希码是key本身,HashPartitioner将它与总分区数进行取模运算
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      // 包括一个shuffle步骤,使我们的上游任务仍然是分布式的
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {
      new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }

  例如,在计算的过程中,会产生很多的数据碎片,这时产生一个Partition可能会非常小,如果一个Partition非常小,每次都会消耗一个线程去处理,这时可能会降低它的处理效率,需要考虑把许多小的Partition合并成一个较大的Partition去处理,这样会提高效率。另外,有可能内存不是那么多,而每个Partition的数据Block比较大,这时需要考虑把Partition变成更小的数据分片,这样让Spark处理更多的批次,但是不会出现OOM。  

  

原文地址:https://www.cnblogs.com/xiaoyh/p/10976075.html

时间: 2024-11-07 09:16:44

Spark之RDD弹性特性的相关文章

Spark的RDD原理以及2.0特性的介绍

转载自:http://www.tuicool.com/articles/7VNfyif 王联辉,曾在腾讯,Intel 等公司从事大数据相关的工作.2013 年 - 2016 年先后负责腾讯 Yarn 集群和 Spark 平台的运营与研发.曾负责 Intel Hadoop 发行版的 Hive 及 HBase 版本研发.参与过百度用户行为数据仓库的建设和开发,以及淘宝数据魔方和淘宝指数的数据开发工作.给 Spark 社区贡献了 25+ 个 patch,接受的重要特性有 python on yarn-

Spark之RDD的定义及五大特性

RDD是分布式内存的一个抽象概念,是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,能横跨集群所有节点并行计算,是一种基于工作集的应用抽象. RDD底层存储原理:其数据分布存储于多台机器上,事实上,每个RDD的数据都以Block的形式存储于多台机器上,每个Executor会启动一个BlockManagerSlave,并管理一部分Block:而Block的元数据由Driver节点上的BlockManagerMaster保存,BlockManagerSlave生成Block后向Block

Spark学习之路 (三)Spark之RDD[转]

RDD的概述 什么是RDD? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模型的特点:自动容错.位置感知性调度和可伸缩性.RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度. RDD的属性 (1)一组分片(Partition),即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处

Spark核心—RDD初探

本文目的 ? 最近在使用Spark进行数据清理的相关工作,初次使用Spark时,遇到了一些挑(da)战(ken).感觉需要记录点什么,才对得起自己.下面的内容主要是关于Spark核心-RDD的相关的使用经验和原理介绍,作为个人备忘,也希望对读者有用. ? 为什么选择Spark ? 原因如下 代码复用:使用Scala高级语言操作Spark,灵活方便,面向对象,函数编程的语言特性可以全部拿来.Scala基本上可以无缝集成java及其相关库.最重要的是,可以封装组件,沉淀工作,提高工作效率.之前用hi

Spark的RDD检查点实现分析

概述 在<深入理解Spark:核心思想与源码分析>一书中只是简单介绍了下RDD的checkpoint,对本书是个遗憾.所以此文的目的旨在查漏补缺,完善本书的内容. Spark的RDD执行完成之后会保存检查点,便于当整个作业运行失败重新运行时候,从检查点恢复之前已经运行成功的RDD结果,这样就会大大减少重新计算的成本,提高任务恢复效率和执行效率,节省Spark各个计算节点的资源.本文着重分析检查点的代码实现,更深入理解其原理.在<深入理解Spark:核心思想与源码分析>一书的第5章中

Spark发行笔记8:解读Spark Streaming RDD的全生命周期

本节主要内容: 一.DStream与RDD关系的彻底的研究 二.StreamingRDD的生成彻底研究 Spark Streaming RDD思考三个关键的问题: RDD本身是基本对象,根据一定时间定时产生RDD的对象,随着时间的积累,不对其管理的话会导致内存会溢出,所以在BatchDuration时间内执行完RDD操作后,需对RDD进行管理. 1.DStream生成RDD的过程,DStream到底是怎么生成RDD的? 2.DStream和RDD到底什么关系? 3.运行之后怎么对RDD处理? 所

08、Spark常用RDD变换

08.Spark常用RDD变换 8.1 概述 Spark RDD内部提供了很多变换操作,可以使用对数据的各种处理.同时,针对KV类型的操作,对应的方法封装在PairRDDFunctions trait中,KV类的RDD可以被隐式转换成PairRDDFunctions类型.其中很多的操作,和传统的SQL语句中的操作是对应的,只是底层换成Spark的MR计算. 8.2 常用变换 操作 解释 map 变换,将输入的每个元素进行响应操作,生成新的元素 flatMap 压扁,取出具有可迭代性质的组件中每个

Spark中将RDD转换成DataFrame的两种方法

总结下Spark中将RDD转换成DataFrame的两种方法, 代码如下: 方法一: 使用createDataFrame方法 ```java //StructType and convert RDD to DataFrame val schema = StructType( Seq( StructField("name",StringType,true) ,StructField("age",IntegerType,true) ) ) val rowRDD = sp

Spark RDD弹性7点

1.自动的进行磁盘数据和内存之间的无缝切换 2.基于lineage的高效容错,第n个出错,会从第n-1个开始执行 3.task失败会进行特定次数的重试 4.stage失败会自动进行特定次数的重试,并且只运行计算失败的数据分片 5.checkpoint(类似单机游戏里的存档)和presist,持久化cache 6.数据调度弹性,DAG TASK与资源管理无关 7.数据分片的高度弹性,repartition,1W个大的,变成10W个小的, 10W个小的变成1W个大的.