Spark的Straggler深入学习(2):思考Block和Partition的划分问题——以论文为参考

一、partition的划分问题

如何划分partition对block数据的收集有很大影响。如果需要根据block来加速task的执行,partition应该满足什么条件?

参考思路1:range partition

1、出处:

IBM DB2 BLU;Google PowerDrill;Shark on HDFS

2、规则:

range partition遵循三个原则:1、针对每一列进行细粒度的范围细分,防止数据倾斜和工作量倾斜;2、每一个partition分配的列是不同的;3、需要针对数据相关性和过滤关联性来考虑partition的划分。

实现方法可以参考思路3中的Spark的实现。

3、简单思考:

这样划分partition需要很多额外的工作,如果针对我的设计是不需要这么多的,唯一需要考虑的就是第一点:避免数据倾斜和工作量倾斜。

参考思路2:Fine-grained Partitioning ( one of horizontal partitioning )

1、出处:

Fine-grained Partitioning for Aggressive Data Skipping (ACMSIGMOD14)

2、规则:

fine-grained partitioning的划分目的很明确:划分成细粒度的、大小平衡的block,而划分的依据是让查询操作(该partition方式针对 shark query制定)更大程度上的跳过对该block的扫描。具体的方法是:

(1)从之前频繁使用的过滤集(文章已证明一部分典型的过滤能够用来决策)中抽取过滤的判断条件作为特征;

(2)根据抽取的特征对数据进行重计算,产生特征向量,从而将问题修改成最优解的问题;

示例如下:每个partition对应一个Feature为0,即不满足该特征,如果利用该Feature扫描时可以直接跳过改partition。

3、简单思考:

首先,fine-grained具有以下特点:(1)由一个额外的进程守护,工作于数据加载时或者某个最新的任务(这个任务必然是对partition重新提出了请求,诸如显示的用户partition操作)执行时;(2)该方法针对partition的形成、block的形成同时适用;(3)从filter中抽取典型的特征来划分数据。

然后分析该方法的思想,该方法利用所需要的信息,即filter特性与skip block的特性,来将划分block和partition,具有很高的参考价值。而我所需要的数据特性应该包括数据的重要性、启动task所需要的数据的完整性、启动task所需要的block块的完整性,更确切的说应该是block数据完成了多少就可以启动task了,如何判断这个量?

 参考思路3:Spark实现的Hash Partition

1、出处:

Spark1.3.1源码解析,Spark默认的自带Partitioner。实际上,Spark1.3.1在这一块也实现了RangePartitioner,而HashPartitioner利用的还是Range Partition。

2、规则:

首先,为了粗略达到输出Partition之间的平衡,需要定义一些参数,这些参数辅助决策一个RDD结果分配到每个Partition的样本数:

sampleSize,限定取样的大小,min(20*partitions,1M);

sampleSizePerPartition,Partition取样样本数,ceil(3*sampleSize/partitions),通过向上取整表示允许超过取样数目少量;

sketched,用来描述一个rdd的结果,包括了(partition id, items number, sample),sample是根据前面的参数决定的;

(key,weight)和imbalancedPartitions,分别是一个buffer数组和mutable类型值,分别存放平衡的Partition的权重和非平衡的Partition,平衡与否根据Partition大小与平均Partition大小的关系判断,权重=Partition大小/取样大小。

具体看看如何实现平衡的判断:

val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)

val balance_or_not = if(fraction * numItems > sampleSizePerPartition) true else false

针对不平衡的Partition重新取样,取样后权重为1/fraction。

其次,需要注意的是,Partition中利用了一个implicitly方法,该方法获取RangPartition中隐藏的参数值:Ordering[ClassTag]。改参数值用来写入和读取Spark框架中的数据流。通过writeObject和readObject可以控制写入和读取Partition中的数据。

最后,决策Partition的bounds时利用的是Object RangePartitioner中的determineBounds方法,该方法利用weight的值来平衡block的大小,然后放入Partition中,进而平衡Partition的大小。

3、简单思考:

Spark自带的Partition策略是利用hashcode获取Partition id的RangePartition,RangePartition采用取样权重的方法来平衡各个Partition的大小,但是并未考虑Partition内部数据的关联度,也就是Block层面的决策没有体现在这里,需要进一步考虑如何按block优化。

二、如何利用Partition和Block的划分策略——重点论文:The Power of Choice in Data-Aware Cluster Scheduling,OSDI14

前文讲了三个如何划分Partition和Block的方法,但是划分之后如何应用其优化,除了上述提到的相应文章,与开题更加对应的是OSDI14年的文章The Power of Choice in Data-Aware Cluster Scheduling。该论文设计实现的系统为KMN,因此下文以KMN代替该论文。

1、概要

原始的Spark中的task在需要资源(该资源为上游stage的output)时,由调度器拉取task需要的资源数目,然后交付给task;而KMN的策略是,调度器拉取的是全部的资源中的数学组合,数量上仍旧是task所需的资源数,大体的关系如下图。

因此,KMN的特色之处在于choice,如何组合最优的block给Scheduler,然后调度给task,达到最高的效率。进而将问题转化为NP问题。需要注意的是,KMN选择choice时是根据全部的block来决策,那么必须等全部的block产生,即上游stage运行完成后才能决策。这样KMN就需要考虑上游Straggler的影响了,很遗憾的是,KMN针对的是近似解问题,从而导致它决定将Straggler丢弃来加快速度。

2、详细实现

KMN核心在于数据感知的choice,其决策分Input Stage和Intermediate Stage两种基本场景,决策Memory Locality和NetWork Blance两个方面。

(1)Input Stage

Input Stage中通过组合block的决策可以在各种集群利用率下保证很高的数据本地性,论文以N中采样K个block为例说明自然采样和用户自定义采样条件下的数据本地性概率。

(2)Intermediate Stage

Intermediate Stage需要考虑其上下游的Stage。KMN为上游Stage设置了额外的task,需要确认额外的task对Block决策调度的影响,文章以M个task和K个block的模型,分析M/K下上游额外task对cross-rack skew,即倾斜度造成的干扰。然后,根据上游Stage的输出选择最优的Block,此时问题转化为一个NP困难问题。最后,需要处理上游Stage的Straggler问题,因为Straggler的出现会导致Intermediate Stage的block的决策受到影响。文章对比Straggler的出现和额外的choice决策的时间,发现Straggler的影响占20%-40%,因此文章采用如下方法解决该问题:当M个上游task中的K个task执行完成后就启动下游task。实际上就是通过加速Stage的执行来加快下游stage的启动时间。

时间: 2024-10-12 09:25:14

Spark的Straggler深入学习(2):思考Block和Partition的划分问题——以论文为参考的相关文章

Spark的Straggler深入学习(1):如何在本地图形监控远程Spark的GC情况——使用java自带的jvisualvm

一.本文的目的 Straggler是目前研究的热点,Spark中也存在Straggler的问题.GC问题是总所周知的导致Straggler的重要因素之一,为了了解GC导致的Straggler问题,首先需要学习GC问题以及如何监控Spark的GC.GC问题的讨论比较多了,推荐一篇系列文章用于学习:成为Java的GC专家. 二.本文所需工具 本文所需工具很简单,基本不用下载.监控GC的前提是: 1.已经在集群上安装了Spark并可以正常提交作业: 2.本地装有jdk1.6以上版本. 本文实际使用的工

初探swift语言的学习笔记十(block)

作者:fengsh998 原文地址:http://blog.csdn.net/fengsh998/article/details/35783341 转载请注明出处 如果觉得文章对你有所帮助,请通过留言或关注微信公众帐号fengsh998来支持我,谢谢! 在前面一些学习中,原本把闭包给理解成了block尽管有很多相似之处,但block还是有他自己的独特之外.近日,在写oc/swift混合编码时,有时候需要swift回调oc,oc回调swift . 因此我把swift中的 block 常见的声明和写

