Spark source code study: withScope

withScope is a module added in a recent Spark release (around Spark 1.4, which introduced the DAG visualization on the Spark UI); its purpose is exactly that DAG visualization.

Previously the Spark UI only showed how each stage executed, so you could not see the concrete path from one RDD to the next. To surface more information on the Spark UI, every method that creates an RDD is wrapped in withScope, and RDDOperationScope records the operation history of an RDD and its relationships to other RDDs, which is enough to achieve the goal. Below is the DAG visualization of a WordCount job on the Spark UI.

[Figure: WordCount DAG visualization on the Spark UI]
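Before looking at the scope class itself, it helps to see where withScope is used. The sketch below is paraphrased from RDD.scala rather than copied verbatim: each RDD-creating method (map, filter, textFile, ...) runs its body inside a small withScope helper that delegates to RDDOperationScope.withScope, so every RDD instantiated in that body is tagged with the same scope.

// Paraphrased sketch of the wrapping pattern in RDD.scala (not verbatim Spark source).
// Each transformation runs its body inside withScope, so the RDDs it creates all point
// to the same RDDOperationScope and are grouped into one node on the Spark UI DAG.
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}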

The source of RDDOperationScope, which records these relationships, is as follows:

import java.util.concurrent.atomic.AtomicInteger

import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOrder}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.google.common.base.Objects

import org.apache.spark.{Logging, SparkContext}

/**
 * A general, named code block representing an operation that instantiates RDDs.
 *
 * All RDDs instantiated in the corresponding code block will store a pointer to this object.
 * Examples include, but will not be limited to, existing RDD operations, such as textFile,
 * reduceByKey, and treeAggregate.
 *
 * An operation scope may be nested in other scopes. For instance, a SQL query may enclose
 * scopes associated with the public RDD APIs it uses under the hood.
 *
 * There is no particular relationship between an operation scope and a stage or a job.
 * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
 */
@JsonInclude(Include.NON_NULL)
@JsonPropertyOrder(Array("id", "name", "parent"))
private[spark] class RDDOperationScope(
    val name: String,
    val parent: Option[RDDOperationScope] = None,
    val id: String = RDDOperationScope.nextScopeId().toString) {

  def toJson: String = {
    RDDOperationScope.jsonMapper.writeValueAsString(this)
  }

  /**
   * Return a list of scopes that this scope is a part of, including this scope itself.
   * The result is ordered from the outermost scope (eldest ancestor) to this scope.
   */
  @JsonIgnore
  def getAllScopes: Seq[RDDOperationScope] = {
    parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
  }

  override def equals(other: Any): Boolean = {
    other match {
      case s: RDDOperationScope =>
        id == s.id && name == s.name && parent == s.parent
      case _ => false
    }
  }

  override def hashCode(): Int = Objects.hashCode(id, name, parent)

  override def toString: String = toJson
}
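A quick illustration of getAllScopes and toJson, using hypothetical hand-built scopes (the names and the exact JSON shown are assumptions for illustration, not captured output):

// Hypothetical scopes to show getAllScopes ordering and the JSON form.
val root  = new RDDOperationScope("textFile")
val child = new RDDOperationScope("map", parent = Some(root))

child.getAllScopes.map(_.name)  // Seq("textFile", "map"): eldest ancestor first, this scope last
println(child.toJson)           // e.g. {"id":"1","name":"map","parent":{"id":"0","name":"textFile"}}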

/**
 * A collection of utility methods to construct a hierarchical representation of RDD scopes.
 * An RDD scope tracks the series of operations that created a given RDD.
 */
private[spark] object RDDOperationScope extends Logging {
  private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
  private val scopeCounter = new AtomicInteger(0)

  def fromJson(s: String): RDDOperationScope = {
    jsonMapper.readValue(s, classOf[RDDOperationScope])
  }

  /** Return a globally unique operation scope ID. */
  def nextScopeId(): Int = scopeCounter.getAndIncrement

  /**
   * Execute the given body such that all RDDs created in this body will have the same scope.
   * The name of the scope will be the first method name in the stack trace that is not the
   * same as this method's.
   *
   * Note: Return statements are NOT allowed in body.
   */
  private[spark] def withScope[T](
      sc: SparkContext,
      allowNesting: Boolean = false)(body: => T): T = {
    val ourMethodName = "withScope"
    val callerMethodName = Thread.currentThread.getStackTrace()
      .dropWhile(_.getMethodName != ourMethodName)
      .find(_.getMethodName != ourMethodName)
      .map(_.getMethodName)
      .getOrElse {
        // Log a warning just in case, but this should almost certainly never happen
        logWarning("No valid method name for this RDD operation scope!")
        "N/A"
      }
    withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
  }
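  // How the caller name is found above: Thread.getStackTrace lists the most recent frame first,
  // so dropWhile skips every frame above this withScope frame, and find then takes the first
  // frame whose method name is not "withScope" -- that is, the RDD operation (textFile, map,
  // reduceByKey, ...) that invoked us. That method name becomes the scope label shown for the
  // corresponding node in the Spark UI DAG visualization.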

  /**
   * Execute the given body such that all RDDs created in this body will have the same scope.
   *
   * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
   * child scopes that are nested within our scope. Otherwise, these calls will take no effect.
   *
   * Additionally, the caller of this method may optionally ignore the configurations and scopes
   * set by the higher level caller. In this case, this method will ignore the parent caller's
   * intention to disallow nesting, and the new scope instantiated will not have a parent. This
   * is useful for scoping physical operations in Spark SQL, for instance.
   *
   * Note: Return statements are NOT allowed in body.
   */
  private[spark] def withScope[T](
      sc: SparkContext,
      name: String,
      allowNesting: Boolean,
      ignoreParent: Boolean)(body: => T): T = {
    // Save the old scope to restore it later
    val scopeKey = SparkContext.RDD_SCOPE_KEY
    val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
    val oldScopeJson = sc.getLocalProperty(scopeKey)
    val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
    val oldNoOverride = sc.getLocalProperty(noOverrideKey)
    try {
      if (ignoreParent) {
        // Ignore all parent settings and scopes and start afresh with our own root scope
        sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
      } else if (sc.getLocalProperty(noOverrideKey) == null) {
        // Otherwise, set the scope only if the higher level caller allows us to do so
        sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
      }
      // Optionally disallow the child body to override our scope
      if (!allowNesting) {
        sc.setLocalProperty(noOverrideKey, "true")
        log.info("this is textFile1")
        log.info("this is textFile2" )
        //println("this is textFile3")
        log.error("this is textFile4err")
        log.warn("this is textFile5WARN")
        log.debug("this is textFile6debug")
      }
      body
    } finally {
      // Remember to restore any state that was modified before exiting
      sc.setLocalProperty(scopeKey, oldScopeJson)
      sc.setLocalProperty(noOverrideKey, oldNoOverride)
    }
  }
}
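The other half of the mechanism lives in RDD itself. Roughly, and paraphrased from memory rather than quoted verbatim, the RDD constructor reads back the local property that withScope has just set and deserializes it; this is what the DAG visualization later walks:

// Paraphrased sketch of how an RDD records its scope (RDD.scala, not verbatim):
// read the JSON that withScope stored in the local property and deserialize it.
@transient private[spark] val scope: Option[RDDOperationScope] = {
  Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
}

Note how withScope saves the old property values up front and restores them in the finally block, so a nested or failing body cannot leak its scope into later operations, and how RDD_SCOPE_NO_OVERRIDE_KEY turns inner withScope calls into no-ops when allowNesting is false.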

Original article: https://www.cnblogs.com/moonlightml/p/9055184.html
