声明:本文中所将的spark内存模型是1.6+的版本号。新的内存模型会在新的文章中讲到。
不久前我在StackOverflow上回答了一系列关于Apache Spark架构相关的问题。这似乎是因为网上缺乏好的Spark总体架构的文章。甚至是官网指导中也没有非常多具体的介绍。当然也缺少好的架构图。“Learning Spark”这本书和官方资料中也一样没有。
本文我将尝试解决问题并在总体上提供Spark架构相关以及常常被提及的先关概念一些问题的一站式指导。这篇文章并不全然是针对Spark刚開始学习的人的:文中不会提供Spark主要抽象编程的深入解说(如RDD和DGA)。可是须要大家具备这方面的知识。
以下我们先看看官方的架构图(http://spark.apache.org/docs/1.3.0/cluster-overview.html):
如上图所看到的。它一次性介绍了非常多概念:executor、task、Cache、Worker Node等等。我曾经開始学习Spark概念的时候。这张图差点儿就是网上所能看到的唯一样关于Spark架构的图了,可是眼下这样的情况依旧没有改变。我个人并不喜欢这张图,由于它并没有展示一些重要概念或者并没有将它们以最好的形式展示出来。
让我们从头開始。
不论什么工作在我们集群上或者本地机器上的Spark进程都是一个JVM进程;而对于不论什么JVM进程我们都能够通过JVM參数-Xmx和-Xms来配置堆栈大小。那么这个进程怎样使用堆栈以及为什么须要堆栈?这里有一张Spark在JVM堆中的内存分布图:
默认情况下,Spark的启动是JVM堆内存大小为512M。从安全方面考虑以及避免OOM错误。Spark仅仅同意利用堆内存的90%,这比例通过Spark的spark.storage.safetyFraction參数控制。好吧,我们可能还听说过Spark作为一个内存池工具能够让我们将一些数据放入缓存中。假设我们已经读过之前关于Spark误区的文章(译注:Spark误区)就会明确Spark并非真正的内存工具,它仅仅是使用内存作为它的LRU缓存(http://en.wikipedia.org/wiki/Cache_algorithms)。
当中一些缓存作为保留用来保存正在处理的数据。这部分区域通常占安全区域(safe
heap,译注:JVM对内存的90%那部分内存)的60%,这个比例能够通过spark.storage.memoryFraction參数控制。因此,假设我们想知道能够将多少数据放入Spark缓存中,我们须要减去全部executor所占的对内存总和,再乘以safetyFraction和storage.memoryFraction參数;默认情况下为0.9 * 0.6 = 0.54,也就是说我们有54%的堆内存供Spark使用。
接着我们略微多讲下shufle内存。它通过堆大小 * spark.shuffle.safetyFraction *spark.shuffle.memoryFraction计算公式得出。
spark.shuffle.safetyFraction的默认值为0.8或者80%,spark.shuffle.memoryFraction的默认值为0.2或者20%。因此,我们能够使用的shuffle内存大小为0.8
* 0.2 = 0.16,也就是JVM堆内存的16%。可是Spark怎样使用这个内存?我们能够在这里看到很多其它细节https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala。Spark使用这些内存来运行shuffle所须要的任务;当shuffle运行完毕后,有时我们须要对数据进行排序;当我们排序时须要将排过序的数据放入缓存中(可是请谨记。我们不能改动这些在LRU缓存中的数据,由于它们会在随后被重用)。
所以须要一些内存来存放已排序的数据块。那么假设没有足够的内存来存放这些排过序的数据时会怎么样?有一个广泛使用的算法叫做“外部排序”(http://en.wikipedia.org/wiki/External_sorting)能够让我们将数据一块一块的排序,然后将终于结果合并在一起。
内存中最后一部分还没有被提及的是“unroll”内存。
这块能够被unroll进程使用的内存大小通过spark.storage.unrollFraction *spark.storage.memoryFraction * spark.storage.safetyFraction计算公式得出,默认大小为0.2 * 0.6 * 0.9 = 0.108或者是10.8%。
这块内存就是将数据展开到内存中时能够使用的内存。到底为何我们须要unroll数据?由于Spark能够让我们将数据以序列化或非序列化的形式存储;以序列化的形式存储的数据是不能直接使用的,所以我们须要在用它之前将其展开。这就是这块内存的作用;这块内存是和Spark存储数据所用的内存共享的。所以当我们展开数据的时候可能会引起LRU缓存中存储的一些数据被清理。
很好,如今我们了解了Spark进程是怎么样的以及怎样利用JVM进程的堆内存。
接着我们来看看集群模式:当我们打开一个Spark集群时。它看起来究竟是什么样的?我比較喜欢YARN,所以我将讲讲它是怎么在YARN中执行的。但事实上这在其它集群中也都是一样的:
在YARN集群中,有一个YARN资源管理守护进程来管理集群中的资源(尤其是内存)、有一系列运行在集群节点上的YARN节点管理器以及控制节点上的资源利用。从YARN的角度上来看。每一个节点都代表一个我们能够控制的内存池。当我们从YARN资源管理器中请求一个资源时,它能够为我们调出executor容器;当中每一个executor容器都是一个具有一定堆内存大小的JVM;而JVM得位置则由YARN资源管理器来决定我们不能直接控制选择哪个JVM----假设一个节点有64G的内存在YARN的控制之下(通过yarn-site.xml中yarn.nodemanager.resource.memory-mb參数配置)。那么当我们发起10个运行请求每一个4G时,假设有一个非常大的集群。则这10个executor都能够非常easy在单个YARN节点上运行。
当我们在YARN上构建Spark集群时,我们须要指定运行器的数量(-num-executors标记或spark.executor.instances參数)、每一个executor的内存大小(-executor-memory标记或者spark.executor.memory參数)、每一个executor所同意的核心数(-executor-cores标记或者spark.executor.cores參数)、为每一个任务的运行分配的核心数(-spark.task.cpus參数)。同一时候我们还须要指定驱动程序所使用的内存大小(-driver-memory标记或者spark.driver.memory參数)。
当我们在Spark集群中发起运行命令时,我们的工作会被分为不同的阶段。每一个阶段又会被拆分为task。
每一个task都会被单独调度,我们能够觉得每一个JVM都被用作运行器(executor)来作为task运行的任务池。每一个executor又提供了spark.executor.cores 或spark.task.cpus參数来为每一个task设置运行槽。并通过spark.executor.instances參数设置每一个机器上所同意的executor数。这里有一个样例:有一个集群中有12个运行了YARN资源管理器的节点,每一个节点64G内存、32核CPU(拥有超线程的16核物理CPU);这种话每一个节点能够启动两个executor,每一个26G内存(由于要留一些内存给系统进程和YARN
NM以及数据节点),每一个executor分配12核CPU用来运行任务(由于要留一些内存给系统进程和YARN NM以及数据节点);因此这个集群能够处理12台机器 * 2个executor/机器 * 12个核心/executor * 1个核心/task = 288个任务槽,也就是说这个Spark集群能够最多并行运行288个任务并占满集群中的全部资源。而集群中能够被用来缓存数据的内存大小为 0.9 spark.storage.safetyFraction * 0.6 spark.storage.memoryFraction*
12台机器 * 2executor/每台机器 * 26G内存/executor = 336.96 GB。尽管不是非常多。但大部分情况下就够用了。
眼下一切顺利,我们Spark怎样利用内存以及集群中运行槽是什么。但我们可能还注意到。我并没停留在有关细节上来解释“task”究竟是什么。这是下一篇文章里要讲到的。可是基本上task是Spark进行作业的最小运行单位,而且作为executor JVM中的一个线程来运行;这就是Spark下job启动延迟低的秘密----在JVM内部fork一个额外的线程远比在Hadoop启动MapReduce作业时又一次启动一个完整的虚拟机要快的多。
以下我们来关注Spark的还有一个抽象“分区(partition)”。
我们在Spark中使用的全部数据都会被拆分为分区。什么是分区?它是由什么决定的?分区的大小全然是由我们所使用的数据源大小决定的。
在Spark大多数读取数据的方法中我们都能够指定想要在RDD中生成的分区数。当我们从HDFS中读取一个文件时,能够使用Hadoop的InputFormat来实现。默认情况下被InputFormat拆分出来的输入块在RDD中都会被封装为分区。
对于HDFS中的大多数文件来说每一个输入分片都会生成相应的块(block)存储在HDFS系统中,每一个块大约在64M到128M。
这里用大约是由于HDFS是使用字节来准确切分块,但处理的时候是依据记录来切分的。对于文本文件来说是通过换行符切换的,而对于序列文件又是依据块切分的等等。
这里唯一个列外就是压缩文件----假设我们将整个文本文件压缩,那么它就不能按记录来切分,那么整个文件就会作为一个单一的切分从而导致Spark仅仅会为其生成单个分区。这样的情况下我们仅仅能手动来切分。
接下来事情就变得非常easy了----将每一个分区中的数据通过离数据位置(这里指Hadoop中块的位置或者Spark中缓存的分区的位置)近期的任务槽中通过task来处理。
这篇文章中已经讲了非常多信息了。
下篇文章将讲讲Spark怎样将运行进程拆分为不同阶段并将每阶段放入task中运行?Spark怎样在集群中shuffle数据?以及其它一些实用的东西。
1. 本文由程序猿学架构翻译
2.原文链接:http://0x0fff.com/spark-architecture/
2. 转载请务必注明本文出自:程序猿学架构(微信号:archleaner)
3. 很多其它文章请扫码: