Spark技术内幕:Spark Pluggable框架详解,你怎么开发自己的Shuffle Service?

首先介绍一下需要实现的接口。框架的类图如图所示(今天CSDN抽风,竟然上传不了图片。如果需要实现新的Shuffle机制,那么需要实现这些接口。

1.1.1  org.apache.spark.shuffle.ShuffleManager

Driver和每个Executor都会持有一个ShuffleManager,这个ShuffleManager可以通过配置项spark.shuffle.manager指定,并且由SparkEnv创建。Driver中的ShuffleManager负责注册Shuffle的元数据,比如Shuffle ID,map task的数量等。Executor中的ShuffleManager 则负责读和写Shuffle的数据。

需要实现的函数及其功能说明:

1)       由Driver注册元数据信息

defregisterShuffle[K, V, C](

shuffleId: Int,

numMaps: Int,

dependency:ShuffleDependency[K, V, C]): ShuffleHandle

一般如果没有特殊的需求,可以使用下面的实现,实际上Hash BasedShuffle 和Sort BasedShuffle都是这么实现的。

override def registerShuffle[K, V, C](

shuffleId: Int,

numMaps: Int,

dependency: ShuffleDependency[K, V, C]):ShuffleHandle = {

new BaseShuffleHandle(shuffleId, numMaps,dependency)

}

2)       获得Shuffle Writer, 根据Shuffle Map Task的ID为其创建Shuffle Writer。

def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:TaskContext): ShuffleWriter[K, V]

3)       获得Shuffle Reader,根据Shuffle ID和partition的ID为其创建ShuffleReader。

def getReader[K, C](

handle: ShuffleHandle,

startPartition: Int,

endPartition: Int,

context: TaskContext): ShuffleReader[K,C]

4)       为数据成员shuffleBlockManager赋值,以保存实际的ShuffleBlockManager

5)       defunregisterShuffle(shuffleId: Int): Boolean,删除本地的Shuffle的元数据。

6)       def stop(): Unit,停止Shuffle Manager。

每个接口的具体实现的例子,可以参照org.apache.spark.shuffle.sort.SortShuffleManager 和org.apache.spark.shuffle.hash.HashShuffleManager。

1.1.2  org.apache.spark.shuffle.ShuffleWriter

Shuffle Map Task通过ShuffleWriter将Shuffle数据写入本地。这个Writer主要通过ShuffleBlockManager来写入数据,因此它的功能是比较轻量级的。

1)         def write(records: Iterator[_ <:Product2[K, V]]): Unit, 写入所有的数据。需要注意的是如果需要在Map端做聚合。(aggregate),那么写入前需要将records做聚合。

2)         def stop(success: Boolean): Option[MapStatus],写入完成后提交本次写入。

对于Hash BasedShuffle,请查看org.apache.spark.shuffle.hash.HashShuffleWriter;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.sort.SortShuffleWriter。

1.1.3  org.apache.spark.shuffle.ShuffleBlockManager

主要使用从本地读取Shuffle数据的功能。这些接口都是通过org.apache.spark.storage.BlockManager调用的。

1)       def getBytes(blockId: ShuffleBlockId):Option[ByteBuffer], 一般通过调用下一个接口实现,只不过将ManagedBuffer转换成了ByteBuffer。

2)       def getBlockData(blockId:ShuffleBlockId): ManagedBuffer,核心读取逻辑。比如Hash Based Shuffle的从本地读取文件都是通过这个接口实现的。因为不同的实现可能文件的组织方式是不一样的,比如Sort Based Shuffle需要通过先读取Index索引文件获得每个partition的起始位置后,才能读取真正的数据文件。

3)       def stop(): Unit,停止该Manager。

对于Hash Based Shuffle,请查看org.apache.spark.shuffle.FileShuffleBlockManager;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.IndexShuffleBlockManager。

1.1.4  org.apache.spark.shuffle.ShuffleReader

