The Spark Application Scheduling Algorithm

To understand the Spark application scheduling mechanism, we need to answer the following questions:

1. Who does the scheduling?

2. For whom is it done?

3. What is being scheduled?

4. When does scheduling happen?

5. What algorithm is used?

The first four questions can be answered in a single sentence: whenever the cluster's resources change, the active Master process schedules Executor processes on Worker nodes for every registered application that has not yet been fully scheduled.

"active master" , spark集群可能有多个master,但是只有一个active master 参与调度,standby master不参与调度。

What does "cluster resources change" mean? Cluster resources here mainly means cores: launching or removing an Executor process decreases or increases the cluster's free cores, adding or removing a Worker node increases or decreases them, and so on. Every operation that changes cluster resources ends by calling schedule() to re-schedule resources for applications and drivers (for example, the Master's RegisterWorker, RegisterApplication, and ExecutorStateChanged handlers all end up invoking schedule()).

Spark provides two resource scheduling algorithms: spreadOut and non-spreadOut. The spreadOut algorithm distributes an application's Executor processes across as many worker nodes as possible, raising parallelism; non-spreadOut does the opposite: it drains one worker's free cores before moving on to the next worker.

To explain the two algorithms in detail, we first walk through a concrete example and then look at the source code.

Basic concepts

Every application has at least the following attributes:

coresPerExecutor: the number of cores for each Executor process

memoryPerExecutor: the amount of memory for each Executor process

maxCores: the maximum number of cores this application needs.

Every worker has at least the following attributes:

coresFree: the number of cores currently available on the worker node

memoryFree: the amount of memory currently available on the worker node.

Suppose an application waiting to register looks like this:

coresPerExecutor:2

memoryPerExecutor:512M

maxCores: 12

This means the application needs at most 12 cores, and each Executor process requires 2 cores and 512M of memory.

Suppose that at some moment the Spark cluster has the following worker nodes, sorted by coresFree in descending order:

Worker1:coresFree=10  memoryFree=10G

Worker2:coresFree=7   memoryFree=1G

Worker3:coresFree=3   memoryFree=2G

Worker4:coresFree=2   memoryFree=215M

Worker5:coresFree=1   memoryFree=1G

Worker5 does not meet the application's requirements: worker5.coresFree < application.coresPerExecutor

Worker4 does not meet them either: worker4.memoryFree < application.memoryPerExecutor

That leaves only the first three workers as candidates for scheduling; we call them the usableWorkers.
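As a minimal sketch of this filtering step, here is a self-contained Scala snippet. The case classes and value names (AppRequest, WorkerFree) are invented for this post; the real Master filters WorkerInfo objects inside startExecutorsOnWorkers, shown later:

    case class AppRequest(coresPerExecutor: Int, memoryPerExecutorMB: Int, maxCores: Int)
    case class WorkerFree(name: String, coresFree: Int, memoryFreeMB: Int)

    val app = AppRequest(coresPerExecutor = 2, memoryPerExecutorMB = 512, maxCores = 12)
    val workers = Seq(
      WorkerFree("worker1", 10, 10240), WorkerFree("worker2", 7, 1024),
      WorkerFree("worker3", 3, 2048), WorkerFree("worker4", 2, 215),
      WorkerFree("worker5", 1, 1024))

    // Same predicate as in the text: enough free memory AND enough free cores,
    // then sort by coresFree in descending order.
    val usableWorkers = workers
      .filter(w => w.memoryFreeMB >= app.memoryPerExecutorMB && w.coresFree >= app.coresPerExecutor)
      .sortBy(-_.coresFree)
    // usableWorkers.map(_.name) == List("worker1", "worker2", "worker3")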

The spreadOut algorithm

Let's start with the spreadOut algorithm. As noted above, only the first three workers qualify:

Worker1:coresFree=10  memoryFree=10G

Worker2:coresFree=7   memoryFree=1G

Worker3:coresFree=3   memoryFree=2G

After the first allocation, the worker list looks like this:

Worker1:coresFree=8  memoryFree=9.5G  assignedExecutors=1  assignedCores=2

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:1,totalCores=2

Notice that worker1's coresFree and memoryFree both decreased while worker2 and worker3 did not change: one Executor process (using 2 cores and 512M of memory) was allocated on worker1, and nothing was allocated on worker2 or worker3.

Next, allocation moves on to worker2:

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=3   memoryFree=2G        assignedExecutors=0  assignedCores=0

totalExecutors:2,totalCores=4

At this point 2 Executor processes and 4 cores have been assigned.

Next, worker3:

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=6

Then back to worker1, then worker2, and so on, in round-robin fashion. Since worker3.coresFree < application.coresPerExecutor, worker3 receives no further allocations:

Worker1:coresFree=6  memoryFree=9.0G      assignedExecutors=2  assignedCores=4

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:4,totalCores=8

Worker1:coresFree=6  memoryFree=9.0G      assignedExecutors=2  assignedCores=4

Worker2:coresFree=3   memoryFree=0M       assignedExecutors=2  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:5,totalCores=10

At this point worker2 no longer qualifies either: worker2.memoryFree < application.memoryPerExecutor

So the next allocation goes to worker1:

Worker1:coresFree=4  memoryFree=8.5G      assignedExecutors=3  assignedCores=6

Worker2:coresFree=3   memoryFree=0M        assignedExecutors=2  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:6,totalCores=12

OK: 12 cores have now been assigned, which meets the application's requirement, so scheduling for this application stops.

The non-spreadOut algorithm

What about the non-spreadOut algorithm? Once it grabs a worker, it does not let go until that worker's resources are used up:

Worker1:coresFree=8  memoryFree=9.5G  assignedExecutors=1  assignedCores=2

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:1,totalCores=2

Worker1:coresFree=6  memoryFree=9.0G  assignedExecutors=2  assignedCores=4

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:2,totalCores=4

Worker1:coresFree=4   memoryFree=8.5G    assignedExecutors=3  assignedCores=6

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:3,totalCores=6

Worker1:coresFree=2  memoryFree=8.0G  assignedExecutors=4  assignedCores=8

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:4,totalCores=8

Worker1:coresFree=0  memoryFree=7.5G  assignedExecutors=5  assignedCores=10

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:5,totalCores=10

Worker1's coresFree is now exhausted, poor thing. The application needs 12 cores and only 10 have been assigned so far, so allocation continues on the next worker:

Worker1:coresFree=0  memoryFree=7.5G      assignedExecutors=5  assignedCores=10

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=3   memoryFree=2G        assignedExecutors=0  assignedCores=0

totalExecutors:6,totalCores=12

OK: in the end 12 cores are allocated, satisfying the application's requirement.

Comparison:

With spreadOut, Executor processes are allocated on the workers round-robin, i.e. in the order worker1, worker2, ..., worker n, worker1, ...

With non-spreadOut, the scheduler latches onto one worker and keeps allocating there until one of the following holds:

worker.coresFree < application.coresPerExecutor, or worker.memoryFree < application.memoryPerExecutor.

In the two examples above, both modes end up allocating 6 Executor processes and 12 cores, but with spreadOut the 6 Executors are spread across different worker nodes, making full use of the cluster's workers, while with non-spreadOut Executors land only on worker1 and worker2, leaving the remaining workers underused.
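To make the two modes concrete, here is a self-contained Scala sketch that mirrors the core loop of scheduleExecutorsOnWorkers (shown in full below). The object and names (ScheduleSim, simulate, SimWorker) are invented for this post; the real Master operates on ApplicationInfo/WorkerInfo instead:

    object ScheduleSim {
      case class SimWorker(name: String, coresFree: Int, memoryFreeMB: Int)

      // Returns (cores assigned per worker, executors assigned per worker).
      // coresPerExecutorGrant is the number of cores handed out per iteration.
      def simulate(
          maxCores: Int,
          coresPerExecutorGrant: Int,
          memoryPerExecutorMB: Int,
          workers: Array[SimWorker],
          spreadOut: Boolean,
          oneExecutorPerWorker: Boolean): (Array[Int], Array[Int]) = {
        val n = workers.length
        val assignedCores = new Array[Int](n)
        val assignedExecutors = new Array[Int](n)
        var coresToAssign = math.min(maxCores, workers.map(_.coresFree).sum)

        def canLaunchExecutor(pos: Int): Boolean = {
          val keepScheduling = coresToAssign >= coresPerExecutorGrant
          val enoughCores =
            workers(pos).coresFree - assignedCores(pos) >= coresPerExecutorGrant
          val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
          if (launchingNewExecutor) {
            val assignedMemory = assignedExecutors(pos) * memoryPerExecutorMB
            val enoughMemory =
              workers(pos).memoryFreeMB - assignedMemory >= memoryPerExecutorMB
            keepScheduling && enoughCores && enoughMemory
          } else {
            // Adding cores to an existing executor: memory is NOT rechecked.
            keepScheduling && enoughCores
          }
        }

        var freeWorkers = (0 until n).filter(canLaunchExecutor)
        while (freeWorkers.nonEmpty) {
          freeWorkers.foreach { pos =>
            var keepScheduling = true
            while (keepScheduling && canLaunchExecutor(pos)) {
              coresToAssign -= coresPerExecutorGrant
              assignedCores(pos) += coresPerExecutorGrant
              if (oneExecutorPerWorker) assignedExecutors(pos) = 1
              else assignedExecutors(pos) += 1
              // spreadOut: move to the next worker after a single grant
              if (spreadOut) keepScheduling = false
            }
          }
          freeWorkers = freeWorkers.filter(canLaunchExecutor)
        }
        (assignedCores, assignedExecutors)
      }

      def main(args: Array[String]): Unit = {
        def ws() = Array(
          SimWorker("worker1", 10, 10240),
          SimWorker("worker2", 7, 1024),
          SimWorker("worker3", 3, 2048))
        val (c1, e1) = simulate(12, 2, 512, ws(), spreadOut = true, oneExecutorPerWorker = false)
        println(c1.mkString(",") + " / " + e1.mkString(","))  // 6,4,2 / 3,2,1
        val (c2, e2) = simulate(12, 2, 512, ws(), spreadOut = false, oneExecutorPerWorker = false)
        println(c2.mkString(",") + " / " + e2.mkString(","))  // 10,2,0 / 5,1,0
      }
    }

Running it reproduces both walkthroughs: spreadOut ends with cores 6/4/2 and executors 3/2/1 on worker1/worker2/worker3, while non-spreadOut ends with 10/2/0 and 5/1/0.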

Aside: spreadOut + oneExecutorPerWorker

Spark also has an "oneExecutorPerWorker" mechanism: only one Executor process is launched per worker, and further grants simply add cores to that Executor. (In the source this mode is active when coresPerExecutor is not set, and the real code then grants 1 core per iteration; to stay comparable with the examples above, the walkthrough below keeps handing out 2 cores at a time.) A quick walkthrough:

Worker1:coresFree=8  memoryFree=9.5G  assignedExecutors=1  assignedCores=2

Worker2:coresFree=7   memoryFree=1G    assignedExecutors=0  assignedCores=0

Worker3:coresFree=3   memoryFree=2G    assignedExecutors=0  assignedCores=0

totalExecutors:1,totalCores=2

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=3   memoryFree=2G        assignedExecutors=0  assignedCores=0

totalExecutors:2,totalCores=4

Worker1:coresFree=8  memoryFree=9.5G      assignedExecutors=1  assignedCores=2

Worker2:coresFree=5   memoryFree=512M    assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=6

Worker1:coresFree=6   memoryFree=9.5G     assignedExecutors=1  assignedCores=4

Worker2:coresFree=5   memoryFree=512M     assignedExecutors=1  assignedCores=2

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=8

(Note that worker1's memoryFree stays at 9.5G: the extra cores go to its existing Executor, so no additional memory is charged.)

Worker1:coresFree=6   memoryFree=9.5G     assignedExecutors=1  assignedCores=4

Worker2:coresFree=3   memoryFree=512M     assignedExecutors=1  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=10

Worker1:coresFree=4   memoryFree=9.5G     assignedExecutors=1  assignedCores=6

Worker2:coresFree=3   memoryFree=512M     assignedExecutors=1  assignedCores=4

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=12

Compared with spreadOut without oneExecutorPerWorker, the per-worker core counts are identical; the only difference is the number of Executor processes: 6 in one case, 3 in the other.

(

As a further extension, suppose the application's maxCores were 14 instead of 12. Continuing from the worker list above, scheduling would end like this:

Worker1:coresFree=4   memoryFree=9.5G     assignedExecutors=1  assignedCores=6

Worker2:coresFree=1   memoryFree=512M     assignedExecutors=1  assignedCores=6

Worker3:coresFree=1   memoryFree=1.5G     assignedExecutors=1  assignedCores=2

totalExecutors:3,totalCores=14

Worker2 keeps receiving cores even though only 512M of memory remains on it: when cores are added to an existing Executor under oneExecutorPerWorker, the memory limit is simply not checked, so even a worker whose free memory had dropped below memoryPerExecutor would keep accumulating cores (the sketch after this aside verifies it).

)
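Reusing the ScheduleSim sketch from the comparison section (again invented names; with oneExecutorPerWorker the real code would grant 1 core at a time, we keep 2 for comparability), this aside can be checked directly:

    import ScheduleSim.{SimWorker, simulate}

    val ws = Array(
      SimWorker("worker1", 10, 10240),
      SimWorker("worker2", 7, 1024),
      SimWorker("worker3", 3, 2048))
    val (c3, e3) = simulate(12, 2, 512, ws, spreadOut = true, oneExecutorPerWorker = true)
    // c3 = [6, 4, 2], e3 = [1, 1, 1]: same core spread as before, but only 3 executors
    val (c4, e4) = simulate(14, 2, 512, ws, spreadOut = true, oneExecutorPerWorker = true)
    // c4 = [6, 6, 2], e4 = [1, 1, 1]: with maxCores = 14, worker2's single executor
    // keeps growing; its memory is never rechecked once it exists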

Next, let's see how the source code implements all of this.

With the walkthroughs above in mind, the source is easy to read; here is a brief tour.

When org.apache.spark.deploy.master.Master receives a RegisterApplication(description, driver) message, it runs the registration logic:

    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      // A standby master does not schedule
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        // Register the app, i.e. add it to waitingApps
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Add the app to the persistence engine, mainly for failure recovery
        persistenceEngine.addApplication(app)
        // Tell the driver that the master has registered this app
        driver.send(RegisteredApplication(app.id, self))
        // Schedule resources for the apps in waitingApps
        schedule()
      }
    }

The comments above explain each step.

  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) { return }
    // Drivers take strict precedence over executors
    // Shuffle the workers so that scheduling does not always start on the same ones
    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
    // This loop schedules resources for drivers; since this post covers application
    // scheduling only, driver scheduling is not discussed here.
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      for (driver <- waitingDrivers) {
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
        }
      }
    }

    // Schedule resources for applications
    startExecutorsOnWorkers()
  }
  /**
   * Schedule and launch executors on workers
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // Schedule resources for the apps in waitingApps; app.coresLeft is the number of
    // cores the app has not been granted yet.
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor:
      // keep ALIVE workers whose free memory and free cores satisfy the app's
      // requirements, sorted by coresFree in descending order.
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      // Decide how many cores of this app go to each usable worker
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      // (i.e. launch Executor processes on the workers)
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

This method does the following:

1. Filter out the usable workers (usableWorkers). A worker is added to usableWorkers only if it satisfies all of the following conditions:

Alive

worker.memoryFree >= app.desc.memoryPerExecutorMB

worker.coresFree >= coresPerExecutor

2. assignedCores is an array: assignedCores[i] holds the number of cores to allocate on usableWorkers[i]. For example, assignedCores[1] = 2 means 2 cores should be allocated on usableWorkers[1].
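As a concrete illustration with the walkthrough's numbers (a hypothetical trace, not code from Master.scala): for the example application, coresPerExecutor = Some(2) and scheduleExecutorsOnWorkers returns Array(6, 4, 2) for usableWorkers = [worker1, worker2, worker3], so:

    val coresPerExecutor: Option[Int] = Some(2)
    val assignedCores = Array(6, 4, 2) // result of scheduleExecutorsOnWorkers
    // Executors that allocateWorkerResourceToExecutors will create on each worker:
    val executorsPerWorker = assignedCores.map(c => coresPerExecutor.map(c / _).getOrElse(1))
    // executorsPerWorker = Array(3, 2, 1) -> 6 executors in total, matching the walkthrough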

  /**
   * Schedule executors to be launched on the workers.
   * Returns an array containing number of cores assigned to each worker.
   *
   * There are two modes of launching executors. The first attempts to spread out an application's
   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
   * on as few workers as possible). The former is usually better for data locality purposes and is
   * the default.
   *
   * The number of cores assigned to each executor is configurable. When this is explicitly set,
   * multiple executors from the same application may be launched on the same worker if the worker
   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
   * worker by default, in which case only one executor may be launched on each worker.
   *
   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
   * allocated at a time, 12 cores from each worker would be assigned to each executor.
   * Since 12 < 16, no executors would launch [SPARK-8881].
   */
  private def scheduleExecutorsOnWorkers(
      app: ApplicationInfo,
      usableWorkers: Array[WorkerInfo],
      spreadOutApps: Boolean): Array[Int] = {
    val coresPerExecutor = app.desc.coresPerExecutor
    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = coresPerExecutor.isEmpty
    val memoryPerExecutor = app.desc.memoryPerExecutorMB
    val numUsable = usableWorkers.length
    val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
    var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

    /** Return whether the specified worker can launch an executor for this app. */
    def canLaunchExecutor(pos: Int): Boolean = {
      val keepScheduling = coresToAssign >= minCoresPerExecutor
      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

      // If we allow multiple executors per worker, then we can always launch new executors.
      // Otherwise, if there is already an executor on this worker, just give it more cores.
      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
      if (launchingNewExecutor) {
        // Launching a new executor: check that the worker's free cores and memory suffice
        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
        keepScheduling && enoughCores && enoughMemory && underLimit
      } else {
        // We're adding cores to an existing executor, so no need
        // to check memory and executor limits.
        // Note: under oneExecutorPerWorker the memory limit is NOT checked here;
        // this detail matters (see the aside above).
        keepScheduling && enoughCores
      }
    }

    // Keep launching executors until no more workers can accommodate any
    // more executors, or if we have reached this application's limits
    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
    while (freeWorkers.nonEmpty) {
      freeWorkers.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunchExecutor(pos)) {
          coresToAssign -= minCoresPerExecutor      // cores still waiting to be assigned
          assignedCores(pos) += minCoresPerExecutor // cores assigned on this worker

          // If we are launching one executor per worker, then every iteration assigns 1 core
          // to the executor. Otherwise, every iteration assigns cores to a new executor.
          // (oneExecutorPerWorker: at most one Executor is launched per worker)
          if (oneExecutorPerWorker) {
            assignedExecutors(pos) = 1
          } else {
            assignedExecutors(pos) += 1
          }

          // Spreading out an application means spreading out its executors across as
          // many workers as possible. If we are not spreading out, then we should keep
          // scheduling executors on this worker until we use all of its resources.
          // Otherwise, just move on to the next worker.
          // (Without spreadOut, keep allocating on this worker until nothing more fits.)
          if (spreadOutApps) {
            keepScheduling = false
          }
        }
      }
      freeWorkers = freeWorkers.filter(canLaunchExecutor)
    }
    assignedCores
  }
  /**
   * Allocate a worker's resources to one or more executors.
   * @param app the info of the application which the executors belong to
   * @param assignedCores number of cores on this worker for this application
   * @param coresPerExecutor number of cores per executor
   * @param worker the worker info
   */
  private def allocateWorkerResourceToExecutors(
      app: ApplicationInfo,
      assignedCores: Int,
      coresPerExecutor: Option[Int],
      worker: WorkerInfo): Unit = {
    // If the number of cores per executor is specified, we divide the cores assigned
    // to this worker evenly among the executors with no remainder.
    // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
    // (i.e. compute how many Executor processes to create; the default is 1)
    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    for (i <- 1 to numExecutors) {
      val exec = app.addExecutor(worker, coresToAssign)
      // Actually launch the Executor process
      launchExecutor(worker, exec)
      app.state = ApplicationState.RUNNING
    }
  }
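A small sketch of just the arithmetic at the top of this method, to show how the oneExecutorPerWorker case falls out (the values come from the earlier walkthroughs; the expressions mirror the two val definitions above):

    val assignedCores = 6
    val explicit: Option[Int] = Some(2) // spark.executor.cores was set
    val unset: Option[Int] = None       // spark.executor.cores was not set

    explicit.map(assignedCores / _).getOrElse(1) // numExecutors = 3, each with 2 cores
    unset.map(assignedCores / _).getOrElse(1)    // numExecutors = 1 ...
    unset.getOrElse(assignedCores)               // ... grabbing all 6 cores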

I have not been working with Spark for long, so if you find mistakes or have any suggestions, please leave a comment or email [email protected] and let's discuss.

Author: FrancisWang

Email: [email protected]
Source: http://www.cnblogs.com/francisYoung/
Article URL: http://www.cnblogs.com/francisYoung/
Copyright is shared by the author and cnblogs.com. Reposting is welcome, but this notice must be retained and a clearly visible link to the original given on the page; otherwise the author reserves the right to pursue legal action.
