王家林谈Spark性能优化第九季之Spark Tungsten内存使用彻底解密

内容:

1、到底什么是Page;

2、Page具体的两种实现方式;

3、Page的使用的源码详解;

==========Tungsten中到底什么是Page============

1、在Spark中其实是不存在Page这个类的!!!实质上来说,Page是一种数据结构(类似于Stack、List等),从OS的层面来讲,Page代表了一个内存块在Page里面可以存放数据,在OS会存在很多不同的Page,当要获得数据的时候,首先要定位具体是哪个Page中的数据,找到该Page之后,从Page中根据特定的规则(例如说数据的offset和length等)取出数据;

2、那到底什么是Spark中的Page?在阅读源码的时候研究MemoryBlock.java,是从TaskMemoryManager.java进去看到的发现MemoryBlock就是Page

public class MemoryBlock extends MemoryLocation {

private final long length;

/**
   * Optional page number; used when this MemoryBlock represents a page allocated by a
   * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager,
   * which lives in a different package.
   */
  public int pageNumber = -1;

public MemoryBlock(@Nullable Object obj, long offset, long length) {
    super(obj, offset);
    this.length = length;
  }

/**
   * Returns the size of the memory block.
   */
  public long size() {
    return length;
  }

/**
   * Creates a memory block pointing to the memory used by the long array.
   */
  public static MemoryBlock fromLongArray(final long[] array) {
    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8);
  }
}

MemoryBlock表示Page,里面的数据可能是on-heap或者off-heap的,所以上面的构造函数的第一个参数,是可以为空,on-heap是有对象的,off-heap是无对象的

如果On-heap的方式,内存的分配是有HeapMemoryAllocator完成的

/**
 * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
 */
public class HeapMemoryAllocator implements MemoryAllocator {

@GuardedBy("this")
  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
    new HashMap<>();

private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;

/**
   * Returns true if allocations of the given size should go through the pooling mechanism and
   * false otherwise.
   */
  private boolean shouldPool(long size) {
    // Very small allocations are less likely to benefit from pooling.
    return size >= POOLING_THRESHOLD_BYTES;
  }

@Override
  public MemoryBlock allocate(long size) throws OutOfMemoryError {
    if (shouldPool(size)) {
      synchronized (this) {
        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
        if (pool != null) {
          while (!pool.isEmpty()) {
            final WeakReference<MemoryBlock> blockReference = pool.pop();
            final MemoryBlock memory = blockReference.get();
            if (memory != null) {
              assert (memory.size() == size);
              return memory;
            }
          }
          bufferPoolsBySize.remove(size);
        }
      }
    }
    long[] array = new long[(int) ((size + 7) / 8)];
    return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
  }

如果Off-heap的方式,内存的分配是有UnsafeMemoryAllocator完成的

/**
 * A simple {@link MemoryAllocator} that uses {@code Unsafe} to allocate off-heap memory.
 */
public class UnsafeMemoryAllocator implements MemoryAllocator {

@Override
  public MemoryBlock allocate(long size) throws OutOfMemoryError {
    long address = Platform.allocateMemory(size);
    return new MemoryBlock(null, address, size);
  }

==========如何使用Page呢============

1、在TaskMemoryManager中通过封装Page来定位数据,定位的时候如果是on-heap的话,则先找到对象,然后再对象中通过offset来具体定位地址,而如果是off-heap,则直接定位;

2、一个关键的问题是,如何确定数据呢?这个时候就需要设计具体的算法

TaskMemoryManager中已经写好准备以后一台机器拥有32T大小的内存了

王家林老师名片:

中国Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公众号:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手机:18610086859

QQ:1740415547

邮箱:[email protected]

时间: 2024-08-10 23:28:48

王家林谈Spark性能优化第九季之Spark Tungsten内存使用彻底解密的相关文章

王家林谈Spark性能优化第一季!(DT大数据梦工厂)

内容: 1.Spark性能优化需要思考的基本问题: 2.CPU和Memory: 3.并行度和Task: 4.网络: ==========王家林每日大数据语录============ 王家林每日大数据语录Spark篇0080(2016.1.26于深圳):如果Spark中CPU的使用率不够高,可以考虑为当前的程序分配更多的Executor,或者增加更多的Worker实例来充分的使用多核的潜能. 王家林每日大数据语录Spark篇0079(2016.1.26于深圳):适当设置Partition分片数是非

Spark性能优化之道——解决Spark数据倾斜(Data Skew)的N种姿势

原创文章,转载请务必将下面这段话置于文章开头处.本文转发自技术世界,原文链接 http://www.jasongj.com/spark/skew/ 摘要 本文结合实例详细阐明了Spark数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义Partitioner,使用Map侧Join代替Reduce侧Join,给倾斜Key加上随机前缀等. 为何要处理数据倾斜(Data Skew) 什么是数据倾斜 对Spark/Hadoop这样的大数据系统来讲,数据量大并不可怕,可怕的是

Spark性能优化指南——高级篇

Spark性能优化指南--高级篇 [TOC] 前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为<Spark性能优化指南>的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能. 数据倾斜发生时的现象 绝大多数tas

Spark性能优化指南——基础篇

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

美团Spark性能优化指南——基础篇

http://tech.meituan.com/spark-tuning-basic.html 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团?大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性

【转载】 Spark性能优化指南——基础篇

前言 开发调优 调优概述 原则一:避免创建重复的RDD 原则二:尽可能复用同一个RDD 原则三:对多次使用的RDD进行持久化 原则四:尽量避免使用shuffle类算子 原则五:使用map-side预聚合的shuffle操作 原则六:使用高性能的算子 原则七:广播大变量 原则八:使用Kryo优化序列化性能 原则九:优化数据结构 资源调优 调优概述 Spark作业基本运行原理 资源参数调优 写在最后的话 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的

Spark性能优化指南——基础篇转

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

【转载】Spark性能优化指南——高级篇

前言 数据倾斜调优 调优概述 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 查看导致数据倾斜的key的数据分布情况 数据倾斜的解决方案 解决方案一:使用Hive ETL预处理数据 解决方案二:过滤少数导致倾斜的key 解决方案三:提高shuffle操作的并行度 解决方案四:两阶段聚合(局部聚合+全局聚合) 解决方案五:将reduce join转为map join 解决方案六:采样倾斜key并分拆join操作 解决方案七:使用随机前缀和扩容RDD进行join 解决方案八:多

Spark性能优化指南--基础篇

前言 开发调优 调优概述 原则一:避免创建重复的RDD 原则二:尽可能复用同一个RDD 原则三:对多次使用的RDD进行持久化 原则四:尽量避免使用shuffle类算子 原则五:使用map-side预聚合的shuffle操作 原则六:使用高性能的算子 原则七:广播大变量 原则八:使用Kryo优化序列化性能 原则九:优化数据结构 资源调优 调优概述 Spark作业基本运行原理 资源参数调优 写在最后的话 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的