Spark Shuffle Write 阶段函数调用分析

Shuffle Write阶段函数调用如下:

org.apache.spark.executor.run() --> org.apache.spark.scheduler.Task.run() --> org.apache.spark.scheduler.runTask()

--> org.apache.spark.shuffle.hash.HashShuffleWriter.write() --> org.apache.spark.storage.DiskBlockObjectWriter.write()

任务在Executor上开始真正执行,代码如下:

该过程调用Task 中的run方法,该方法会调用runTask方法,而在spark中,任务可分为ShuffleMapTask和ResultTask,有Shuffle过程的为ShuffleMapTask

因此在Task类中的run方法中runTask(context: TaskContext)就有ShuffleMapTask与ResultTask的相应实现,本文讨论Shuffle Write过程中的函数调用关系

具体如下:

  在ShuffleMapask中的runTask()实现中有如下代码:

  

该阶段会调用Shuffle write方法,默认调用HashShuffleWriter中write方法,具体代码如下:

  

该阶段会调用DiskBlockObjectWriter中的write()方法,实现数据的真正写入,具体如下:

  

时间: 2024-10-06 13:54:33

Spark Shuffle Write 阶段函数调用分析的相关文章

Hadoop中shuffle阶段流程分析

Hadoop中shuffle阶段流程分析 MapReduce  longteng  9个月前 (12-23)  399浏览  0评论 宏观上,Hadoop每个作业要经历两个阶段:Map phase和reduce phase.对于Map phase,又主要包含四个子阶段:从磁盘上读数据->执行map函数->combine结果->将结果写到本地磁盘上:对于reduce phase,同样包含四个子阶段:从各个map task上读相应的数据(shuffle)->sort->执行red

Spark Shuffle过程详细分析

在MapReduce中shuffle和Spark的shuffle的过程有一些区别.这里做一下具体的介绍. Mapreduce的shuffle过程图解 Spark shuffle过程图解 注意:spark shuffle过程中没有分区和排序的过程,而且存储结果存储在内存中,所以速度要比mapreduce要快很多. 先就到这里吧,图解的说明应该比较清晰了.有问题欢迎留言

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark Shuffle的技术演进

在Spark或Hadoop MapReduce的分布式计算框架中,数据被按照key分成一块一块的分区,打散分布在集群中各个节点的物理存储或内存空间中,每个计算任务一次处理一个分区,但map端和reduce端的计算任务并非按照一种方式对相同的分区进行计算,例如,当需要对数据进行排序时,就需要将key相同的数据分布到同一个分区中,原分区的数据需要被打乱重组,这个按照一定的规则对数据重新分区的过程就是Shuffle(洗牌). Spark Shuffle的两阶段 对于Spark来讲,一些Transfor

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

spark shuffle过程分析

spark shuffle流程分析 回到ShuffleMapTask.runTask函数 现在回到ShuffleMapTask.runTask函数中: overridedef runTask(context:TaskContext): MapStatus = { 首先得到要reduce的task的个数. valnumOutputSplits= dep.partitioner.numPartitions metrics= Some(context.taskMetrics) valblockMana

spark shuffle 内幕彻底解密课程

一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算. 二:Shuffle可能面临的问题?运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了). 1, 数据量非常大: 2, 数据如何分类,即如何Partition,Hash.Sort.钨丝计算: 3, 负载均衡(数据倾斜): 4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理)

Spark Shuffle 堆外内存溢出问题与解决(Shuffle通信原理) 问题描述 Spark-1.6.0已经在一月份release,为了验证一下它的性能,我使用了一些大的SQL验证其性能,其中部分SQL出现了Shuffle失败问题,详细的堆栈信息如下所示: 16/02/17 15:36:36 WARN server.TransportChannelHandler: Exception in connection from /10.196.134.220:7337 java.lang.Out