Spark Streaming发行版笔记16:数据清理内幕彻底解密

本讲从二个方面阐述:

  1. 数据清理原因和现象
  2. 数据清理代码解析

Spark Core从技术研究的角度讲 对Spark Streaming研究的彻底,没有你搞不定的Spark应用程序。

Spark Streaming一直在运行,不断计算,每一秒中在不断运行都会产生大量的累加器、广播变量,所以需要对对象及

元数据需要定期清理。每个batch duration运行时不断触发job后需要清理rdd和元数据。Clinet模式

可以看到打印的日志,从文件日志也可以看到清理日志内容。

现在要看其背后的事情:

Spark运行在jvm上,jvm会产生对象,jvm需要对对象进行回收工作,如果

我们不管理gc(对象产生和回收),jvm很快耗尽。现在研究的是Spark Streaming的Spark GC

。Spark Streaming对rdd的数据管理、元数据管理相当jvm对gc管理。

数据、元数据是操作DStream时产生的,数据、元数据的回收则需要研究DStream的产生和回收。

看下DStream的继承结构:

接收数据靠InputDStream,数据输入、数据操作、数据输出,整个生命周期都是基于DStream构建的;得出结论:DStream负责rdd的生命周期,rrd是DStream产生的,对rdd的操作也是对DStream的操作,所以不断产生batchDuration的循环,所以研究对rdd的操作也就是研究对DStream的操作。

源码分析:

通过对DirectKafkaInputDStream 会产生kafkardd:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))  val rdd = KafkaRDD[K, V, U, T, R](    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)  // Report the record number and metadata of this batch interval to InputInfoTracker.  val offsetRanges = currentOffsets.map { case (tp, fo) =>    val uo = untilOffsets(tp)    OffsetRange(tp.topic, tp.partition, fo, uo.offset)  }  val description = offsetRanges.filter { offsetRange =>    // Don‘t display empty ranges.    offsetRange.fromOffset != offsetRange.untilOffset  }.map { offsetRange =>    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"  }.mkString("\n")  // Copy offsetRanges to immutable.List to prevent from being modified by the user  val metadata = Map(    "offsets" -> offsetRanges.toList,    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)  val inputInfo = StreamInputInfo(id, rdd.count, metadata)  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)  Some(rdd)}

foreachRDD会触发ForEachDStream:

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD. * @param parent        Parent DStream * @param foreachFunc   Function to apply on each RDD generated by the parent DStream * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated *                           by `foreachFunc` will be displayed in the UI; only the scope and *                           callsite of `DStream.foreachRDD` will be displayed. */
private[streaming]class ForEachDStream[T: ClassTag] (    parent: DStream[T],    foreachFunc: (RDD[T], Time) => Unit,    displayInnerRDDOps: Boolean  ) extends DStream[Unit](parent.ssc) {  override def dependencies: List[DStream[_]] = List(parent)  override def slideDuration: Duration = parent.slideDuration  override def compute(validTime: Time): Option[RDD[Unit]] = None  override def generateJob(time: Time): Option[Job] = {    parent.getOrCompute(time) match {      case Some(rdd) =>        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {          foreachFunc(rdd, time)        }        Some(new Job(time, jobFunc))      case None => None    }  }}

再看DStream源码foreachRDD:

/** * Apply a function to each RDD in this DStream. This is an output operator, so * ‘this‘ DStream will be registered as an output stream and therefore materialized. * @param foreachFunc foreachRDD function * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated *                           in the `foreachFunc` to be displayed in the UI. If `false`, then *                           only the scopes and callsites of `foreachRDD` will override those *                           of the RDDs on the display. */
private def foreachRDD(    foreachFunc: (RDD[T], Time) => Unit,    displayInnerRDDOps: Boolean): Unit = {  new ForEachDStream(this,    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()}

/**

 * Get the RDD corresponding to the given time; either retrieve it from cache * or compute-and-cache it. */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {  // If RDD was already generated, then retrieve it from HashMap,  // or else compute the RDD  generatedRDDs.get(time).orElse {    // Compute the RDD if time is valid (e.g. correct time in a sliding window)    // of RDD generation, else generate nothing.    if (isTimeValid(time)) {      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {        // Disable checks for existing output directories in jobs launched by the streaming        // scheduler, since we may need to write output to an existing directory during checkpoint        // recovery; see SPARK-4835 for more details. We need to have this call here because        // compute() might cause Spark jobs to be launched.        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {          compute(time)        }      }
      rddOption.foreach { case newRDD =>        // Register the generated RDD for caching and checkpointing        if (storageLevel != StorageLevel.NONE) {          newRDD.persist(storageLevel)          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")        }        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {          newRDD.checkpoint()          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")        }        generatedRDDs.put(time, newRDD)      }      rddOption    } else {      None    }  }} 

DStream随着时间进行,不断在内存数据结构,generatorRDD中时间窗口和窗口下的rdd实例,

按照batchDuration存储rdd以及删除掉rdd的。有时候会调用DStream的cache操作,cache就是persist操作,其实是对rdd的cache操作。

Rdd本身释放,产生rdd有数据源和元数据,释放rdd时山方面都需要考虑。数据周期性产生和周期性释放,需要找到时钟,需要找jobGenerator下的时钟:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

根据时间发给eventloop,这边receive的时候不断的有generatorjobs产生:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

短短几行代码把整个作业的生命周期处理的清清楚楚。

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

看下clearMetadata方法:

/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)
  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete metadata information about
    // received blocks (block data not saved in any case). Otherwise, wait for
    // checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
 jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}

Inputinfotracker里面是保存了元数据。

defclearMetadata(time: Time) {
  logDebug("Clearing metadata for time " + time)
  this.synchronized {
    outputStreams.foreach(_.clearMetadata(time))
  }
  logDebug("Cleared old metadata for time " + time)
}

清理完成后输出日志。

有很多类型数据输出,先清理outputds的内容,有不同的outputds,其实就是foreachds。

继续跟踪ds类的清理方法:

/**
 * Clear metadata that are older than `rememberDuration` of this DStream.
 * This is an internal method that should not be called directly. This default
 * implementation clears the old generated RDDs. Subclasses of DStream may override
 * this to clear their own metadata along with the generated RDDs.
 */
private[streaming] def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))//batchdration的倍数
  logDebug("Clearing references to old RDDs: [" +
    oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
  generatedRDDs --= oldRDDs.keys
  if (unpersistData) {
    logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
    oldRDDs.values.foreach { rdd =>
      rdd.unpersist(false)
      // Explicitly remove blocks of BlockRDD
      rdd match {
        case b: BlockRDD[_] =>
          logInfo("Removing blocks of RDD " + b + " of time " + time)
          b.removeBlocks()
        case _ =>
      }
    }
  }
  logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
    (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
  dependencies.foreach(_.clearMetadata(time))
}

除了清理rdd还需要清理元数据。

随着时间推移,不断收到清理的消息,不用担心driver内存问题。

接下来需要删除RDD:

/**
 * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
 * irreversible operation, as the data in the blocks cannot be recovered back
 * once removed. Use it with caution.
 */
private[spark] def removeBlocks() {
  blockIds.foreach { blockId =>
    sparkContext.env.blockManager.master.removeBlock(blockId)
  }
  _isValid = false
}

基于rdd肯定背blockmanager,需要删除block的话需要告诉blockmanager master来做。

接下来需要处理depanedcied foreach需要把依赖的父ds都会被清理掉。

最后一个问题:清理是在什么时候被触发的?

根据源码分析,作业产生的jobGenerator类中有下面的方法:

/**
 * Callback called when a batch has been completely processed.
 */
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}
/** * Callback called when the checkpoint of a batch has been written. */def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
  if (clearCheckpointDataLater) {    eventLoop.post(ClearCheckpointData(time))  }}

每个batchDuration处理完成后都会被回调、发消息,checkpoint完成之后也会调用checkpointdata,需要从作业运行来分析:JobScheduler类下的jobHandler方法:private def processEvent(event: JobSchedulerEvent) {

  try {    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}

完成后调用onBatchCompletion:

/**
 * Callback called when a batch has been completely processed.
 */
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}

总结:

Spark Streaming在batchDuration处理完成后都会对产生的信息做清理,对输出DStream清理、依赖关系进行清理、清理默认也会清理rdd数据信息、元数据清理。

感谢王家林老师的知识分享

王家林老师名片:

中国Spark第一人

感谢王家林老师的知识分享

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]

YY课堂:每天20:00现场授课频道68917580

时间: 2024-11-05 15:51:18

Spark Streaming发行版笔记16:数据清理内幕彻底解密的相关文章

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1.Spark Streaming元数据清理详解 2.Spark Streaming元数据清理源码解析 一.如何研究Spark Streaming元数据清理 操作DStream的时候会产生元数据,所以要解决RDD的数据清理工作就一定要从DStream入手.因为DStream是RDD的模板,DStream之间有依赖关系. DStream的操作产生了RDD,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的.由此,DStream负责RDD的整个