黑马程序员-OC学习笔记之block

过山车 Time Limit: 1000/1000 MS (Java/Others)    Memory Limit: 32768/32768 K (Java/Others) Total Submission(s): 9426    Accepted Submission(s): 4151 Problem Description RPG girls今天和大家一起去游乐场玩,终于可以坐上梦寐以求的过山车了.可是,过山车的每一排只有两个座位,而且还有条不成文的规矩,就是每个女生必须找个个男生做par

IOS学习之路--BLOCK

/* 1.定义block变量: 返回值类型 (^block变量名) (参数类型1, 参数类型2, ....); 2.给block变量赋值 block变量名 = ^(参数类型1 参数名称1, .....) { }; */ /* 1.设置动画属性 2.开始执行动画 3.动画执行完毕 block1 = ^{ 封装了动画开始执行前想做的事情 }; block2 = ^{ 封装了动画执行完毕后想做的事情 }; */ #import <Foundation/Foundation.h> //typedef

Spark大数据的学习历程

Spark主要的编程语言是Scala,选择Scala是因为它的简洁性(Scala可以很方便在交互式下使用)和性能(JVM上的静态强类型语言).Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装. 大数据未来几年发展的重点方向,大数据战略已经在十八届五中全会上作为重点战略方向,中国在大数据方面才刚刚起步,但是在美国已经产生了上千亿

第41周五学习力思考

今天基本考虑说服了自己产品基线项目变动的问题,明确出各种可能变数的应对策略,同时也对现有的产品项目有些失望,做什么要做成什么样子基本上就是领导说说,没有给予售后支持对产品问题的反馈,定制项目的功能分析,售前方案对产品的功能需求,感觉完全没有根据市场有效的反馈,或许领导本事就更有市场眼光吧. 在对目前比较失落且不想努力补救改善现有工作环境的情况下,不断增加自身竞争力才是王道.而一句最很有名的话是这样说的”学历代表过去,能力代表现在,学习力代表未来“,那到底什么是学习?如何判断衡量自己的学习力?怎样

学习与思考

一.写代码之前我们要思考的问题? 1.明确需求.我要做什么?2.分析思路.我要怎么做?1,2,3.3.确定步骤.每一个思路部分用到哪些语句,方法,和对象.4.代码实现.用具体的java语言代码把思路体现出来. 二.学习新技术要明确的什么? 1.该技术是什么?2.该技术有什么特点(使用注意):3.该技术怎么使用.demo4.该技术什么时候用?test.

2015年5月产品设计学习与思考

前言: 最近在做产品设计,对这方面的思考也很多.时常的我们,无论有没有仔细用过,专业不专业,对一个产品的吐槽都很容易,一副头头是道的样子. 到真正设计一个从最基本的方案可行,有竞争力,到贴合用户需求,甚至具有创新性,革命性的产品时,会发现你的产品可以添加上百种功能,以此推导出上百条的演进道路.这每条道路将遇到的困难,付出的代价,和最后到达的目的地都会不一样,选择哪条路,有无数的矛盾体挣扎与纠结,需要选择,沟通,头脑风暴,独断等等,有时甚至会回顾我们出发时的目标,时光荏苒,风尘仆仆,却南辕北辙.

Spark生态之Tachyon学习7--下载源码通过maven安装成功

更多代码请见:https://github.com/xubo245/SparkLearning 1.环境 hadoop 2.6.0 spark 1.5.2 java 1.7 2.下载: https://github.com/Alluxio/alluxio/archive/v0.7.1.tar.gz 3.编译: mvn clean package -Djava.version=1.7 -Dhadoop.version=2.6.0 -Dspark.version=1.5.2 -DskipTests