Spark中的Shuffle机制

Spark中的shuffle是在干嘛?

Shuffle在Spark中即是把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD。也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分区。

但这只是shuffle的过程,却不是shuffle的原因。为何需要shuffle呢?

Shuffle和Stage

在分布式计算框架中,比如map-reduce,数据本地化是一个很重要的考虑,即计算需要被分发到数据所在的位置,从而减少数据的移动,提高运行效率。

Map-Reduce的输入数据通常是HDFS中的文件,所以数据本地化要求map任务尽量被调度到保存了输入文件的节点执行。但是,有一些计算逻辑是无法简单地获取本地数据的,reduce的逻辑都是如此。对于reduce来说,处理函数的输入是key相同的所有value,但是这些value所在的数据集(即map的输出)位于不同的节点上,因此需要对map的输出进行重新组织,使得同样的key进入相同的reducer。 shuffle移动了大量的数据,对计算、内存、网络和磁盘都有巨大的消耗,因此,只有确实需要shuffle的地方才应该进行shuffle。

Stage的划分

对于Spark来说,计算的逻辑存在于RDD的转换逻辑中。Spark的调度器也是在依据数据本地化在调度任务,只不过此处的“本地”不仅包括磁盘文件,也包括RDD的分区, Spark会使得数据尽量少地被移动,据此,DAGScheduler把一个job划分为多个Stage,在一个Stage内部,数据是不需要移动地,数据会在本地经过一系列函数的处理,直至确实需要shuffle的地方。

例如,在DAGScheduler的getParentStages方法中,寻找父stage时,使用了如下的代码段

        for (dep <- r.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              parents += getShuffleMapStage(shufDep, jobId)
            case _ =>
              waitingForVisit.push(dep.rdd)
          }

即找到了ShuffleDependency才会划分出一个最的Stage(除了没有父RDD的RDD,比如HadoopRDD,它的dependencies为Nil)。

在上边的代码中,提到了ShuffleMapStage,其实Spark的Stage只有两个子类:ShuffleStage和 ResultStage。相应的,Task也只有两个子类,ResultTask和ShuffleMapTask。这些类之间的联系,可以从DAGScheduler的submitMissingTasks方法中表现中来。下面是这个方法中的一段代码:

  val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = getPreferredLocs(stage.rdd, id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, taskBinary, part, locs)
          }

        case stage: ResultStage =>
          val job = stage.resultOfJob.get
          partitionsToCompute.map { id =>
            val p: Int = job.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = getPreferredLocs(stage.rdd, p)
            new ResultTask(stage.id, taskBinary, part, locs, id)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
        runningStages -= stage
        return
    }

这段代码用来生成task, 确切地说是为某个Stage生成task。从以上代码可以看出,为ResultStage生成的就是ResultTask, 为ShuffleMapStage生成的就是ShuffleMapTask。

ShuffleMapTask有何特殊之处呢?

对于多于一个Stage的job,肯定会存在shuffle,这也意味会有Stage的父Stage是ShuffleMapStage。ShuffleMapStage中的ShuffleMapTask的最后一个RDD的数据会被进行shuffle,这也是它与ResultTask的区别。下边是ShuffleMapTask的runTask方法中的一段代码,executor会间接调用runTask方法

      val manager = SparkEnv.get.shuffleManager//莸取ShuffleManager
      //获取writer,注意会把ShuffleDependency.shuffleHander传过去
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])这一句会计算最后一个rdd的某个分区,然后用writer写入这个分区的数据,这可以认为是shuffle中的map阶段。

那么reduce阶段是如何触发的呢?

这实际上是很自然地由Spark对RDD的计算逻辑触发的。

Spark的运算逻辑是由对RDD的partition的计算驱动的(上一篇提到过), 即对子RDD的partition的计算会触发对父RDD的对应partition的计算,由此触发到第一个可以计算的RDD的分区。所以shuffle关系子Stage中最初始的那个RDD一定包含有和shuffle过程相关的逻辑,这种特殊的RDD有两类,ShuffledRDD和CoGroupedRDD,(后者不一定是shuffle的结果), 也就是说reduce是由对特殊RDD的计算触发的。下面以ShuffledRDD为例进行说明,单个RDD进行shuffle会生成这种RDD。

ShuffledRDD

