shuffle过程中的信息传递

Spark中的shuffle大概是这么个过程:map端把map输出写成本地文件,reduce端去读取这些文件,然后执行reduce操作。

那么,问题来了:

reducer是怎么知道它的输入在哪呢?

首先,mapper在写完文件之后,肯定能提供与它的输出相关的信息。这个信息,在Spark中由MapStatus表示

private[spark] sealed trait MapStatus {

  def location: BlockManagerId

  def getSizeForBlock(reduceId: Int): Long
}

在ShuffleMapTask执行完毕时,MapStatus会被作为执行结果传递给driver。ShuffleMapTasks的runTask方法的声明是这样的

override def runTask(context: TaskContext): MapStatus

reducer如果从driver端获取了跟自己相关的MapStatus, 它就知道哪些BlockManager存储了自己所需要的map输出。

但是,还存在以下问题:

1. driver拿到MapStatus是如何处理的?

2. reducer是如何获取到MapStatus的?

3. reducer是如何根据MapStatus获取map输出的?

driver拿到MapStatus是如何处理的?

首先,executor会把MapStatus作为任务执行的结果,通过statusUpdate方法传给driver

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

DriverEndpoint收到StatusUpdate后,会调用TaskScheduler的statusUpdate方法

      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)

然后经过一个很长的调用链……会调用到DAGScheduler的handleTaskCompletion方法,这个方法会对task的类型进行匹配

case smt: ShuffleMapTask =>

匹配后执行了很多操作,与shuffle有关的有以下一些

            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
            updateAccumulators(event)
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
            } else {
              shuffleStage.addOutputLoc(smt.partitionId, status)
            }

重点在于,会把output location加到ShuffleMapStage的OutputLoc里,这个OutputLoc是ShuffleMapStage持有的一个MapStatus的数组。当这个Stage的所有任务都完成了,这个Stage里所有任务的MapStatus会被告知给MapOutputTracker

              mapOutputTracker.registerMapOutputs(
                shuffleStage.shuffleDep.shuffleId,
                shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                changeEpoch = true)

MapOutputTracker和BlockManager一样,都是master-worker的结构,worker通过RPC请求master,来提供信息。

由此,MapStatus的信息被从executor传递给driver,最终注册给了MapOutputTracker。

reducer是如何获取到MapStatus的?

首先,引发shuffle的transformation会生成特殊的RDD,ShuffledRDD和CoGroupedRDD,这些RDD的compute方法被调用时,会触发reduce的过程。

下面还是以ShuffledRDD为例。

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

目前,shuffleManager的getReader方法,只会返回HashShuffleReader类型的reader,它是ShuffleReader的唯一子类。

它的read方法,会调用BlockStoreShuffleFetcher的fetch方法去获取map的输出

val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

这个fetch方法会请求MapOutputTracker来获取map输出的位置和大小,MapOutputTracker的getServerStatus方法会获取这个reducer对应的MapStatus。

//statuses: Array[(BlockManagerId, Long)] 获取这个shuffleId, reduceId对应的map输出的位置和大小
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)

reducer是如何根据MapStatus获取map输出的呢

statuses的类型是Array[(BlockManagerId, Long)],这也就是MapStatus能提供的两个信息。

fetch方法会用获取到的MapStatus里的信息组装ShuffleBlockId

    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    for (((address, size), index) <- statuses.zipWithIndex) {
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
    }

    val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
      case (address, splits) =>
        (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
    }

注意,statuses这个数组里的信息包括了每个map的输出,即使有map没有对应于此reduce的输出,也会有。这个数组i索引处的信息,即是mapId为i的map的输出信息。因此, splitsByAddress在生成时,使用了statues.zipWithIndex来获取mapId。而组装blocksByAddress的过程就由此生成ShuffleBlockId

case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

这个blocksByAddress会被用来构造ShuffleBlockFetcherIterator,它会去请求BlockManager获取对应的ShuffleBlock。下面是fetch方法中构造ShuffleBlockFetcherIterator的代码

    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

ShuffleBlockFetcherIterator是一个迭代器,它的主构造器会调用initialize方法进行初始化。这个initialize的主要功能是生成对ShuffleBlock的fetch请求,并发送这些请求。

  private[this] def initialize(): Unit = {
    // Add a task completion callback (called in both success case and failure case) to cleanup.
    context.addTaskCompletionListener(_ => cleanup())

    // 区分开本地的和远端的block
    val remoteRequests = splitLocalRemoteBlocks()
    // 把远端的block随机排列,加到队列里
    fetchRequests ++= Utils.randomize(remoteRequests)

    // 发送对远端的block的请求
    while (fetchRequests.nonEmpty &&
      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
      sendRequest(fetchRequests.dequeue())
    }

    val numFetches = remoteRequests.size - fetchRequests.size
    logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

    // 获取本地的block
    fetchLocalBlocks()
    logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
  }

