大数据:Spark Shuffle(一)ShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去

1. 前序

关于Executor如何运行算子,请参考前面博文:大数据:Spark Core(四)用LogQuery的例子来说明Executor是如何运算RDD的算子,当Executor进行reduce运算的时候,生成运算结果的临时Shuffle数据,并保存在磁盘中,被最后的Action算子调用,而这个阶段就是在ShuffleMapTask里执行的。

前面博客中也提到了,用什么ShuffleWrite是由ShuffleHandler来决定的,在这篇博客里主要介绍最常见的SortShuffleWrite的核心算法ExternalSorter.

2. 结构AppendOnlyMap

在前面博客中介绍了SortedShuffleWrite调用ExternalSorter.insertAll进行数据插入和数据合并的,ExternalSorted里使用了PartitionedAppendOnlyMap作为数据的存储方式

先来看PartitionedAppendOnlyMap的结构

虽然名字为Map,但是在这里和常见的Map的结构并不太一样,里面并没有使用链表结果保存相同的hash值的key,当插入的key的hashcode相同的时但key不相同,会通过i的叠加一直找到数组里空闲的位置。

这里有几个注意点:

  • Key 注意这里的Key并不是通过Map里拆分的Key, 而是Tuple2(PartitionId,Key),由分片的段和key组合的联合key
  • 如何计算PartitionId? 这是由Partitioner来决定的

2.1 Partitioner

Partitioner的方法

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

通过调用getPartition方法找到对应的partition相应的块,而常用的是HashPartitioner

 def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

计算 key的hashCode,进行总的分片数求余,分配到对应的片区

3. Spill

在大数据的情况下进行归并,由于合并的数据量非常大,仅仅使用AppendOnlyMap进行数据的归并内存显然是不足够的,在这种情况下需要对讲内存里的已经归并的数据刷到磁盘上避免OOM的风险。

控制Spill到磁盘的阀值

  • 内存:虽然Java的堆内存管理是由JVM虚拟机管控,但是Spark自己实现了一个简单的但不精准的内存管理,内存的申请在TaskMemoryManager里进行管理
 if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      shouldSpill = currentMemory >= myMemoryThreshold
    }

在每添加32个元素的时候,检查一下当前的内存状况,currentMemory是Map当前大概使用的内存,myMemoryThreshold是可以使用的内存址,初始的时候受参数控制:

spark.shuffle.spill.initialMemoryThreshold

为何要尝试申请1倍的当前内存?AppendOnlyMap的每次扩容是1倍数组

  • 数据的数量:有的时候每条数据量比较小,但是数据的数量非常大,为了避免在AppendOnlyMap里有大量的数据,在Spill的时候同时还可以使用数量的控制:
spark.shuffle.spill.numElementsForceSpillThreshold

3.1 如何Spill?

当从AppendOnlyMap到SpilledFile磁盘总共有3个过程

  1. 整理数组,将数组里的不存在KV的空间移除
  2. 按照区块排序,对同一区块里的Key使用TimeSort进行排序,TimeSort不在此处讨论
  3. Spill到文件的时候,只是保存了序列化了Key,Value并没有保存Key的区块信息,但在SpilledFile的对象中有记录每个partitionkey的数量的数组

SpilledFile的命名:temp_shuffle_UUID

4. 生成ShuffleWrite的数据文件

在3章节的时候,有没有考虑过为何要排序完才Spill到临时文件中?

Spark中是不要求在reduce端进行排序的,生成Shuffle的结果文件并不要求排序,但是因为Spill到文件中后,有可能相同的Key会分布在不同的文件中,所以需要对不同的文件进行相同的Key的值的计算。如果Spill到文件是乱序的,那代表在最后生成Shuffle结果的时候,还是要Load所有文件才能确定哪些Key是重复的需要做合并,这样依然面对着内存不够的情况。

生成Shuffle文件过程实际上就是个外排序的过程。

  • 首先对AppendOnlyMap进行归并,排序
  • 开始对同一区块的进行归并
  • 将AppendOnlyMap,SpilledFile的文件进行优先级的Queue的迭代,每次迭代出所有Queue中一个最小的Key,最小的Key就是HashCode最小
  private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
      : Iterator[Product2[K, C]] =
  {
    val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
    type Iter = BufferedIterator[Product2[K, C]]
    val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
      // Use the reverse of comparator.compare because PriorityQueue dequeues the max
      override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1)
    })
    heap.enqueue(bufferedIters: _*)  // Will contain only the iterators with hasNext = true
    new Iterator[Product2[K, C]] {
      override def hasNext: Boolean = !heap.isEmpty

      override def next(): Product2[K, C] = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        val firstBuf = heap.dequeue()
        val firstPair = firstBuf.next()
        if (firstBuf.hasNext) {
          heap.enqueue(firstBuf)
        }
        firstPair
      }
    }
  }

  • 当找到一个最小的Key的时候,并不能保存到ShuffleWrite文件中,因为有可能存在相同的最小的key,所以还需要在迭代找到下一个最小的Key,如果key的hashcode相同的时候,要进行相同的Key进行合并(因为Key的排序是依赖于HashCode的大小,所以相同的最小的Key代表的是HashCode相同的Key),如果不同则保存成相同HashCode的数组,进行下一次的优先queue的查找,直到找到的Key的hashcode大于最小的Key结束
   if (!hasNext) {
            throw new NoSuchElementException
          }
          keys.clear()
          combiners.clear()
          val firstPair = sorted.next()
          keys += firstPair._1
          combiners += firstPair._2
          val key = firstPair._1
          while (sorted.hasNext && comparator.compare(sorted.head._1, key) == 0) {
            val pair = sorted.next()
            var i = 0
            var foundKey = false
            while (i < keys.size && !foundKey) {
              if (keys(i) == pair._1) {
                combiners(i) = mergeCombiners(combiners(i), pair._2)
                foundKey = true
              }
              i += 1
            }
            if (!foundKey) {
              keys += pair._1
              combiners += pair._2
            }
          }
  • 将k,v内容写到shufflewrite的文件Shuffle_shuffleId_mapId_reduceId.data中去

  • 重复前面的行为直到所有的key被迭代结束
  • 前面的归并是以区块(Partition)为单位的,而data的文件里并没有保存区块的相关信息,但在每迭代完一个Partition的时候(SpilledFile文件里面也没有Partition的信息,但是是通过SpilledFile结构中的numPartition的数量来判断Partition的数据是否已经读完),会生成一个Segement,Segement 里记录了这个块保存在data文件里的长度
  • 最后生成Shuffle_shuffleId_mapId_reduceId.index文件,文件里记录了每个Partition在data文件中的位移

