第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详

</pre></h2><div><p>本节课内容:</p><p>1.     TaskSchedulerBackend与SchedulerBackend</p><p>2.     FIFO与FAIR两种调度模式</p><p>3.     Task数据本地性资源的分配</p></div><h3>一、Scheduler运行过程(Spark-shell角度)</h3><h4>1.启动Spark-shell</h4><div><img src="http://img.blog.csdn.net/20160515013848589?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" /></div><div><img src="http://img.blog.csdn.net/20160515013852433?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" /></div><div><img src="http://img.blog.csdn.net/20160515013856370?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" /></div><div></div><div><p>当我们spark-shell本身的时候命令终端返回来的主要是ClientEndpoint和SparkDeploySchedulerBakcend。这是因为此时还没有任何应用程序Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext并注册当前的应用程序给Master,并从集群中获得ExecutorBackend的计算资源;(这就是为什么启动时日志没有DriverEndpoint信息的原因,因为此时应用程序内部还未发生具体计算资源的调度)</p><h4>2.TaskScheduler运行时机</h4><p>       DAGScheduler划分好Stage后,会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有的任务TaskSet,TaskSetManager会根据locality aware来为Task奉陪计算资源,监控Task的执行状态。(例如重试、慢任务以及进行推测式执行等)</p><p> </p><h3>二、TaskScheduler与SchedulerBackend</h3><h4>      1.底层调度的总流程</h4><p><strong>(1)TaskScheduler提交Tasks</strong></p><p>       TaskScheduler#submitTasks方法主要作用是将TaskSet加入到TaskSetManager中进行管理。</p><pre name="code" class="plain">override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
	//创建TaskSetManager,并设置最大失败重试次数
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
	//记录Stage中提交的TaskSetManager
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
	//如果重复提交同一个TaskSet或者Tasks不在当前的TaskSet中则会报错
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
	//添加TaskManager到调度队列中,schedulableBuilder是应用程序级别的调度器
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  //1
	//为慢任务启动备份任务
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
	//调用SparkDeploySchedulerBackend分配具体计算资源
    backend.reviveOffers()     //2
  }

&           Task执行提交失败后会重试,Task的默认重试次数为4次。

&  def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures",4))  (TaskSchedulerImpl)

(2)添加TaskSetManager

SchedulerBuilder(根据SchedulerMode的不同,FIFO与FAIR实现不同)#addTaskSetManger方法会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在那个ExecutorBackend中。

&  默认的调度顺序为FIFO;Spark应用程序目前支持两种调度模式FIFO和FAIR可以通过Spark-env.sh中的Spark.Scheduler.mode来进行具体的设置。

   // default scheduler is FIFO
    private valschedulingModeConf = conf.get("spark.scheduler.mode","FIFO")  (TaskSchedulerImpl)
//在1处调用addTaskSetManager
def addTaskSetManager(manager: Schedulable, properties: Properties)
//FIFO模式下的,addTaskSetManager
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
//直接将可调度对象TaskSetManager加入SchedulerQueue的尾端。
override def addSchedulable(schedulable: Schedulable) {
    require(schedulable != null)
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    schedulable.parent = this
  }
//FAIR模式下,addTaskSetManager
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    var poolName = DEFAULT_POOL_NAME
	//获得根节点的默认调度池的引用
    var parentPool = rootPool.getSchedulableByName(poolName)
    if (properties != null) {
	//根据优先级获得父可调度对象pool的引用。
      poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
      parentPool = rootPool.getSchedulableByName(poolName)
      if (parentPool == null) {
        //如果父可调度对象不存在,则根据应用程序配置信息创建之
        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)

//做为根节点default pool的孩子加入default pool中
        rootPool.addSchedulable(parentPool)
        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
          poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
      }
    }

	//与FIFO类似,在每个父pool中采用队列形式,将TaskSetManager加入队尾。
    parentPool.addSchedulable(manager)
    logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
  }

(1)   CoarseGrainedSchedulerBackend分配资源

CoarseGrainedSchedulerBackend#reviveOffers方法,给DriverEndpoint发送ReviveOffers消息;ReviveOffers本身是一个空的case object对象,只是起到触发底层调度的左右,在有Task提交或者资源变动时,就会发送ReviveOffers消息。每提交一个Stage都要申请一次资源,发送一个ReviveOffers消息。

//2 处被调用的方法
 override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

&  ReviveOffers相当于触发器,在资源变化时触发。

&  TaskScheduler要负责为Task分配计算资源(分配的是程序启动时向Master申请的集群资源),根据计算的本地性原则确定Task具体要运行在哪个ExecutorBackend中。

(4)接收ReviveOffers消息与分配资源

在DriverEndpoint接收ReviveOffers消息并路由到MakeOffers方法中;在MakeOffers方法中首先准备好所有可用于计算的workOffers(代表了应用程序从Master获得的Executor中所有可用的Core信息)。

//CoarseGrainedSchedulerBackend.DriverEndpoint#receive
override def receive: PartialFunction[Any, Unit] = {
	//省略部分代码
      case ReviveOffers =>
        makeOffers()
}
   //在逻辑上,让所有Executor都成为计算资源的提供者
    private def makeOffers() {
      // 过滤掉挂掉的Executor
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      //生成有所有aliver的Executor元信息组成的序列
	val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
	//生成资源分配的二位数组,并以此为基础进行Tasks加载、执行
      launchTask(scheduler.resourceOffers(workOffers))      //3  4
    }

(a)resourceOffers方法

调用TaskSchedulerImpl#resourceOffers方法,为每一个Task具体分配计算资源,其输入是ExecutorBackend机器上可用的cores,输出是TaskDescription二维数组,在其中定义了每个Task具体运行在在哪个ExecutorBackend。

//3处调用resourceOffers,该方法输入为一个Executor的列表,输出为一个
//TaskDescription的二位数组
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // 每个slave节点为alive并且记录其hostname
    // 如果有新的slave节点加入,对其进行追踪。
    var newExecAvail = false
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
		//如果有新的Executors即新的slave节点加入
        executorsByHost(o.host) = new HashSet[String]()
	//通知DAGScheduler添加Executors
        executorAdded(o.executorId, o.host)
		//标记有新的Executor可用
        newExecAvail = true
      }
	//更新可用的节点信息
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    //利用随机打乱offers的方式(round-robin manner)分配计算资源Executor,避免了Task
	//集中分配到某些机器上。
    val shuffledOffers = Random.shuffle(offers)
    //为每一个worker创建一个tasks分配的列表,参见下图
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
	//获取按照调度策略排好序的TaskSetManager
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
	//如果有新的slave中Executor可用,需要重新计算该TaskSetManager的就近原则
        taskSet.executorAdded()
      }
    }

    //为从rootPool里获取的TaskSetManager列表分配资源。分配的原则是就近原则,优先分配顺
    //序为PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) 〖{  〗^⑦
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

图36-1 worker与tasks及availableCpus对应关系

&  TaskDescription中以确定好Task具体运行在那个ExecutorBackend上;而确定Task具体运行在那个ExecutorBackend的算法是由TaskSetManager的resourceOffers方法决定。

//为每一个TaskSetManager分配资源
private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
	//顺序遍历当前存在的Executor
    for (i <- 0 until shuffledOffers.size) {
	//获得Executor的Id和hostname
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
	//该Executor可以被分配任务核心实现,通过调用TaskSetManager来为Executor分配Task
      if (availableCpus(i) >= CPUS_PER_TASK) {
		//保证可用的Cores个数不小于Task运行所需的最小cores个数,CPUS_PER_TASK默认
		//为1
        try {
		//获取最高级别的本地性,并记录executor与Task的运行对应关系
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)^?) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToTaskCount(execId) += 1
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          //Task未进行序列化
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            return launchedTask
        }
      }
    }
	//返回信息:Task是否已分配好资源。
    return launchedTask
  }

resourceOffers算法思想

ResourceOffers确定了Task具体运行在哪个ExecutorBackend上的。算法实现具体如下:

a)     通过Random#shuffle,将计算资源重新洗牌,以寻求计算子奥运的负载均衡。

b)     根据每个ExecutorBackend的cores个数,声明类型为TaskDescription的ArrayBuffer数组。

c)     如果有新的ExecutorBackend分配给我们的Job,此时会调用ExecutorAdded来获得完整的可用的计算资源。

&  这里说一下,数据本地性的级别(Locality level)的由高到低优先级的次序:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY,其中NO_PREF是指机器本地性。RACK_LOCAL是机架本地性。

d)     通过下述代码追踪最高级别的的本地性。(见7)

