spark block读写流程分析

之前分析了spark任务提交以及计算的流程,本文将分析在计算过程中数据的读写过程。我们知道:spark抽象出了RDD,在物理上RDD通常由多个Partition组成,一个partition对应一个block。在driver和每个executor端,都有一个Blockmanager。Blockmanager是spark在计算过程中对block进行读写的入口,它屏蔽了在读取数据时涉及到的内存分配,从其他executor端远程获取等具体细节。接下来,本文将以读写block为主线,分析spark在计算过程中读写实际数据的流程。

1,计算数据写流程

1.1,从计算上来说, RDD中的一个Partition对应一个Task。在Task在taskRunner的run方法中调用task.run方法,然后根据计算结果的大小,以不同形式(直接发送或者通过blockManager)将数据发送给driver。

val value = try {  val res = task.run(    taskAttemptId = taskId,    attemptNumber = attemptNumber,    metricsSystem = env.metricsSystem)  threwException = false res}

...

// directSend = sending directly back to the driverval serializedResult: ByteBuffer = {  if (maxResultSize > 0 && resultSize > maxResultSize) {    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +      s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +      s"dropping it.")    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))  } else if (resultSize > maxDirectResultSize) {    val blockId = TaskResultBlockId(taskId)    env.blockManager.putBytes(      blockId,      new ChunkedByteBuffer(serializedDirectResult.duplicate()),      StorageLevel.MEMORY_AND_DISK_SER)    logInfo(      s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))  } else {    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")    serializedDirectResult  }}

1.2,在使用block形式的时候,可以看到调用了blockManager的putBytes方法,在核心实现doPutBytes中,根据存储级别和是否序列化使用memstore和diskstore中不同的不方法进行存储。顾名思义,memstore和diskstore主要就是根据存储级别将对应的block储存到内存或者磁盘。

1.3,memstore的putBytes实现如下。可以看到首先需要通过memoryManager申请存储保存当前block的内存,申请到内存后改block的数据,会以BlockId和SerializeMemoryEntry的键值对,保存在memstore的entries的对象中。关于memoryManager,当前有StaticMemoryManager和UnifiedMemoryManager两种实现。StaticMemoryManager是之前老的实现,将spark计算过程使用的存储内存和计算内存按照总大小的固定比例进行分配。UnifiedMemoryManager是2.x的默认实现,相对StaticMemoryManager,UnifiedMemoryManager中存储和计算的内存是可以动态调整的。也就是说,当计算内存紧张,储存内存空闲的时候,计算内存可以借用存储内存。反之类似。

1.4,在1.3完成当前executor机器完成当前block存储以后,当需要告诉driver时(tellMaster参数),会将该block的状态汇报给driver(reportBlockStatus),通过向dirver发送UpdateBlockInfo消息。driver接收到UpdateBlockInfo消息后,将汇报过来的相关信息保存在BlockManagerMasterEndpoint的blockManagerInfo和blockLocations中。

至此,计算过程写数据的流程完成。

2,计算数据读流程

2.1,话说在TaskRunner运行结束以后,会调用execBackend.statusUpdate,会将该任务的结束的状态通过StatusUpdate的信息发送给driver。

2.2,driver端接收到StatusUpdate消息后,最终将调用TaskResultGetter的enqueueSuccessfulTask方法。在该方法中,对于使用block(即IndirectTaskResult),最终将调用blockManager的getRemoteBytes获取该blockId对应的数据。

2.3,在blockManager的getRemoteBytes方法中,主要逻辑是获取该blockId对应的存储该blockId数据的所有机器位置,通过调用blockTransferService的fetchBlockSync获取具体数据,一旦从一个指定的位置获取到数据,则立即返回。

2.4,fetchBlockSync接着会调用具体实现NettyBlockTransferService中的fetchBlocks方法,在该方法中,将通过OneForOneBlockFetcher发送OpenBlocks消息给指定目标的blockManager,从而对应的streamId等信息,然后通过

client.fetchChunk一次获取每块的数据。

2.5,在提供数据的blockManager端(即server端),接受到消息OpenBlocks消息后,首先根据blockId通过blockManager的getBlockData方法获取对应的数据,然后将该数据和一个streamId奖励对应关系(通过streamManager调用进行)

2.6,在2.4中获取到对应的streamId后,将通过ChunkFetchRequest分块获取数据。server端接受到该消息以后,streamManager将根据streamId和chunkIndex获取对应数据,然后返回给客户端。

至此,计算过程获取block数据的流程结束。

时间: 2024-11-02 08:50:32

spark block读写流程分析的相关文章

S3C6410 SPI全双工读写流程分析(原创)【转】

转自:http://blog.csdn.net/hustyangju/article/details/21165721 原创博文,知识共享!转载请注明出处:http://blog.csdn.net/hustyangju/article/details/21165721 S3C6410 SPI全双工读写流程分析 一.SPI控制器datasheet 1详细请参考:http://blog.csdn.net/hustyangju/article/details/20474659 2 SPI的所有寄存器都

HDFS(一)架构及文件读写流程

Hadoop 中有三大组件:HDFS.MapReduce.YARN,HDFS 负责大文件存储的问题,MapReduce 负责大数据计算,而 YARN 负责资源的调度,接下来的文章我会一一介绍这几个组件.今天我们先来聊聊 HDFS 的架构及文件的读写流程. 总体架构 HDFS 设计的目的是为了存储大数据集的文件,因此一台服务器是应付不了的,我们需要一个集群来实现这个目标.当用户需要存储一个文件时,HDFS 会将这个文件切分为一个个小的数据块(在 2.x 的版本中,每个数据块默认大小为 128M),

基于SPARK SQL 读写ORACLE 的简单案例分析常见问题

该文章出自上海harli,偷偷地把女神的东西拿出来,希望女神不要介意. 一.概述 本文主要内容包含Spark SQL读写Oracle表数据的简单案例,并针对案例中比较常见的几个问题给出解决方法. 最后从常见的java.lang.ClassNotFoundException(无法找到驱动类)的异常问题出发,分析相关的几种解决方法,以及各个解决方法之间的异同点. 二.案例中比较常见问题及其解决方法 2.1 启动 首先查看Spark 官网给出的SparkSQL的编程指南部分(http://spark.

Spark SQL源码分析之核心流程

自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql. 2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里. 前一段时间测试过Shark,并且对Spark

spark 启动job的流程分析

从WordCount开始分析 编写一个例子程序 编写一个从HDFS中读取并计算wordcount的例子程序: packageorg.apache.spark.examples importorg.apache.spark.SparkContext importorg.apache.spark.SparkContext._ objectWordCount{ defmain(args : Array[String]) { valsc = newSparkContext(args(0),"wordco

第一篇:Spark SQL源码分析之核心流程

/** Spark SQL源码分析系列文章*/ 自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql.    2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

JAVAWEB开发之Struts2详解(一)——Struts2框架介绍与快速入门、流程分析与工具配置以及Struts2的配置以及Action和Result的详细使用

Struts2框架介绍 三大框架:是企业主流JavaEE开发的一套架构.Struts2 + Spring + Hibernate 什么是框架?为什么要学习框架? 框架是实现部分功能的代码(半成品),使用框架简化企业级软件开发. Struts2与MVC? Struts是一款优秀的MVC框架 MVC:是一种思想,是一种模式,将软件分为Model模型.View视图.Controller控制器 JAVAEE软件三层架构:web层(表现层).业务逻辑层.数据持久层(Sun提供javaEE开发规范) Jav