Spark 颠覆 MapReduce 保持的排序记录

在过去几年,Apache Spark的采用以惊人的速度增加着,通常被作为MapReduce后继,可以支撑数千节点规模的集群部署。在内存中数 据处理上,Apache Spark比MapReduce更加高效已经得到广泛认识;但是当数据量远超内存容量时,我们也听到了一些机构在Spark使用 上的困扰。因此,我们与Spark社区一起,投入了大量的精力做Spark稳定性、扩展性、性能等方面的提升。既然Spark在GB或TB级别数据上运行 良好,那么它在PB级数据上也应当同样如此。

为了评估这些工作,最近我们与AWS一起完成了一个Sort Benchmark(Daytona Gray类别)测试,一个考量系统排序 100TB数据(万亿条记录)速度的行业基准测试。在此之前,这项基准测试的世界记录保持者是雅虎,使用2100节点的Hadoop MapReduce 集群在72分钟内完成计算。而根据测试结果得知,在使用了206个EC2节点的情况下,Spark将排序用时缩短到了23分钟。这意味着在使用十分之一计 算资源的情况下,相同数据的排序上,Spark比MapReduce快3倍!

此外,在没有官方PB排序对比的情况下,我们首次将Spark推到了1PB数据(十万亿条记录)的排序。这个测试的结果是,在使用190个节点的情 况下,工作负载在短短不到4小时内完成,同样远超雅虎之前使用3800台主机耗时16个小时的记录。同时,据我们所知,这也是公用云环境首次完成的PB级 排序测试。

Hadoop World Record Spark 100 TB Spark 1 PB
Data Size 102.5 TB 100 TB 1000 TB
Elapsed Time 72 mins 23 mins 234 mins
# Nodes 2100 206 190
# Cores 50400 6592 6080
# Reducers 10,000 29,000 250,000
1.42 TB/min 4.27 TB/min 4.27 TB/min
Rate/node 0.67 GB/min 20.7 GB/min 22.5 GB/min
Sort Benchmark Daytona Rules Yes Yes No
Environment dedicated data center EC2 (i2.8xlarge) EC2 (i2.8xlarge)

为什么会选择排序?

排序的核心是shuffle操作,数据的传输会横跨集群中所有主机。Shuffle基本支持了所有的分布式数据处理负载。举个例子,在一个 连接了两个不同数据源的SQL查询中,会使用shuffle将需要连接数据的元组移动到同一台主机;同时,类似ALS等协同过滤算法同样需要依赖 shuffle在网络中发送用户或产品的评级(ratings)和权重(weights)。

大部分数据管道开始时都会有大量的原始数据,但是在管道处理过程中,随着越来越多不相干数据被过滤,或者中间数据被更简洁的表示,数据量必 然会减少。在100TB原始数据的查询上,网络上shuffle的数据可能只有100TB的一小部分,这种模式也体现在MapReduce的命名。

然而,排序却是非常有挑战的,因为数据管道中的数据量并不会减少。如果对100TB的原始数据进行排序,网络中shuffle的数据必然也 是100TB。同时,在Daytona类型的基准测试中,为了容错,不管是输入数据还是输出数据都需要做备份。实际上,在100TB的数据排序上,我们可 能会产生500TB的磁盘I/O及200TB的网络I/O。

因此,基于上述原因,当我们寻找Spark的测量标准和提升办法时,排序这个最苛刻的工作负载成为了对比的不二之选。

产生如此结果的技术实现

在超大规模工作负载上,我们投入了大量的精力来提升Spark。从细节上看,与这个基准测试高度相关的工作主要有3个:

首先及最关键的,在Spark 1.1中我们引入了一个全新的shuffle实现,也就是基于排序的 shuffle(SPARK-2045)。在此之前,Spark做的是基于哈希的shuffle实现,它需要在内存中同时保持P(reduce的分割数 量)个缓冲区。而在基于排序的shuffle下,任何时候系统只使用一个缓冲区。这个操作将显著地减少内存开销,因此同一个场景下可以支撑数十万任务(我 们在PB排序中使用了2.5万个任务)。

其次,我们修订了Spark的网络模型,通过JNI(SPARK-2468)使用基于Netty的Epoll本地端口传输。同时,新的模型还拥有了独立的内存池,绕过了JVM的内存分配器,从而减少垃圾回收造成的影响。

最后但同样重要的是,我们建立了一个外部shuffle服务(SPARK-3796),它与Spark本身的执行器完全解耦。这个新的服务基于上文所述的网络模型,同时,在Spark本身的执行器忙于GC处理时,它仍然可以保证shuffle文件处理的继续执行。

通过这三项改变,我们的Spark集群在map阶段单 节点可以支撑每秒3GB的IO吞吐,在reduce阶段单节点可以支撑1.1GB,从而榨干这些机器间10Gbps的网络带宽。

更多的技术细节

TimSort:在Spark 1.1版本中,我们将默认排序算法从 quicksort转换到 TimSort,它是合并排 序和嵌入排序的一个衍生。在大部分现实世界数据集中,TimSort比quicksort更加高效,在部分排序数据中表现则更为优秀。不管在map阶段还 是Reduce阶段,我们都使用了TimSort。

缓存位置的利用:在排序基准测试中,每条记录的大小都是100字节,而被排序的键是前10个字节。在排序项目的 性能分析阶 段,我们注意到缓存命中率不如人意,因为每次比较都需要进行一个随机的对象指针查询。为此,我们重新设计了记录在内存的布局,用16字节长度(两个长整 形)的记录来表示每条记录。在这里,前10个字节代表了排序的键,后4个字节则代表了记录的位置(鉴于字节顺序和符号,这点并不容易发现)。这样一来,每 个比较只需要做一次缓存查询,而且它们都是连续的,从而避免了随机的内存查询。

使用TimSort和新的布局方式来利用缓存命中,排序所占用的CPU时间足足减少了5倍。

大规模下的容错机制:在大规模下,许多问题都会暴露。在这个测试过程中,我们看到因为网络连通问题出现的节点丢失,Linux内核自旋,以及因为内存碎片整理造成的节点停滞。幸运的是,Spark的容错机制非常好,并且顺利的进行故障恢复。

AWS的能量:如上文所述,我们使用了206个i2.8xlarge实例来跑这个I/O密集测试。通过SSD, 这些实例交付了非常高的I/O吞吐量。我们将这些实例放到一个VPC放置组中,从而通过单SR-IOV增强网络性能,以获得高性能(10Gbps)、低延 时和低抖动。

Spark只能在内存中大放异彩?

这个误解一直围绕着Spark,特别是刚进入社区中的新人更是如此认为。不错,Spark因为内存计算的高性能闻名,然而Spark的设计 初衷和理念却是一个通用的大数据处理平台——不管是使用内存还是磁盘。在数据无法完全放入内存时,基本上所有的Spark运算符都会做一些额外的处理。通 俗来说,Spark运算符是MapReduce的超集。

如本次测试所示,Spark可以胜任集群内存大小N倍的数据集处理。

总结

击败Hadoop MapReduce集群创造的大规模数据处理记录不仅是对我们工作的一个证明,也是对Spark承诺的一个验证——在任何数据体积,Spark在性能和扩展性上都更具优势。同时,我们也希望在用户使用过程中,Spark可以带来时间和开销上的双节省。

时间: 2024-10-18 16:35:37

Spark 颠覆 MapReduce 保持的排序记录的相关文章

(转)MapReduce二次排序

一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现的原理以及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的.本文将通过一个实际的MapReduce二次排序例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map

mapreduce 实现数子排序

设计思路: 使用mapreduce的默认排序,按照key值进行排序的,如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序. 首先map阶段将输入的数字作为key,  并记录相同key出现的次数,在reduce阶段将输入的key作为输出的value,如果相同值存在多个,循环便利输出. 源数据:file1 2 32 654 32 15 756 65223 fi

Hadoop学习笔记—11.MapReduce中的排序和分组

一.写在之前的 1.1 回顾Map阶段四大步凑 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出,在Step1.4也就是第四步中,需要对不同分区中的数据进行排序和分组,默认情况下,是按照key进行排序和分组. 1.2 实验场景数据文件 在一些特定的数据文件中,不一定都是类似于WordCount单次统计这种规范的数据,比如下面这类数据,它虽然只有两列,但是却有一定的实践意义. 3 3 3 2 3 1 2 2 2 1 1 1 (1)如果按照第一列升序排列,当

Alluxio增强Spark和MapReduce存储能力

Alluxio的前身为Tachyon.Alluxio是一个基于内存的分布式文件系统:Alluxio以内存为中心设计,他处在诸如Amazon S3. Apache HDFS 或 OpenStack Swift存储系统和计算框架应用Apache Spark 或Hadoop MapReduce中间,它是架构在底层分布式文件系统和上层分布式计算框架之间的一个中间件. 对上层应用来讲,Alluxio是一个管理数据访问和快速存储的中间层,对底层存储而言,Alluxio消除了大数据业务和存储系统依赖和鸿沟,

MapReduce中的排序

hadoop的计算模型就是map/reduce,每一个计算任务会被分割成很多互不依赖的map/reduce计算单元,将所有的计算单元执行完毕后整个计算任务就完成了.因为计算单元之间互不依赖所以计算单元可以分配到不同的计算机上执行,这样就可以将计算压力平摊到多个机器上面.当然性能线性提高是有条件的,前提是计算任务所采用的算法必须能够适应map/reduce模式.例如对于海量数据排序任务来说,绝大多数的排序算法都是不适应map/reduce模式的,如堆排序,插入排序,冒泡排序都是不适用于map/re

MapReduce二次排序

本文主要介绍下二次排序的实现方式 我们知道MapReduce是按照key来进行排序的,那么如果有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这就是传说中的二次排序. 下面就具体说一下二次排序的实现方式 主要就是4点 1.自定义一个Key 为什么要自定义一个Key,我们知道MapReduce中排序就是按照Key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定是不行的,所以自定义一个新的Key,Key里面有两个属性,也就是我们要排序的两个字段. 首先,

spark VS mapreduce

Apache Spark,一个内存数据处理的框架,现在是一个顶级Apache项目. 这是Spark迈向稳定的重要一步,因为它越来越多地在下一代大数据应用中取代MapReduce. MapReduce是有趣并且非常有用的,但现在看来Spark开始从它手中接过缰绳,成为新的Hadoop工作负载的主要处理框架.该技术在上周四迈出了十分具有意义的一步:Apache软件基金会宣布Spark现在是一个顶级项目 . 因 为它比MapReduce的速度更快.更容易编程,Spark已经囊括大量的用户和代码贡献者.

Spark join 源码跟读记录

rdd.join的实现:rdd1.join(rdd2) => rdd1.cogroup(rdd2,partitioner) /** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this`

Spark 与 MapReduce的区别

学习参考自 http://spark-internals.books.yourtion.com/markdown/4-shuffleDetails.html 1.  Shuffle read 边 fetch 边处理还是一次性 fetch 完再处理? 边 fetch 边处理. MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据.MapReduce 为了让进入 reduce() 的 records 有序,必须等