这样一个完整的Shuffle结果写入data的逻辑执行完了

5 总结

  • 使用AppendOnlyMap数据结构进行输入数据的合并计算
  • 输入的数据是进行分区合并计算,分区的方式是由Partitioner决定的
  • 当内存不够的时候,会进行相同区块下的数据整理排序,Spill到临时文件temp_shuffle_UUID
  • 最后对所有的数据集合(AppendOnlyMap里的数据和多个Spill的临时文件)进行区块的数据合并
  • 生成Shuffle_shuffleId_mapId_reduceId.data 分区的数据文件,Shuffle_shuffleId_mapId_reduceId.index记录分区的位置
时间: 2024-10-21 08:06:08

大数据:Spark Shuffle(一)ShuffleWrite:Executor如何将Shuffle的结果进行归并写到数据文件中去的相关文章

JavaIO 将数据写入到文件中去

package com.Practice_FileWriter; import java.io.FileWriter; import java.io.IOException; public class JustPractice { public static void main(String[] args) throws IOException { //第一步,先建立一个可以向文件中写入数据的输出流对象,这一步的时候要给出写入文件的路径: FileWriter fw = new FileWrit

利用正则表达式读取txt文件中的邮箱,电话号码,url地址,手机号,将数据一行一个保存到一个新的文件中去

本人最近整合了从文本中提取信息,将之保存的新的文本中,在此处用到的是正则表达式,希望大家一起学习. 文件操作类:fileOperation.java package dyx_13; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.FileReader; import java.io.F

C# 将后台数据 写到前台脚本中去【控制页面中某些脚本只在 页面第一次加载的时候才去 执行】

protected void Page_Load(object sender, EventArgs e) { if (!Page.IsPostBack) { this.domainBind(); Page.RegisterClientScriptBlock("F1", "<script>var c=1</script>"); } else { Page.RegisterClientScriptBlock("F0", &qu

在文件中的指定位置写入数据

笔者的应用场景:将图像指定区域的像素值按其空间位置写到 txt 文件中. int hang = 4; // 矩阵行数 int lie = 8; // 矩阵列数 // 先在txt文件中构造一个空矩阵,然后向该矩阵中指定行列写入个位数 int main10() { FILE *filp = fopen("cc.txt", "w+"); for (int i = 0; i < hang; i++) { fseek(filp, lie, SEEK_END); fpri

[Linux应用]Linux应用程序输出数据重定向到文件中

转自:http://blog.chinaunix.net/uid-20680966-id-4698387.html 目的是要让程序的printf的打印能重定向到某个文本中,ctrl+c强制退出后查看文本,方便调试.运行方式:a. out程序正常运行是会打印一些字符的 a.out > 1.txt 2>&1 把stdout与stderr都重定向到1.txt文件中去,结果发现运行过程中1.txt一直为空,调试发现如果a.out正常结束退出1.txt数据就正常,ctrl+c强制退出程序则1.t

大数据-spark理论(2)算子,shuffle优化

导读目录 第一节:代码层面 1:RDD创建 2:算子 3:数据持久化算子 4:广播变量 5:累加器 6:开发流程 第二节:Shuffle优化层面 1:Shuffle 2:调优 第一节:代码层面 (1)RDD创建: Java: sc.textfile sc.parallelize() sc.parallelizePairs(得到KV格式的RDD) Scala: sc.textfile sc.parallelize //如果不指定分区数,用的是系统的默认分区数 makeRDD //如果不指定分区数,

王家林 大数据Spark超经典视频链接全集[转]

压缩过的大数据Spark蘑菇云行动前置课程视频百度云分享链接 链接:http://pan.baidu.com/s/1cFqjQu SCALA专辑 Scala深入浅出经典视频 链接:http://pan.baidu.com/s/1i4Gh3Xb 密码:25jc DT大数据梦工厂大数据spark蘑菇云Scala语言全集(持续更新中) http://www.tudou.com/plcover/rd3LTMjBpZA/ 1 Spark视频王家林第1课:大数据时代的“黄金”语言Scala 2 Spark视

2016年大数据Spark“蘑菇云”行动之flume整合spark streaming

近期,听了王家林老师的2016年大数据Spark"蘑菇云"行动,需要将flume,kafka和Spark streaming进行整合. 感觉一时难以上手,还是先从简单着手吧:我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111).Spark streaming的处理是直接输出有几个events. 一.配置文件 Flume 配

如何成为云计算大数据Spark高手

Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手.Spark采用一个统一的技术堆栈解决了云计算大数据的如流处理.图技术.机器学习.NoSQL查询等方面的所有核心问题,具有完善的生态系统,这直接奠定了其一统云计算大数据领域的霸主地位. 伴随Spark技术的普及推广,对专业人才的需求日益增加.Spark专业人才在未来也是炙手可热,轻而易举可以拿到百万的