Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解(DT大数据梦工厂)

内容:

1、Checkpoint重大价值;

2、Checkpoint运行原理图;

3、Checkpoint源码解析;

机器学习、图计算稍微复杂迭代算法的时候都有Checkpoint的身影,作用不亚于persist

==========Checkpoint到底是什么============

1、Spark 在生产环境下经常会面临transformation的RDD非常多(例如一个Job中包含1万个RDD)或者具体transformation的RDD本身计算特别复杂或者耗时(例如计算时长超过1个小时),这个时候就要考虑对计算结果数据的持久化;

2、Spark是擅长多步骤迭代的,同时擅长基于Job的复用,这个时候如果能够对曾经计算的过程产生的数据进行复用,就可以极大的提升效率;

3、如果采用persist把数据放在内存中,虽然是快速的,但是也是最不可靠的;如果把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏,系统管理员可能清空磁盘

4、Checkpoint的产生就是为了相对而言更加可靠的持久化数据,在Checkpoint的时候可以指定把数据放在本地,并且是多副本的方式,但是在生产环境下是放在HDFS上,这就天然的借助了HDFS高容错、高可靠的特征来完成了最大化的可靠的持久化数据的方式;

5、Checkpoint是为了最大程度保证绝对可靠的复用RDD计算数据的Spark高级功能,通过checkpoint我们通常把数据持久化到HDFS来保证数据最大程度的安全性;

6、Checkpoint就是针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)开始基于HDFS等的数据持久化复用策略,通过对RDD启动checkpoint机制来实现容错和高可用;

加入进行一个1万个步骤,在9000个步骤的时候persist,数据还是有可能丢失的,但是如果checkpoint,数据丢失的概率几乎为0。

==========Checkpoint原理机制============

1、在SparkContext中设置进行Checkpoint操作的RDD,把数据放在哪里,在生产集群中运行,必须是HDFS的路径,同时为了提高效率,在进行Checkpoint的时候,可以指定很多目录;

/**
 * Set the directory under which RDDs are going to be checkpointed. The directory must
 * be a HDFS path if running on a cluster.
 */
def setCheckpointDir(directory: String) {

// If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Checkpoint directory must be non-local " +
      "if Spark is running on a cluster: " + directory)
  }

checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}

2、RDD中的checkpoint,在进行这个操作的时候,其所依赖的所有的RDD都会从计算联调中清空掉,保存在之前设置的路径下,且所有parent级别的RDD都会被清空掉;

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}

3、作为最佳实践,一般在进行checkpoint方法,调用前通常都要进行persist来把当前RDD的数据持久化到内存或者磁盘上(上面代码的注释说只会存在内存中说法不准确),因为checkpoint是lazy级别的,必须要有Job的执行,且在Job执行完成之后才会从后往前回溯哪个RDD进行了checkpoint标记,然后对该标记了要进行Checkpoint的RDD新启动一个Job执行具体的Checkpoint的过程

4、因为会从计算链条清空,所以Checkpoint改变了RDD的Lineage(血统关系);

5、当我们调用了checkpoint对RDD进行checkpoint操作的话,此时框架会自动生成RDDCheckpointData,当RDD上运行过一个Job后,就会立即触发RDDCheckpoint中的checkpoint方法,在其内部会调用doCheckpoint,实际上在生产环境下会调用ReliableRDDCheckpointData的doCheckpoint,在生产环境下会导致ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用,而在writeRDDToCheckpointDirectory方法内部会触发runJob来执行把当前的RDD中的数据写到checkpoint的目录中,同时会产生ReliableCheckpointRDD实例返回;

/**
 * This class contains all the information related to RDD checkpointing. Each instance of this
 * class is associated with a RDD. It manages process of checkpointing of the associated RDD,
 * as well as, manages the post-checkpoint state by providing the updated partitions,
 * iterator and preferred locations of the checkpointed RDD.
 */
private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

import CheckpointState._

// The checkpoint state of the associated RDD.
  protected var cpState = Initialized

// The RDD that contains our checkpointed data
  private var cpRDD: Option[CheckpointRDD[T]] = None

// TODO: are we sure we need to use a global lock in the following methods?

/**
   * Return whether the checkpoint data for this RDD is already persisted.
   */
  def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
    cpState == Checkpointed
  }

/**
   * Materialize this RDD and persist its content.
   * This is called immediately after the first action invoked on this RDD has completed.
   */
  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

val newRDD = doCheckpoint()

// Update our state and truncate the RDD lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      rdd.markCheckpointed()
    }
  }

/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

// Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }

logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}

/**
 * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
 */
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {

val sc = originalRDD.sparkContext

// Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

// Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }

val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
        s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
  }
  newRDD
}

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]

时间: 2024-10-06 15:54:57

Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解(DT大数据梦工厂)的相关文章

CacheManager彻底解密:CacheManager运行原理流程图和源码详解(DT大数据梦工厂)