ShuffledRDD的特点由三部分可以体现。首先,它包括了一些跟shuffle有关的field:

  private var serializer: Option[Serializer] = None

  private var keyOrdering: Option[Ordering[K]] = None

  private var aggregator: Option[Aggregator[K, V, C]] = None

  private var mapSideCombine: Boolean = false

其中Aggregator主要用来指明对于同一个key对应的value,如何进行aggregate,但不仅于此。这是个挺有意思的类,它的域是一系列函数。

其次,它的dependency是ShuffleDependency,因此DAGScheduler会把它当作新Stage的起点,它的父RDD被当作前一个Stage的终点。

  override def getDependencies: Seq[Dependency[_]] = {
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

最后,当ShuffledRDD的某个partition被compute时,会触发对map输出的fetch,以及对value的aggregate等操作,也就是reduce阶段。

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

那么ShuffledRDD是如何生成的呢?

当然,会引起shuffle的transformation就会生成ShuffledRDD,以reduceByKey为例。

reduceByKey实际上有很多个重载的同名方法,以最简单的为例

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

reduceByKey是在某个RDD上被调用的,设此RDD为A,调用reduceByKey生成的RDD为B。那么,以上代码中的partitioner是指用于生成B的Partitioner, 它指出了A中的每个kv对应该进行B的哪个分区。之所以需要注意这点,是因为在combineByKey中会根据这个Partitioner决定需要生成的RDD,在特定情况下reduceByKey不会导致shuffle.

下面是combineByKey中用于决定生成何种RDD的代码:

  if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }

它会根据

if (self.partitioner == Some(partitioner)) 

来决定是否生成ShuffledRDD。其中self.partitioner是指A这个RDD的partitioner,它指明了A这个RDD中的每个key在哪个partition中。而等号右边的partitioner,指明了B这个RDD的每个key在哪个partition中。当二者==时,就会用self.mapPartitions生成MapPartitionsRDD, 这和map这种transformation生成的RDD是一样的,此时reduceByKey不会引发shuffle。

Partitioner有几个子类,它们中的某些会override默认的equals方法(注意,Scala中的==会调用equals方法,这点和Java不同)。典型的,如HashPartitioner中的equals方法

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

当两个HashPartitioner的分区数目一致时,就认为他们相等。但是,即是A和B有相同的Partitioner,也只决定了这两个RDD中相同的key在同一个partition中,并不意味着A中相同的key对应的value已经被aggregate了,因此在combineByKey操作中调用mapPartitions方法时,指定了特殊的Iterator到Iterator的转换方法。

 new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))

也就是说A中partition的Iterator会被执行combineValuesByKey操作,来对value进行aggregate。对于reduceByKey,不管需不需要进行shuffle,对value进行aggregate都是要执行的。比如,在ShuffledRDD的compute方法中,会调用ShuffleReader的read方法。ShuffleReader当前只有一种,叫HashShuffleReader, 不管是用sort还是hash进行shuffle,reduce端都是使用的这个Reader,它会对从map端抓取数据后生成的iterator进行aggregate

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

在上边combineByKey的代码中,可以看到它生成ShuffledRDD时,设置了aggreator,而mapSideCombine使用了默认参数,为true,所以combineCombinerByKey会被调用,来对已经combine好的value进行combine。

总结

通过上边的内容,基本可以了解到DAGScheduler是如何处理根据shuffle划分Stage,生成特殊的task;以及Spark执行过程中,map和reduce两个阶段是如何被触发的。

总的是来说, RDD的转换操作会尽量避免shuffle的出现,如果不得不shuffle,会生成特殊的RDD,它的dependencies会是ShuffleDependency。DAGScheduler在划分Stage时,会用ShuffleDependency确定Stage的边界,也会由此生成ShuffleMapTask来完成map端的工作。引发shuffle的transformation会生成特殊的RDD,此RDD会是shuffle中子Stage的起点,当这些RDD的compute方法被调用时,就会触发reduce端操作的执行。这种特殊的RDD有两类:

ShuffledRDD, 它只有一个父RDD,是对一个RDD进行shuffle的结果。

CoGroupedRDD, 它有多个RDD,是对多个RDD进行shuffle的结果。

时间: 2024-08-29 11:16:57

Spark中的Shuffle机制的相关文章

Spark中的Shuffle过程