它会区分远端的还是本地的block,本地的block就是当前这个executor的BlockManager所管理的block,它可以通过block所在BlockManagerId是否等于本地的BlockManagerId来判断。

fetchLocalBlocks的过程很简单,只要请求本地的BlockManager就行了

val buf = blockManager.getBlockData(blockId)

获取远端的block麻烦一点, 需要ShuffleClient提供帮助

    shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
      new BlockFetchingListener {
         ...
      }
    )

这个shuffleClient是由BlockManager提供的。它有两种

  private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
      securityManager.isSaslEncryptionEnabled())
  } else {
    blockTransferService
  }

默认情况下会使用BlockTransferService。这个东西有两种

    val blockTransferService =
      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
        case "netty" =>
          new NettyBlockTransferService(conf, securityManager, numUsableCores)
        case "nio" =>
          new NioBlockTransferService(conf, securityManager)
      }

默认使用NettyBlockTransferService。这个东西会启动一个NettyBlockRpcServer,提供block的传输服务。ShuffleClient会通过host和port联系上它。

经过一串的调用,这个server会收到OpenBlocks类型的消息,然后它会这么处理

   message match {
      case openBlocks: OpenBlocks =>
        val blocks: Seq[ManagedBuffer] =
          openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
        val streamId = streamManager.registerStream(blocks.iterator)
        logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
        responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)

在这里,它会调用BlockDataManager的getBlockData方法获取block。BlockManager继承了BlockDataManager,它会把自己注册给BlockTransferService

这个注册,发生在BlockManager的intialize方法中

  def initialize(appId: String): Unit = {
    blockTransferService.init(this)  //把自己注册给BlockTransferService,让BlockTransferService能通过自己存取block

所以,最终会调用到BlockManager的getBlockData方法

  override def getBlockData(blockId: BlockId): ManagedBuffer = {
    if (blockId.isShuffle) {
      shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
    } else {
      val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
        .asInstanceOf[Option[ByteBuffer]]
      if (blockBytesOpt.isDefined) {
        val buffer = blockBytesOpt.get
        new NioManagedBuffer(buffer)
      } else {
        throw new BlockNotFoundException(blockId.toString)
      }
    }
  }

所以对于ShuffleBlockId,它会调用ShuffleBlockResover来获取block的数据。

这个ShuffleBlockResolver是个神奇的东西。

Spark的shuffle有两种, sort和hash, 分别使用HashShuffleManager和SortShuffleManager。hash的方式会把每个map为每个reduce的输出写一个文件,但是sort是每个map只写一个文件。因此,ShuffleBlockResolver必须应对这两种情况,实际上,也的确有两种ShuffleBlockResolver。HashShuffleManager使用FileShuffleBlockResolver, SortShuffleManager使用IndexShuffleBlockResolver。

这两个ShuffleBlockResolver的区别集中体现了hash和sort两种shuffle方式里reducer读取map输出文件时的差别。

Hash和Sort两种shuffle方式读取map输出文件时的差别

HashShuffleManager使用的是FileShuffleBlockResolver,它的getBlockData方法依据是否启用了consolidate shuffle有不同的执行方式,consolidate shuffle默认是不启用的,此时执行的是

  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    if (consolidateShuffleFiles) {
        ...
    } else {
      val file = blockManager.diskBlockManager.getFile(blockId)
      new FileSegmentManagedBuffer(transportConf, file, 0, file.length)
    }
  }    

会直接根据blockId去DiskBlockManager获取相应的文件,然后生成一个FileSegmentManagedBuffer对象,这个buffer的offset从0开始,长度为file.length,也就是整个文件。

SortShuffleManager使用IndexShuffleBlockResolver。由于sort方式的shuffle里的每个map会写一个数据文件和一个索引文件,这个数据文件里会有对应于多个reducer的数据,因此需要先读索引文件来确定对于哪个reducer该从何处读起。

  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // The block is actually going to be a range of a single map output file for this map, so
    // find out the consolidated file, then the offset within that from our index
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      ByteStreams.skipFully(in, blockId.reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      new FileSegmentManagedBuffer(
        transportConf,
        getDataFile(blockId.shuffleId, blockId.mapId),
        offset,
        nextOffset - offset)
    } finally {
      in.close()
    }
  }