内容: 1.CacheManager重大价值: 2.CacheManager运行原理图: 3.CacheManager源码解析: BlockManager针对Cache这样的行为做了CacheManager Spark出色的原因: 1.Spark基于RDD构成了一体化.多元化的大数据处理中心(不需要再处理多种范式来部署多种框架,只要Spark!!!降低成本投入获得更高的产出): 2.迭代,因为在计算的时候迭代,在构建复杂算法的时候非常方便(图计算.机器学习.数据仓库),而CacheManager

[Spark內核] 第41课:Checkpoint彻底解密:Checkpoint的运行原理和源码实现彻底详解

本课主题 Checkpoint 运行原理图 Checkpoint 源码解析 引言 Checkpoint 到底是什么和需要用 Checkpoint 解决什么问题: Spark 在生产环境下经常会面临 Transformation 的 RDD 非常多(例如一个Job 中包含1万个RDD) 或者是具体的 Transformation 产生的 RDD 本身计算特别复杂和耗时(例如计算时常超过1个小时) , 可能业务比较复杂,此时我们必需考虑对计算结果的持久化. Spark 是擅长多步骤迭代,同时擅长基于

DT大数据梦工厂第三十五课 Spark系统运行循环流程

本节课内容: 1.     TaskScheduler工作原理 2.     TaskScheduler源码 一.TaskScheduler工作原理 总体调度图: 通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理. 回顾: DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程:运行时从前往后运行的.每个Stage中有很多任务Task,Task是可以并行执行的.它们的执行逻辑完

Spark内核架构解密(DT大数据梦工厂)

只有知道内核架构的基础上,才知道为什么要这样写程序? 手工绘图来解密Spark内核架构 通过案例来验证Spark内核架构 Spark架构思考 ==========Spark Runtime的几个概念============ 下载下来运行,基本都是standalone模式,如果掌握了standalone,则yarn和mesos,以后不做特别说明,一律是standalone模式 application=driver+executor,executor是具体处理数据分片,里面是线程池并发的处理数据分片

SparkRDD解密(DT大数据梦工厂)

第一阶段,彻底精通Spark 第二阶段,从0起步,操作项目 Hadoop是大数据的基础设施,存储等等 Spark是计算核心所在 1.RDD:基于工作集的应用抽象 2.RDD内幕解密 3.RDD思考 不掌握RDD的人,不可能成为Spark的高手 绝对精通RDD,解决问题的能力大大提高 各种框架底层封装的都是RDD,RDD提供了通用框架 RDD是Spark的通用抽象基石 顶级SPark高手, 1.能解决问题.性能调优: 2.Spark高手拿Spark过来就是修改的 ==========基于工作集的应

Spark运行原理和RDD解析(DT大数据梦工厂)

Spark一般基于内存,一些情况下也会基于磁盘 Spark优先会把数据放到内存中,如果内存实在放不下,也会放到磁盘里面的 不单能计算内存放的下的数据,也能计算内存放不下的数据 实际如果数据大于内存,则要考虑数据放置策略和优化算法,因为Spark初衷是一寨式处理 小到5~10台的分布式大到8000台的规模,Spark都能运行 大数据计算问题:交互式查询(基于shell.sparkSQL).批处理.机器学习和计算等等 底层基于RDD,分布式弹性数据级,支持各种各样的比如流处理.SQL.SparkR等

HA下Spark集群工作原理(DT大数据梦工厂)

Spark高可用HA实战 Spark集群工作原理详解 资源主要指内存.CPU 如果是单点的话,Master如果出现故障,则集群不能对外工作 Spark通过Zookeeper做HA,一般做HA是一个active级别,standby active就是当前工作 standby是随时准备active的挂了之后会切换成为active级别 以前一般是2台机器,一个active,一个standby 现在一般是3台机器,一个active,两个standby,甚至3台以上 Zookeeper包含了哪些内容:所有的

Spark on Yarn彻底解密(DT大数据梦工厂)

内容: 1.Hadoop Yarn的工作流程解密: 2.Spark on Yarn两种运行模式实战: 3.Spark on Yarn工作流程解密: 4.Spark on Yarn工作内幕解密: 5.Spark on Yarn最佳实践: 资源管理框架Yarn Mesos是分布式集群的资源管理框架,和大数据没关系,但是可以管理大数据的资源 ==========Hadoop Yarn解析============ 1.Yarn是Hadoop推出的资源管理器,是负责分布式(大数据)集群计算的资源管理的,负

Master HA彻底解密(DT大数据梦工厂)

内容: 1.Master HA解析: 2.Master HA的四种方式: 3.Master HA的内部工作机制: 4.Master HA的源码解密: 本讲主要源码角度分析Master HA,因为在生产环境必然要做的 ==========Master HA解析============ Spark是Master-Slave的结构 现在业界是1个Master Active,2个以上standby 如果有HA的话,切换active的时候,会在上次运行的基础上继续运行 Drvier提交程序.申请资源,是跟