一.Spark中的Shuffle过程 Shuffle分为两种:Shuffle write.Shuffle read Spark中Shuffle分为两种:HahShuffle.SortShuffle: 1.HashShuffle 磁盘小文件的个数为:M*R = 4*3 =12个 每一个buffer的大小为32k,由于产生的磁盘小文件过多,会产生一系列的问题 如:因为在写文件的时候会产生大量的写句柄,导致产生大量的临时对象,产生OM问题 在Reduce端读取小文件的时候,又会产生的大量的读句柄,浪费

Hadoop与 Spark中的Shuffle之区别与联系

Hadoop与 Spark中的Shuffle之区别与联系 2018年08月22日 20:24:46 小爷欣欣 阅读数:175 转自:http://mini.eastday.com/mobile/180114141035935.html mapreduce过程解析(mapreduce采用的是sort-based shuffle),将获取到的数据分片partition进行解析,获得k/v对,之后交由map()进行处理.map函数处理完成之后,进入collect阶段,对处理后的k/v对进行收集,存储在

Hadoop中的Shuffle 与 Spark中的Shuffle得区别与联系

MapReduce过程.Spark和Hadoop以Shuffle为中心的对比分析 mapreduce与Spark的map-Shuffle-reduce过程 mapreduce过程解析(mapreduce采用的是sort-based shuffle) 将获取到的数据分片partition进行解析,获得k/v对,之后交由map()进行处理. map函数处理完成之后,进入collect阶段,对处理后的k/v对进行收集,存储在内存的环形缓冲区中. 当环形缓冲区中的数据达到阀值之后(也可能一直没有达到阀值

【Spark】Spark的Shuffle机制

MapReduce中的Shuffle 在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量. Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时.输出结果须要按key哈希.而且分发到每个Reducer上去.这个过程就是shuffle.因为shu

spark性能调优(二) 彻底解密spark的Hash Shuffle

装载:http://www.cnblogs.com/jcchoiling/p/6431969.html 引言 Spark HashShuffle 是它以前的版本,现在1.6x 版本默应是 Sort-Based Shuffle,那为什么要讲 HashShuffle 呢,因为有分布式就一定会有 Shuffle,而且 HashShuffle 是 Spark以前的版本,亦即是 Sort-Based Shuffle 的前身,因为有 HashShuffle 的不足,才会有后续的 Sorted-Based S

初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,

Scala深入浅出实战中级--进阶经典(第66讲:Scala并发编程实战初体验及其在Spark源码中应用解析)内容介绍和视频链接 2015-07-24 DT大数据梦工厂 从明天起,做一个勤奋的人 看视频.下视频,分享视频 DT大数据梦工厂-Scala深入浅出实战中级--进阶经典:第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析 本期视频通过代码实战详解了Java语言基于加锁的并发编程模型的弊端以及Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Sc

第37课:Spark中Shuffle详解及作业

1.什么是Spark的Shuffle 图1 Spark有很多算子,比如:groupByKey.join等等都会产生shuffle. 产生shuffle的时候,首先会产生Stage划分. 上一个Stage会把 计算结果放在LocalSystemFile中,并汇报给Driver: 下一个Stage的运行由Driver触发,Executor向Driver请求,把上一个Stage的计算结果抓取过来. 2.Hadoop的Shuffle过程 图2 该图表达了Hadoop的map和reduce两个阶段,通过S

[Spark内核] 第35课:打通 Spark 系统运行内幕机制循环流程

本课主题 打通 Spark 系统运行内幕机制循环流程 引言 通过 DAGScheduelr 面向整个 Job,然后划分成不同的 Stage,Stage 是從后往前划分的,执行的时候是從前往后执行的,每个 Stage 内部有一系列任務,前面有分享過,任务是并行计算啦,这是并行计算的逻辑是完全相同的,只不过是处理的数据不同而已,DAGScheduler 会以 TaskSet 的方式把我们一个 DAG 构造的 Stage 中的所有任务提交给底层的调度器 TaskScheduler,TaskSchedu

浅谈MapReduce的shuffle机制

Map Reduce是一个计算框架.Map函数发送到所有含有涉及数据的节点上运行,而Reduce之运行在多台主机上用作收集map结果用,reduce数量取决于reduce收集函数分了几个组,只在几个几个节点上运行. shuffle机制:分组排序 MapReduce执行过程 map进程数量基于切片思想,一个切片对应一个map进程,切片大小相对块大小而言,块小切片对应的块数量多,切片是文件中偏移量的范围. 计算分好的split切片交付给map进程后,先在内存中处理,每用满一次缓存,将缓存内容输出成一