第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: Spark Streaming数据清理原因和现象 Spark Streaming数据清理代码解析 对Spark Streaming解析了这么多课之后,我们越来越能感知,Spark Streaming只是基于Spark Core的一个应用程序,因此掌握Spark Streaming对于我们怎么编写Spark应用是绝对有好处的. Spark Streaming 不像Spark Core的应用程序,Spark Core的应用的数据是存储在底层文件系统,如HDFS等别的存储系统中,而Spar

Spark Streaming源码解读之数据清理内幕彻底解密

本期内容 : Spark Streaming数据清理原理和现象 Spark Streaming数据清理代码解析 Spark Streaming一直在运行的,在计算的过程中会不断的产生RDD ,如每秒钟产生一个BachDuration同时也会产生RDD, 在这个过程中除了基本的RDD外还有累加器.广播变量等,对应Spark Streaming也有自己的对象.源数据及数据清理机制, 在运行中每个BachDuration会触发了Job ,由于会自动产生对象.数据及源数据等运行完成后肯定要自动进行回收 

Spark 定制版:016~Spark Streaming源码解读之数据清理内幕彻底解密

本讲内容: a. Spark Streaming数据清理原因和现象 b. Spark Streaming数据清理代码解析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们之所以用一节课来讲No Receivers,是因为企业级Spark Streaming应用程序开发中在越来越多的采用No Receivers的方式.No Receiver方式有自己的优势,比如更大的控制的自由度.语义一致性等等.所以对No Receivers方

Spark Streaming发行版笔记14:updateStateByKey和mapWithState源码解密

本篇从二个方面进行源码分析: 一.updateStateByKey解密 二.mapWithState解密 通过对Spark研究角度来研究jvm.分布式.图计算.架构设计.软件工程思想,可以学到很多东西. 进行黑名单动态生成和过滤例子中会用到updateStateByKey方法,此方法在DStream类中没有定义,需要在 DStream的object区域通过隐式转换来找,如下面的代码: object DStream {   // `toPairDStreamFunctions` was in Sp

Spark Streaming发行版笔记15:no receivers彻底思考

数据接入Spark Streaming的二种方式:Receiver和no receivers方式 建议企业级采用no receivers方式开发Spark Streaming应用程序,好处: 1.更优秀的自由度控制 2.语义一致性 no receivers更符合数据读取和数据操作,Spark 计算框架底层有数据来源,如果只有direct直接操作数据来源则更天然.操作数据来源封装其一定是rdd级别的. 所以Spark 推出了自定义的rdd即Kafkardd,只是数据来源不同. 进入源码区: 注释基

Spark Streaming发行版笔记17:资源动态分配和动态控制消费速率

本篇从二个方面讲解: 高级特性: 1.Spark Streaming资源动态分配 2.Spark Streaming动态控制消费速率 原理剖析,动态控制消费速率其后面存在一套理论,资源动态分配也有一套理论. 先讲理论,后面讨论. 为什么要动态资源分配和动态控制速率? Spark默认是先分配资源,然后计算:粗粒度的分配方式,资源提前分配好,有计算任务提前分配好资源: 不好的地方:从Spark Streaming角度讲有高峰值和低峰值,如果资源分配从高峰值.低峰值考虑都有大量资源的浪费. 其实当年S

Spark发行版笔记10:Spark Streaming源码解读之流数据不断接收和全生命周期彻底研究和思考

本节的主要内容: 一.数据接受架构和设计模式 二.接受数据的源码解读 Spark Streaming不断持续的接收数据,具有Receiver的Spark 应用程序的考虑. Receiver和Driver在不同进程,Receiver接收数据后要不断给Deriver汇报. 因为Driver负责调度,Receiver接收的数据如果不汇报给Deriver,Deriver调度时不会把接收的数据计算入调度系统中(如:数据ID,Block分片). 思考Spark Streaming接收数据: 不断有循环器接收

spark发行版笔记4Spark Streaming事务处理彻底掌握

Spark Streaming事务处理彻底掌握 感谢DT大数据梦工厂支持提供以下内容,DT大数据梦工厂专注于Spark发行版定制. 内容概括: 1Exactly once 2 输出不重复 1 正如银行转账业务一样,如果你给一个朋友转账一次,银行的系统必须保证此次的转账数据有且只能处理一次,不能出现另外的情况.事务的意思就是保证数据有且只能处理一次. 而Spark Streaming流处理在事务处理方面也是做得非常好的,并且这一部分内容也是非常重要的. 所谓一图胜千言,我们就来画一张图吧. 整个数