【Spark】Stage Generation and a Walkthrough of the Stage Source Code

Introduction

The previous article, 《DAGScheduler源代码浅析》 (A Look at the DAGScheduler Source Code), introduced handleJobSubmitted, the key function that produces the finalStage. In this article I continue with how the DAGScheduler generates Stages and walk through the Stage-related source code.

Stage Generation

Stage scheduling is done by the DAGScheduler: the directed acyclic graph (DAG) of RDDs is cut into a DAG of Stages. The Stage DAG is traversed breadth-first with the last Stage to run as the root, back to the Stages that must run first, and execution starts from there. If a submitted Stage still has unfinished parent Stages, it must wait for its parents to complete before it can run. The DAGScheduler also maintains several important HashSet collections that record Stage state, which prevents Stages from being run too early or submitted more than once: waitingStages records Stages that still have unfinished parent Stages, to prevent premature execution; runningStages holds the Stages currently running, to prevent duplicate execution; and failedStages holds Stages that failed and must be resubmitted. This design is for fault tolerance.

  // Stages we need to run whose parents aren't done
  private[scheduler] val waitingStages = new HashSet[Stage]

  // Stages we are running right now
  private[scheduler] val runningStages = new HashSet[Stage]

  // Stages that must be resubmitted due to fetch failures
  private[scheduler] val failedStages = new HashSet[Stage]

Dependencies

A narrow dependency means that every output partition of the parent RDD is consumed by a single, designated child RDD partition; the output path is fixed. A wide dependency means the parent RDD's output is consumed by multiple different child partitions, so the output path is not fixed.

The scheduler computes the dependencies between RDDs and merges RDDs linked by consecutive narrow dependencies into the same Stage, while wide dependencies serve as the criterion for splitting Stages.

Transformations that produce narrow dependencies: map, flatMap, filter, sample. Transformations that produce wide dependencies: sortByKey, reduceByKey, groupByKey, cogroup, join, cartesian.
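A quick way to see the difference is to inspect an RDD's dependencies directly. The following is a minimal sketch for spark-shell, assuming only an existing SparkContext named sc:

// Minimal sketch (spark-shell), assuming an existing SparkContext `sc`.
val pairs   = sc.parallelize(1 to 100).map(i => (i % 10, i))   // map: narrow dependency
val reduced = pairs.reduceByKey(_ + _)                         // reduceByKey: wide dependency

pairs.dependencies.head.getClass.getSimpleName    // "OneToOneDependency"
reduced.dependencies.head.getClass.getSimpleName  // "ShuffleDependency"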

There are two kinds of Stage:

ShuffleMapStage, in which case its tasks’ results are input for another stage

In other words, it is a non-final stage: other stages follow it, so its output must be shuffled and serves as the input of a subsequent stage.

This kind of Stage has shuffle output as its output boundary; its input boundary can be data fetched from an external source or the output of another ShuffleMapStage.

Its output can be the starting point of another Stage.

The tasks of a ShuffleMapStage are ShuffleMapTasks.

A Job may contain Stages of this type, or it may contain none.

ResultStage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc)

It is the final stage: it has no shuffle output; instead it directly produces a result or stores it.

This kind of Stage outputs results directly. Its input boundary can be data fetched from an external source or the output of a ShuffleMapStage.

The tasks of a ResultStage are ResultTasks, and every Job necessarily contains a Stage of this type.

A Job consists of one or more Stages and contains at least one ResultStage.
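For instance (a hedged sketch for spark-shell, assuming a SparkContext sc): an action on a lineage with no shuffle produces a job with a single ResultStage, while adding a reduceByKey splits the job into a ShuffleMapStage followed by a ResultStage.

// Job 1: no shuffle in the lineage -> a single ResultStage.
sc.parallelize(1 to 100).map(_ * 2).count()

// Job 2: reduceByKey introduces a ShuffleDependency
// -> one ShuffleMapStage whose shuffle output feeds one ResultStage.
sc.parallelize(1 to 100)
  .map(i => (i % 10, 1))
  .reduceByKey(_ + _)
  .count()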

Stage Division

Some RDD transformations inherently carry a ShuffleDependency: RDDs such as ShuffledRDD, CoGroupedRDD and SubtractedRDD return a ShuffleDependency.

Whenever a ShuffleDependency appears in the RDD lineage, a new Stage is created.

Once the Stages have been divided, the following is determined:

  1. how many Partitions the resulting Stage needs to read data from
  2. how many Partitions the resulting Stage will produce
  3. whether the resulting Stage is of ShuffleMap type

Determining the Partitions decides how many distinct Tasks must be created, and the ShuffleMap check decides which kind of Task to generate. Spark has two kinds of Task: ShuffleMapTask and ResultTask.
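To make the relationship concrete, here is a hedged spark-shell sketch (the partition counts 8 and 4 are chosen arbitrarily for illustration):

// 8 input partitions -> 8 ShuffleMapTasks in the map-side stage.
val rdd = sc.parallelize(1 to 1000, 8)
// reduceByKey with 4 reduce partitions -> 4 ResultTasks once an action runs.
val shuffled = rdd.map(i => (i % 4, i)).reduceByKey(_ + _, 4)

rdd.partitions.size        // 8
shuffled.partitions.size   // 4
shuffled.collect()         // stage 0: 8 ShuffleMapTasks, stage 1: 4 ResultTasks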

The Stage Class

The Stage's RDD parameter is a single RDD (the final RDD), rather than a chain of RDDs.

Because all the RDDs inside a stage are linked by map-like (narrow) transformations, the partitioning never changes; the data simply passes through the successive map functions. So as far as the TaskScheduler is concerned, one RDD is enough to represent the whole stage.

The Stage constructor parameters:

val id: Int // the Stage's sequence number, assigned from an increasing counter

val rdd: RDD[_], // the last RDD belonging to this Stage

val numTasks: Int, // the number of Tasks to create, i.e. the number of partitions this Stage computes

val shuffleDep: Option[ShuffleDependency[_, _, _]], // the ShuffleDependency (wide dependency), if this is a shuffle map stage

val parents: List[Stage], // the list of parent Stages

val jobId: Int // the ID of the job this Stage belongs to

private[spark] class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
    val parents: List[Stage],
    val jobId: Int,
    val callSite: CallSite)
  extends Logging {

  val isShuffleMap = shuffleDep.isDefined
  val numPartitions = rdd.partitions.size
  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
  var numAvailableOutputs = 0

  /** Set of jobs that this stage belongs to. */
  val jobIds = new HashSet[Int]

  /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */
  var resultOfJob: Option[ActiveJob] = None
  var pendingTasks = new HashSet[Task[_]]

  private var nextAttemptId = 0

  val name = callSite.shortForm
  val details = callSite.longForm

  /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
  var latestInfo: StageInfo = StageInfo.fromStage(this)

  def isAvailable: Boolean = {
    if (!isShuffleMap) {
      true
    } else {
      numAvailableOutputs == numPartitions
    }
  }

  def addOutputLoc(partition: Int, status: MapStatus) {
    val prevList = outputLocs(partition)
    outputLocs(partition) = status :: prevList
    if (prevList == Nil) {
      numAvailableOutputs += 1
    }
  }

  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
    val prevList = outputLocs(partition)
    val newList = prevList.filterNot(_.location == bmAddress)
    outputLocs(partition) = newList
    if (prevList != Nil && newList == Nil) {
      numAvailableOutputs -= 1
    }
  }

  /**
   * Removes all shuffle outputs associated with this executor. Note that this will also remove
   * outputs which are served by an external shuffle server (if one exists), as they are still
   * registered with this execId.
   */
  def removeOutputsOnExecutor(execId: String) {
    var becameUnavailable = false
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_.location.executorId == execId)
      outputLocs(partition) = newList
      if (prevList != Nil && newList == Nil) {
        becameUnavailable = true
        numAvailableOutputs -= 1
      }
    }
    if (becameUnavailable) {
      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
        this, execId, numAvailableOutputs, numPartitions, isAvailable))
    }
  }

  /** Return a new attempt id, starting with 0. */
  def newAttemptId(): Int = {
    val id = nextAttemptId
    nextAttemptId += 1
    id
  }

  def attemptId: Int = nextAttemptId

  override def toString = "Stage " + id

  override def hashCode(): Int = id

  override def equals(other: Any): Boolean = other match {
    case stage: Stage => stage != null && stage.id == id
    case _ => false
  }
}

The call chain that processes a Job, splits it into Stages, wraps each Stage into a TaskSet, and finally submits it to the TaskScheduler is:

dagScheduler.handleJobSubmitted -> dagScheduler.submitStage -> dagScheduler.submitMissingTasks -> taskScheduler.submitTasks

The handleJobSubmitted Function

The functions handleJobSubmitted and submitStage are mainly responsible for dependency analysis; let's look at their processing logic in more detail.

The primary job of handleJobSubmitted is to generate the Stages and, based on the finalStage, create an ActiveJob.

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: Stage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
    } catch {
      // Error handling: tell the listener that the job failed, then return
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    if (finalStage != null) {
      val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
      clearCacheLocs()
      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
        job.jobId, callSite.shortForm, partitions.length, allowLocal))
      logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
      logInfo("Parents of final stage: " + finalStage.parents)
      logInfo("Missing parents: " + getMissingParentStages(finalStage))
      val shouldRunLocally =
        localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
      val jobSubmissionTime = clock.getTimeMillis()
      if (shouldRunLocally) {
        // Compute very short actions like first() or take() with no parent stages locally.
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
        runLocally(job)
      } else {
        // Actions such as collect() take this path: update the bookkeeping maps, post to the listener bus, then submit the job
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.resultOfJob = Some(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        // Submit the final stage
        submitStage(finalStage)
      }
    }
    // Submit the stages in the waiting queue
    submitWaitingStages()
  }
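The two branches above can be exercised from the shell. This is a hedged sketch: whether the first call actually takes the runLocally shortcut depends on spark.localExecution.enabled (disabled by default in this generation of the code); the second call always goes through submitStage:

// Candidate for the local-execution shortcut: one partition, no parent stages.
sc.parallelize(1 to 10, 1).take(1)

// An action over a shuffled lineage always goes through
// submitStage -> submitMissingTasks -> taskScheduler.submitTasks.
sc.parallelize(1 to 100)
  .map(i => (i % 10, i))
  .groupByKey()
  .collect()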

The newStage Function

  /**
   * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
   * of a shuffle map stage in newOrUsedStage.  The stage will be associated with the provided
   * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
   * directly.
   */
  private def newStage(
      rdd: RDD[_],
      numTasks: Int,
      shuffleDep: Option[ShuffleDependency[_, _, _]],
      jobId: Int,
      callSite: CallSite)
    : Stage =
  {
    val parentStages = getParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

Regarding the Stage's initialization parameters: before a Stage is created, we need to know how many Partitions the Stage will read data from, because that number directly determines how many Tasks to create.

In other words, by the time a Stage is created, it is already clear how many different Partitions the Stage reads from and how many Partitions it writes to; both the input and the output counts are already determined.

The getParentStages function:

It keeps walking backwards over the RDDs that precede the given one; whenever it encounters a dependency of type ShuffleDependency, it obtains the corresponding Stage via getShuffleMapStage.

  /**
   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
   * provided jobId if they haven't already been created with a lower jobId.
   */
  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
    val parents = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(r: RDD[_]) {
      if (!visited(r)) {
        visited += r
        // Kind of ugly: need to register RDDs with the cache here since
        // we can't do it in its constructor because # of partitions is unknown
        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }
        }
      }
    }
    waitingForVisit.push(rdd)
    while (!waitingForVisit.isEmpty) {
      visit(waitingForVisit.pop())
    }
    parents.toList
  }

The ActiveJob Class

After a user-submitted job is picked up by the DAGScheduler, it is wrapped as an ActiveJob, and a JobWaiter is started that blocks while listening for the job's completion.

Based on the dependency attributes of the job's RDDs (NarrowDependency vs. ShuffleDependency), the DAGScheduler builds the stage DAG (result stage, shuffle map stages) in dependency order.

Inside each stage, the corresponding tasks (ResultTasks or ShuffleMapTasks) are generated according to the number and distribution of the RDD's partitions, then wrapped into a TaskSet and submitted to the TaskScheduler.

/**
 * Tracks information about an active job in the DAGScheduler.
 */
private[spark] class ActiveJob(
    val jobId: Int,
    val finalStage: Stage,
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    val callSite: CallSite,
    val listener: JobListener,
    val properties: Properties) {

  val numPartitions = partitions.length
  val finished = Array.fill[Boolean](numPartitions)(false)
  var numFinished = 0
}

The submitStage Function

submitStage divides the stages according to their dependencies: it recursively walks backwards from the finalStage looking for parent stages, and once a stage has no missing parent stages it calls submitMissingTasks to submit that stage. This is how a job ends up divided into one or more stages.

submitStage's processing flow:

  • If the Stages this Stage depends on have not all completed, run those parent Stages first.
  • If all the dependencies have completed, submit this Stage itself.
  • Finally, submitMissingTasks wraps the stage into a TaskSet and submits it to the TaskScheduler via taskScheduler.submitTasks.

  /** Submits stage, but first recursively submits any missing parents. */
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id) // starting from the final stage, find whether any parent stages are missing
        logDebug("missing: " + missing)
        if (missing == Nil) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get) // if no parent stage still needs to run, submit the current stage's tasks directly
        } else {
          for (parent <- missing) {
            submitStage(parent) // submit the parent stages' tasks first; this recursion continues until a stage with no parent stages is submitted by the call above
          }
          waitingStages += stage // this stage cannot be submitted yet, so add it to the waiting queue
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }

This stage-submission process is recursive: parent stages are submitted first while the current stage is added to the waiting queue; once a stage has no missing parent stages, its tasks are submitted. The waiting queue itself is drained later by the submitWaitingStages method.
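For completeness, submitWaitingStages looks roughly like the following (a paraphrased sketch of the 1.x-era DAGScheduler, not a verbatim quote): it copies and clears waitingStages and re-runs submitStage on each entry, so stages whose parents have finished in the meantime can now be submitted.

  // Paraphrased sketch of DAGScheduler.submitWaitingStages (Spark 1.x era).
  private def submitWaitingStages() {
    logTrace("Checking for newly runnable parent stages")
    val waitingStagesCopy = waitingStages.toArray
    waitingStages.clear()
    for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
      submitStage(stage)  // stages with no more missing parents will now be submitted
    }
  }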

getMissingParentStages

getMissingParentStages traverses the graph to find all the parent Stages that are depended on but not yet available.

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
        if (getCacheLocs(rdd).contains(Nil)) {
          for (dep <- rdd.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>  // a ShuffleDependency means a new stage begins here
                val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                // getShuffleMapStage checks shuffleToMapStage: if the stage already exists it is returned, otherwise a new one is created
                if (!mapStage.isAvailable) {
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] => // a NarrowDependency means we are still within the current stage
                waitingForVisit.push(narrowDep.rdd)
            }
          }
        }
      }
    }
    waitingForVisit.push(stage.rdd)
    while (!waitingForVisit.isEmpty) {
      visit(waitingForVisit.pop())
    }
    missing.toList
  }

submitMissingTasks

As you can see, for either kind of stage a task is created for every partition the stage has to compute; the tasks are then wrapped into a TaskSet and the stage is submitted to the TaskScheduler.

  /** Called when stage's parents are available and we can now do its task. */
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    logDebug("submitMissingTasks(" + stage + ")")
    // Get our pending tasks and remember them in our pendingTasks entry
    stage.pendingTasks.clear()

    // First figure out the indexes of partition ids to compute.
    val partitionsToCompute: Seq[Int] = {
      if (stage.isShuffleMap) {
        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
      } else {
        val job = stage.resultOfJob.get
        (0 until job.numPartitions).filter(id => !job.finished(id))
      }
    }

    val properties = if (jobIdToActiveJob.contains(jobId)) {
      jobIdToActiveJob(stage.jobId).properties
    } else {
      // this stage will be assigned to "default" pool
      null
    }

    runningStages += stage
    // SparkListenerStageSubmitted should be posted before testing whether tasks are
    // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
    // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
    // event.
    stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
    outputCommitCoordinator.stageStart(stage.id)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
    // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
    // the serialized copy of the RDD and for each task we will deserialize it, which means each
    // task gets a different copy of the RDD. This provides stronger isolation between tasks that
    // might modify state of objects referenced in their closures. This is necessary in Hadoop
    // where the JobConf/Configuration object is not thread-safe.
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] =
        if (stage.isShuffleMap) {
          closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
        } else {
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
        }
      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return
    }

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }
  }


When reposting, please credit the author Jason Ding and link back to the original.

GitCafe blog (http://jasonding1354.gitcafe.io/)

GitHub blog (http://jasonding1354.github.io/)

CSDN blog (http://blog.csdn.net/jasonding1354)

Jianshu (http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)

Search "jasonding1354" on Google to find my blog.