for (taskSet <-sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {

每个Task默认采用一个线程来进行计算的。

// 执行Task默认需要一个cores,即一个线程。
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

e)     通过调用用TaskSetManager#resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend以及具体的Locality level。

//p121 3处调用 TaskSetManager#resourceOffer
def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {//不是僵尸TaskSet,即还可以提交Task的TaskSet
    if (!isZombie) {
      val curTime = clock.getTimeMillis()
     //获得当前的最大本地性级别
      var allowedLocality = maxLocality
     //如果最大本地性级别不是机器本地性
      if (maxLocality != TaskLocality.NO_PREF) {
		//重新计算当前时间节点的最高本地性级别,由于存在延迟调度,所以我们需要根据基于等
	//待时间的延迟调度算法来获取当前的本地性。
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          //如果得到的加载Task本地性低于原来的maxLocality,则将Task加载的本地性置为
//maxLocality
          allowedLocality = maxLocality
        }
      }
	//根据不同的Task的本地性级别进行不同的处理。
      dequeueTask(execId, host, allowedLocality) match {
		//index表示Task在TaskSet中的下标,taskLocality:本地性,speculative:表示是否
		//是投机产生的,由于其他Task已排定,进而确定该Task。
        case Some((index, taskLocality, speculative)) => {
          // 为Task找到一个executor(也可以认为是为当前executor找到了TaskSet中一个
		//Task),对Task返回信息进行一些登记处理
		//在TaskSet中找到这个Task
          val task = tasks(index)
		//创建Task的id
          val taskId = sched.newTaskId()
          // Do various bookkeeping (???)
          copiesRunning(index) += 1
		//设置尝试提交次数
          val attemptNum = taskAttempts(index).size
		//实例化Task的元信息
          val info = new TaskInfo(taskId, index, attemptNum, curTime,
            execId, host, taskLocality, speculative)
          taskInfos(taskId) = info
          taskAttempts(index) = info :: taskAttempts(index)
          // 为延迟调度策略更新本地性级别
          // NO_PREF不会影响延迟调度相关的变量
          if (maxLocality != TaskLocality.NO_PREF) {
            currentLocalityIndex = getLocalityIndex(taskLocality)
            lastLaunchTime = curTime
          }
          //序列化并返回Task
          val startTime = clock.getTimeMillis()
          val serializedTask: ByteBuffer = try {
            Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          } catch {
		//task序列化失败,则丢弃整个TaskSet
            case NonFatal(e) =>
              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
              logError(msg, e)
              abort(s"$msg Exception during serialization: $e")
              throw new TaskNotSerializableException(e)
          }
		//Task广播时序列化的大小限制(为什么后面还要序列化,这是广播作用??)
          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
              !emittedTaskSizeWarning) {
            emittedTaskSizeWarning = true
            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
  	  s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
          }
		//Task加入到运行Task队列中
          addRunningTask(taskId)

          //序列化一些日志处理。
          val taskName = s"task ${info.id} in stage ${taskSet.id}"
          logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
            s"$taskLocality, ${serializedTask.limit} bytes)")
		//向高层调度器DAGScheduler报告Task开始执行
          sched.dagScheduler.taskStarted(task, info)
		//返回封装了TaskDescription的Some类
          return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
            taskName, index, serializedTask))
        }
        case _ =>
      }
    }
    None
  }

&  DAGScheduler是从数据(存储)层面考虑preferedLocation的,而TaskScheduler则是从具体计算Task的角度考虑计算的本地性的。