ShuffleReader实现了下游的Task如何读取上游的ShuffleMapTask的Shuffle输出的逻辑。这个逻辑比较复杂,简单来说就是通过org.apache.spark.MapOutputTracker获得数据的位置信息,然后如果数据在本地那么调用org.apache.spark.storage.BlockManager的getBlockData读取本地数据(实际上getBlockData最终会调用org.apache.spark.shuffle.ShuffleBlockManager的getBlockData)。具体的Shuffle Read的逻辑请查看下面的章节。

1)       def read():Iterator[Product2[K, C]]

如何开发自己的Shuffle机制?到这里你应该知道怎么做了。不知道? 再看一遍吧。

如果您喜欢 本文,那么请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天可以一票哦。

点我投票

时间: 2024-11-05 14:41:38

Spark技术内幕:Spark Pluggable框架详解,你怎么开发自己的Shuffle Service?的相关文章

Spark技术内幕:Executor分配详解

当用户应用new SparkContext后,集群就会为在Worker上分配executor,那么这个过程是什么呢?本文以Standalone的Cluster为例,详细的阐述这个过程.序列图如下: 1. SparkContext创建TaskScheduler和DAG Scheduler SparkContext是用户应用和Spark集群的交换的主要接口,用户应用一般首先要创建它.如果你使用SparkShell,你不必自己显式去创建它,系统会自动创建一个名字为sc的SparkContext的实例.

Spark技术内幕:Client,Master和Worker 通信源码解析

Spark的Cluster Manager可以有几种部署模式: Standlone Mesos YARN EC2 Local 在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并最终完成计算.具体阐述可以阅读<Spark:大数据的电花火石!>. 那么Standalone模式下,Client,Master和Worker是如何进行通信,注册并开启服务的呢? 1. node之间的IP

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

如果Spark的部署方式选择Standalone,一个采用Master/Slaves的典型架构,那么Master是有SPOF(单点故障,Single Point of Failure).Spark可以选用ZooKeeper来实现HA. ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个Standby Master会被选举出来.由于

Spark技术内幕:Master的故障恢复

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现  详细阐述了使用ZK实现的Master的HA,那么Master是如何快速故障恢复的呢? 处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它

Spark技术内幕:Worker源码与架构解析

首先通过一张Spark的架构图来了解Worker在Spark中的作用和地位: Worker所起的作用有以下几个: 1. 接受Master的指令,启动或者杀掉Executor 2. 接受Master的指令,启动或者杀掉Driver 3. 报告Executor/Driver的状态到Master 4. 心跳到Master,心跳超时则Master认为Worker已经挂了不能工作了 5. 向GUI报告Worker的状态 说白了,Worker就是整个集群真正干活的.首先看一下Worker重要的数据结构: v

spark读写压缩文件API使用详解

最近研究了下Spark如何读写压缩格式的文件,主要有如下三种方式,这里以lzo方式压缩为例     /*******************old hadoop api*************************/     val confHadoop = new JobConf     confHadoop.set("mapred.output.compress", "true")     confHadoop.set("mapred.output

Spark技术内幕:一个图搞定Spark到底有多少行代码

Spark1.0.0发布一个多月了,那么它有多少行代码(Line of Code, LOC)? 注:代码统计未包含测试,sample. Spark技术内幕:一个图搞定Spark到底有多少行代码

Spark技术内幕: Task向Executor提交的源代码解析

在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓扑,即须要依照顺序计算的Stage,Stage中包括了能够以partition为单位并行计算的Task.我们并没有分析Stage中得Task是怎样生成而且终于提交到Executor中去的. 这就是本文的主题. 从org.apache.spark.scheduler.DAGScheduler#submitMis

华为网络技术大赛模拟题答案详解

华为网络技术大赛模拟题答案详解 [尊重原创,转载请注明出处]http://blog.csdn.net/guyuealian/article/details/51354514 一.判断题 (1)VLSM的作用是:在有类的IP地址基础上,从主机位部分划分出相应的位数做为网络位.但是在路由器上部署时,需要路由协议的支持. [解释]对,VLSM=Variable Length Subnet Mask,可变长子网掩码 (2)有效的沟通是任何组织和任何项目的基础,项目经理可以花90%或者更多的时间在沟通这方