这个索引文件记得是一系列的long型的值,第i个值代表第i个reducer的数据在数据文件中的偏移。因此,它返回的FileSegmentManagedBuffer不像hash方式时的一样包括整个文件,而是这个文件中的一个片段。

时间: 2024-10-13 17:32:27

shuffle过程中的信息传递的相关文章

去除hadoop的启动过程中警告信息

如何去除hadoop的启动过程中警告信息1.由于警告是在执行start-all.sh启动Hadoop时出现的,所以应该查看start-all.sh,执行more start-all.sh可以看到下面代码:if [ -e "$bin/../libexec/hadoop-config.sh" ]; then  . "$bin"/../libexec/hadoop-config.shelse  . "$bin/hadoop-config.sh"fi根据

SSAS多维数据库处理过程中错误信息及解决方案收集

SSAS在处理过程中一般会遇到这几种错误:1.用户登录失败 说明多维数据库数据源不对,需修改.双击数据源,具体如下图 2.重复属性键问题 例如这种层次结构: 首先,要进行如下设置: 然后,设置month的属性: 设置其KeyColumns属性,将year也添加进去 最后设置NameColumn属性,如下图: 3.找不到属性键错误 此种错误是因为垃圾数据引起的.可以忽略维度键错误,或者修正 数据仓库里面垃圾数据.具体如下图: 4.当CUBE建完后,对事实表或维度表的字段长度做改变时会出现,就会出现

kubernetes环境下 创建pod过程中 异常信息总结整理

[toc] 1.异常信息:Failed to pull image "spark:0.1": rpc error: code = Unknown desc = repository docker.io/spark not found: does not exist or no pull access 2.异常信息:Error from server (BadRequest): container "xej" in pod "xej-545694f448-j

C#调用SQL中的存储过程中有output参数,存储过程执行过程中返回信息

C#调用SQL中的存储过程中有output参数,类型是字符型的时候一定要指定参数的长度.不然获取到的结果总是只有第一字符.本人就是由于这个原因,折腾了很久.在此记录一下,供大家以后参考! 例如: CREATE PROCEDURE sp_AccountRole_Create @CategoryID int, @RoleName nvarchar(10), @Description nvarchar(50), @RoleID int output AS DECLARE @Count int -- 查

Hadoop学习笔记—10.Reduce阶段中的Shuffle过程

一.回顾Reduce阶段三大步凑 在第四篇博文<初识MapReduce>中,我们认识了MapReduce的八大步凑,其中在Reduce阶段总共三个步凑,如下图所示: 其中,Step2.1就是一个Shuffle操作,它针对多个map任务的输出按照不同的分区(Partition)通过网络复制到不同的reduce任务节点上,这个过程就称作为Shuffle. PS:Hadoop的shuffle过程就是从map端输出到reduce端输入之间的过程,这一段应该是Hadoop中最核心的部分,因为涉及到Had

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal

【Big Data - Hadoop - MapReduce】通过腾讯shuffle部署对shuffle过程进行详解

摘要: 通过腾讯shuffle部署对shuffle过程进行详解 摘要:腾讯分布式数据仓库基于开源软件Hadoop和Hive进行构建,TDW计算引擎包括两部分:MapReduce和Spark,两者内部都包含了一个重要的过程—Shuffle.本文对Shuffle过程进行解析,并对两个计算引擎的Shuffle过程进行比较. 腾讯分布式数据仓库(Tencent distributed Data Warehouse, 简称TDW)基于开源软件Hadoop和Hive进行构建,并且根据公司数据量大.计算复杂等

MapReduce和spark的shuffle过程详解

面试常见问题,必备答案. 参考:https://blog.csdn.net/u010697988/article/details/70173104 mapReducehe和Spark之间的最大区别是前者较偏向于离线处理,而后者重视实效性,下面主要介绍mapReducehe和Spark两者的shuffle过程. MapReduce的Shuffle过程 MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发:Reduce是规约,负责数据的计算归并.Reduce的数据来源于

hadoop的mapReduce和Spark的shuffle过程的详解与对比及优化

https://blog.csdn.net/u010697988/article/details/70173104 大数据的分布式计算框架目前使用的最多的就是hadoop的mapReduce和Spark,mapReducehe和Spark之间的最大区别是前者较偏向于离线处理,而后者重视实现性,下面主要介绍mapReducehe和Spark两者的shuffle过程. MapReduce的Shuffle过程介绍 Shuffle的本义是洗牌.混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随