f)通过Lanch Task把任务发送给ExecutorBackend去执行。(见(4)中代码注释4?处)

 // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
	//对tasks中的所有task进行序列化
        val serializedTask = ser.serialize(task)
		//序列化后的Task的限制
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
		//Task大小超出限制,丢弃TaskSet
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {//否则,Task大小符合要求
		//更新executor信息
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
		//发送Task序列化后的Task给executor。
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

&  对于任务大小的设置:Task进行广播时,AkkaFrameSize大小是128MB,Akka保留的字节大小为200k,如果任务大于或等于128MB-200K的话,则任务会直接被丢弃掉;如果小于128MB-200K的话会通过CoarseGrainedSchedulerBackend去LaunchTask到具体的ExecutorBackend上。

至此,在Driver端的处理完成了,下一节将会讲解ExecutorBackend端接收Task后的处理。(可参照第35课图)

说明:

本文是由DT大数据梦工厂的IFM课程第36课为基础所做的笔记

时间: 2024-11-05 13:04:35

第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详的相关文章

TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解

TaskSchedulerBackend与SchedulerBackend FIFO与FAIR两种调度模式 Task数据本地性资源的分配 一.TaskScheduler运行过程(Spark-shell角度) 1.启动Spark-shell 当我们spark-shell本身的时候命令终端返回来的主要是ClientEndpoint和SparkDeploySchedulerBakcend.这是因为此时还没有任何应用程序Job的触发,这是启动Application本身而已,所以主要就是实例化SparkC

NeHe OpenGL教程 第三十六课:从渲染到纹理

转自[翻译]NeHe OpenGL 教程 前言 声明,此 NeHe OpenGL教程系列文章由51博客yarin翻译(2010-08-19),本博客为转载并稍加整理与修改.对NeHe的OpenGL管线教程的编写,以及yarn的翻译整理表示感谢. NeHe OpenGL第三十六课:从渲染到纹理 放射模糊和渲染到纹理: 如何实现放射状的滤镜效果呢,看上去很难,其实很简单.把渲染得图像作为纹理提取出来,在利用OpenGL本身自带的纹理过滤,就能实现这种效果,不信,你试试. 嗨,我是Dario Corn

2018-08-24 第三十六课

第三十六课 非关系统型数据库-mangodb 目录 二十四 mongodb介绍 二十五 mongodb安装 二十六 连接mongodb 二十七 mongodb用户管理 二十八 mongodb创建集合.数据管理 二十九 php的mongodb扩展 三十 php的mongo扩展 三十一 mongodb副本集介绍 三十二 mongodb副本集搭建 三十三 mongodb副本集测试 三十四 mongodb分片介绍 三十五 mongodb分片搭建 三十六 mongodb分片测试 三十七 mongodb备份

第三十六课、文本编辑器中的功能交互

一 .判断未保存的数据 1.QPlainTextEdit能够触发与编辑功能相关的信号 2.解决方案 (1).定义槽函数void onTextChanged() (2).映射textChanged()到槽函数 (3).定义成员变量bool m_isTextChanged = false; (4).当文本框内容发生改变时, m_isTextChanged = true; (5).当m_isTextChanged 为真时,则保存数据 二.文本编辑器的持续开发 1.文件打开操作 2.文件新建操作 #if

第三十六课:Ajax详解2

本课主要教大家如何书写一个完整的ajax模块,讲解的代码主要跟ajax有关,而jQuery的ajax模块添加了Deferred异步编程的机制,因此对ajax的理解难度增大,还是忽略掉.但是我要讲解的代码跟jQuery的ajax模块思路是一样的,只是没有加入Deferred异步编程的思想,这样更有利于大家理解ajax的原理. $.ajax = function(opts){    //大家如果用过jQuery的ajax,应该记得$.ajax({url:...,data:....,type:'POS

JAVA学习第三十六课(常用对象API)- 集合框架(四)— Set集合:HashSet集合演示

随着Java学习的深入,感觉大一时搞了一年的ACM,简直是明智之举,Java里很多数据结构.算法类的东西,理解起来就轻松多了 Set集合下有两大子类开发常用 HashSet集合 .TreeSet集合 Set集合的元素是不重复且无序 一.HashSet集合 API文档解释:此类实现 Set 接口,由哈希表(实际上是一个 HashMap 实例)支持.它不保证 set 的迭代顺序:特别是它不保证该顺序恒久不变.此类允许使用null 元素. 此类为基本操作提供了稳定性能,注意,此实现不是同步的. 由上可

AGG第三十六课 gsv_text_outline 渲染环绕的字符

agg::rendering_buffer &rbuf = rbuf_window(); agg::pixfmt_bgr24 pixf(rbuf); typedef agg::renderer_base<agg::pixfmt_bgr24> renderer_base_type; renderer_base_type renb(pixf); typedef agg::renderer_scanline_bin_solid<renderer_base_type> render

第三十四课 Spark中任务处理的Stage划分和Task最佳位置算法

本节课的内容 1.     Job Stage的划分算法 2.     Task最佳计算位置算法 一.Stage划分算法 由于Spark的算子构建一般都是链式的,这就涉及了要如何进行这些链式计算,Spark的策略是对这些算子,鲜花分Stage,然后在进行计算. 由于数据是分布式的存储在各个节点上的,所以为了减少网络传输的开销,就必须最大化的追求数据本地性,所谓的数据本地性是指,在计算时,数据本身已经在内存中或者利用已有缓存无需计算的方式获取数据. 1.      Stage划分算法思想 (1)一

第十六课 数组的引入 【项目1-5】

第十六课 数组的引入 项目一 [数组大折腾] (1)创建一个有20个元素的整型数组,通过初始化,为数组中的前10个元素赋初值,然后通过键盘输入后10个元素的值,从前往后(从第0个到第19个)输出数组中元素的值,每5个元素换一行. [cpp] view plain copy print? int main( ) { int a[20]={...};  //初始化前10个元素 //键盘输入后10个元素的值 //由前往后输出数组中所有元素的值 printf("由前往后,数组中的值是:\n")