转自:http://my.oschina.net/leejun2005/blog/100922
最近几次被问到关于数据倾斜的问题,这里找了些资料也结合一些自己的理解.
在并行计算中我们总希望分配的每一个task 都能以差不多的粒度来切分并且完成时间相差不大,但是集群中可能硬件不同,应用的类型不同和切分的数据大小不一致总会导致有部分任务极大的拖慢了整个任务的完成时间,硬件不同就不说了,应用的类型不同其中就比如page rank 或者data mining 里面一些计算,它的每条记录消耗的成本不太一样,这里只讨论关于关系型运算的(一般能用SQL表述的) 数据切分上的数据倾斜问题.
hadoop 中数据倾斜会极大影响性能的一个背景是mapreduce 框架中总是不分条件的进行sort . 在通用情况下map sort + partition +reduce sort 可以得到结果,但是这个过程不一定是最优的. 对于关系型计算,其中数据倾斜影响最大的地方在reduce 的sort , reduce 处理的数据量的大小如果超过给定的reduce jvm 的大小的2倍不到的阈值的时候(这个阈值是我猜测的,具体以实际监控运行情况为准),reduce 端会发生multi-pass merge sort 的情况, 这个时候观察这些运行较慢的reduce task 的metrics 会发现reduce 跟IO 相关的metrics 会比其他reduce 大很多. 具体的细节参考今年hadoop summit 上Todd 的performance tuning 的ppt (26页):
http://www.slideshare.net/cloudera/mr-perf
这种在reduce 端不分条件的排序只是hadoop 是这种实现,并不是mapreduce 框架一定需要排序,其他的mapreduce 实现或者其他的分布式计算框架可能在reduce 上的这种瓶颈会小一些, 比如shark 里面的group by 就是基于hash 而不是sort 的.
对于关系型的计算中常见的数据倾斜有两种:group by 和 join , 其他有可能的有:
in或exists 的操作尤其是in或exists 作为subquery 的返回(in 或exists 有时候会变成left semi join),
有相同输入源的union 或union all 也许也会有(其他集合类型的操作intersect 之类也许也是).
hive 中的udtf 也算一种.
这里只讨论最常见的group by 和join 的情况.
数据分布:
正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一般有两种情况:
一种是唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)
一种是唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一
分区:
常见的mapreduce分区方式为hash 和range ,
hash partition 的好处是比较弹性,跟数据类型无关,实现简单(设定reduce个数就好,一般不需要自己实现)
range partition 需要实现者自己了解数据分布, 有时候需要手工做sample取样. 同时也不够弹性, 表现在几个方面,1. 对同一个表的不同字段都需要实现不同的range partition, 对于时间这种字段根据查询类型的不同或者过滤条件的不同切分range 的大小都不一定.
2 .有时候可能设计使用多个字段组合的情况, 这时候又不能使用之前单个字段的partition 类, 并且多个字段组合之间有可能有隐含的联系,比如出生日期和星座,商品和季节.
3. 手工做sample 非常耗时间,需要使用者对查询使用的数据集的分布有领域知识.
4. 分配方式是死的,reduce 个数是确定的,一旦某种情况下发生倾斜,调整参数
其他的分区类型还有hbase 的hregionpartitioner 或者totalorder partitioner 等.
能够想到的关于数据倾斜的一些解决方式(欢迎补充,尤其是有没有做搜索或者数据挖掘的朋友有碰到类似问题):
1. 增加reduce 的jvm内存
2. 增加reduce 个数
3. customer partition
4. 其他优化的讨论.
5. reduce sort merge排序算法的讨论
6. 正在实现中的hive skewed join.
7. pipeline
8. distinct
9. index 尤其是bitmap index
方式1:既然reduce 本身的计算需要以合适的内存作为支持,在硬件环境容许的情况下,增加reduce 的内存大小显然有改善数据倾斜的可能,这种方式尤其适合数据分布第一种情况,单个值有大量记录, 这种值的所有纪录已经超过了分配给reduce 的内存,无论你怎么样分区这种情况都不会改变. 当然这种情况的限制也非常明显, 1.内存的限制存在,2.可能会对集群其他任务的运行产生不稳定的影响.
方式2: 这个对于数据分布第二种情况有效,唯一值较多,单个唯一值的记录数不会超过分配给reduce 的内存. 如果发生了偶尔的数据倾斜情况,增加reduce 个数可以缓解偶然情况下的某些reduce 不小心分配了多个较多记录数的情况. 但是对于第一种数据分布无效.
方式3: 一种情况是某个领域知识告诉你数据分布的显著类型,比如hadoop definitive guide 里面的温度问题,一个固定的组合(观测站点的位置和温度) 的分布是固定的, 对于特定的查询如果前面两种方式都没用,实现自己的partitioner 也许是一个好的方式.
方式4: 目前有的一些针对数据倾斜的优化比如pig 的skewed join
http://pig.apache.org/docs/r0.7.0/piglatin_ref1.html#Skewed+Joins
pig 文档上面说是根据数据输入的统计信息来确定分区(也就是range partition?),另外不清楚这个行为是否是动态运行时候才决定的,也就是运行之前有一步pig 自动做sample 的工作,因为pig 是没有统计信息这一说的.
hive 中的group by
<property>
<name>hive.groupby.skewindata</name>
<value>false</value>
<description>Whether there is skew in data to optimize group by queries</description>
</property>
<property>
<name>hive.optimize.groupby</name>
<value>true</value>
<description>Whether to enable the bucketed group by from bucketed partitions / tables.</description>
</property>
<property>
<name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name>
<value>0.3</value>
<description>Portion of total memory to be used by map-side grup aggregation hash table, when this group by is followed by map join</description>
</property>
<property>
<name>hive.groupby.mapaggr.checkinterval</name>
<value>100000</value>
<description>Number of rows after which size of the grouping keys/aggregation classes is performed</description>
</property>
其中最后一个参数hive.groupby.mapaggr.checkinterval 的思路跟in-memory combiner 相似, in-memeory combiner 是发生在mapper 端sort 之前,而不是现在的combiner发生在mapper sort 之后甚至在写入磁盘之后重新读磁盘然后排序合并. in-memeory combiner 最早好像是《Data-Intensive Text Processing with MapReduce》,mapr 去年的介绍ppt 里面好像提到它们也有这个优化. mapper 端减少数据的机会比reduce 端的要大,所以一般不会看到reduce 端的combiner 的讨论,但是这种思路也有,比如google tenzing 的join 讨论里面有一个prev-next 的小优化就是基于reduce 端的combiner, 但那个前提是基于block shuffle 实现的基础上,数据已经排过序了,所以join 时候前一条数据跟后一条数据相同的概率很大.
hive 中的skewed join : 之前的文章已经介绍过两表join 中hive 的几个优化,其中的skewed join 的类似思路就是上面介绍的skewed 的第二种:增加reduce 的个数,hive 中是通过判断阈值如果大于一个reduce 需要处理的数据量,重新起额外的task 来处理这些超额的reduce 本身需要处理的数据, 这是一种较晚的补救措施,本身hive 开始分区的时候已经倾斜(partition 的方式不合理), 当运行的时候通过运行时监控reduce 发现倾斜的特殊key 然后额外的起task 去处理,效果比较一般,感兴趣的同学可以参考HIVE-3086 里面我和facebook 团队对这种优化思路的讨论. 第六节我会讨论一下我所认为的思路和facebook 正在做的思路之间的差别.
方式5 : reduce 分配的内存远小于处理的数据量时,会产生multi-pass sort 的情况是瓶颈,那么就要问
1. 这种排序是有必要的嘛?
2. 是否有其他排序算法或优化可以根据特定情况降低他瓶颈的阈值?
3. map reduce 适合处理这种情况嘛?
关于问题1. 如果是group by , 那么对于数据分布情况1 ,hash 比sort 好非常多,即使某一个reduce 比其他reduce 处理多的多的数据,hash 的计算方式也不会差距太大.
问题2. 一个是如果实现block shuffle 肯定会极大的减少排序本身的成本, 另外,如果分区之后的reduce 不是使用copy –> sort-merge –> reduce 的计算方式, 在copy 之后将每个block 的头部信息保存在内存中,不用sort – merge 也可以直接计算reduce, 只不过这时候变成了随机访问,而不是现在的sort-merge 之后的顺序访问. block shuffle 的实现有两种类型,一种是当hadoop 中真正有了列数据格式的时候,数据有更大的机会已经排过序并且按照block 来切分,一般block 为1M ( 可以关注avro-806 ) , 这时候的mapper 什么都不做,甚至连计算分区的开销都小了很多倍,直接进入reduce 最后一步,第二种类型为没有列数据格式的支持,需要mapper 排序得到之后的block 的最大最小值,reduce 端在内存中保存最大最小值,copy 完成后直接用这个值来做随机读然后进行reduce. ( block shuffle 的实现可以关注 MAPREDUCE-4039 , hash 计算可以关注 MAPREDUCE-1639)
问题3 . map reduce 只有两个函数,一个map 一个 reduce, 一旦发生数据倾斜就是partition 失效了,对于join 的例子,某一个key 分配了过多的记录数,对于只有一次partittion的机会,分配错了数据倾斜的伤害就已经造成了,这种情况很难调试,但是如果你是基于map-reduce-reduce 的方式计算,那么对于同一个key 不需要分配到同一个reduce 中,在第一个reduce 中得到的结果可以在第二个reduce 才汇总去重,第二个reduce 不需要sort – merge 的步骤,因为前一个reduce 已经排过序了,中间的reduce 处理的数据不用关心partition 怎么分,处理的数据量都是一样大,而第二个reduce 又不使用sort-merge 来排序,不会遇到现在的内存大小的问题,对于skewed join 这种情况瓶颈自然小很多.
方式6: 目前hive 有几个正在开发中的处理skewed join 情况的jira case, HIVE-3086 , HIVE-3286 ,HIVE-3026 . 简单介绍一下就是facebook 希望通过手工处理提前枚举的方式列出单个倾斜的值,在join 的时候将这些值特殊列出当作map join 来处理,对于其他值使用原来的方式. 我个人觉得这太不伸缩了,值本身没有考虑应用过滤条件和优化方式之后的数据量大小问题,他们提前列出的值都是基于整个分区的. join key 如果为组合key 的情况也应该没有考虑,对metastore 的储存问题有限制,对输入的大表和小表都会scan 两次( 一次处理非skew key , 一次处理skew key 做map join), 对输出表也会scan 两次(将两个结果进行merge) , skew key 必须提前手工列出这又存在额外维护的成本,目前因为还没有完整的开发完到能够投入生产的情况,所以等所有特性处理完了有了文档在看看这个处理方式是否有效,我个人认为的思路应该是接着bucked map join 的思路往下走,只不过不用提前处理cluster key 的问题, 这时候cluster key 的选择应该是join key + 某个能分散join key 的列, 这等于将大表的同一个key 的值分散到了多个不同的reduce 中,而小表的join key 也必须cluster 到跟大表对应的同一个key , join 中对于数据分布第二种情况不用太难,增加reduce 个数就好,主要是第一种,需要大表的join key 能够分散,对于同样join key 的小表又能够匹配到所有大表中的记录. 这种思路就是不用扫描大表两遍或者结果输出表,不需要提前手工处理,数据是动态sample 的应用了过滤条件之后的数据,而不是提前基于统计数据的不准确结果. 这个基本思路跟tenzing 里面描述的distributed hash join 是一样的,想办法切成合适的大小然后用hash 和 map join .
方式7: 当同时出现join 和group 的时候, 那么这两个操作应该是以pipeline (管道) 的方式执行. 在join 的时候就可以直接使用group 的操作符减少大量的数据,而不是等待join 完成,然后写入磁盘,group 又读取磁盘做group操作. HIVE-2206 正在做这个优化. hive 里面是没有pipeline 这个概念的. 像是cloudera 的crunch 或者twitter 的Scalding 都是有这种概念的.
方式8: distinct 本身就是group by 的一种简写,我原先以为count(distinct x)这种跟group by 是一样的,但是发现hive 里面distinct 明显比group by 要慢,可能跟group by 会有map 端的combiner有关, 另外观察到hive 在预估count(distinct x) 的reduce 个数比group by 的个数要少 , 所以hive 中使用count(distinct x) , 要么尽量把reduce 个数设置大,直接设置reduce 个数或者hive.exec.reducers.bytes.per.reducer 调小,我个人比较喜欢调后面一个,hive 目前的reduce 个数没有统计信息的情况下就是用map端输入之前的数值, 如果你是join 之后还用count(distinct x) 的话,这个默认值一般都会悲剧,如果有where 条件并能过滤一定数量的数据,那么默认reduce 个数可能就还好一点. 不管怎样,多浪费一点reduce slot 总比等十几甚至几十分钟要好, 或者转换成group by 的写法也不错,写成group by 的时候distributed by 也很有帮助.
方式9: hive 中的index 就是物化视图,对于group by 和distinct 的情况等于变成了map 端在做计算,自然不存在倾斜. 尤其是bitmap index , 对于唯一值比较少的列优势更大,不过index 麻烦的地方在于需要判断你的sql 是不是常用sql , 另外如果create index 的时候没有选你查询的时候用的字段,这个index 是不能用的( hive 中是永远不可能有DBMS中的用index 去lookup 或者join 原始表这种概念的)
其他建议:
网上能找到的另外一份很好的描述数据倾斜的资料是
http://nuage.cs.washington.edu/pubs/opencirrus2011.pdf
里面的map side skew 和expensive record 都不是关系型计算中的问题,所以不是这篇文章关注点. 对于关系型计算,其中数据倾斜影响最大的地方在reduce 的sort. 这篇文章里面最后总结的5点好的建议值得参考,
其中第三条需要你知道应用combiner 和特殊优化方式是否带来了性能的提升,hive 的map aggr 在数据分布情况1效果会比较好,数据分布情况2效果就不大,还有combiner 应用的时候是消耗了系统资源的,确认这种消耗是否值得而不是任何情况下都使用combiner.
对于第四点关系型计算中map 倾斜情况不太常见. 一种可以举出来的例子是分区不合理,或者hive 中的cluster by 的key 选择不合理(都是使用目录的方式分区, 目录是最小处理单元了).
- Use domain knowledge when choosing the
map output partitioning scheme if the reduce operation is
expensive: Range partition or some other form of explicit
partition may be better than the default hash-partition - Try different partitioning schemes on sample
workloads or collect the data distribution at the reduce input
if a MapReduce job is expected to run several times - Implement a combiner to reduce the amount
of data going into the reduce-phase and, as such, significantly
dampen the effects of any type of reduce-skew - Use a pre-processing MapReduce job that
extracts properties of the input data in the case of a longruning,
skew-prone map phase. Appropriately partitioning the
data before the real application runs can significantly reduce
skew problems in the map phase. - Best Practice 5. Design algorithms whose runtime depends
only on the amount of input data and not the data distribution.
另外一份是淘宝的数据倾斜总结:http://www.alidata.org/archives/2109
不过我个人觉得帮助不是太大,里面第一个解决方式空值产生的影响第一个Union All 的方式个人是极力反对的,同一个表尤其是大表扫描两遍这额外的成本跟收益太不匹配,不推荐,第二个将特殊值变成random 的方式, 这个产生的结果是正确的嘛? 尤其是在各种情况下输出结果是正确的嘛?里面背景好像是那个小表users 的主键为userid, 然后userid 又是join key , 而且还不为空? 不太推荐,背景条件和输出的正确性与否存疑.
第二个数据类型不同的问题我觉得跟HIVE-3445 都算是数据建模的问题,提前修改好是一样的.
第三个是因为淘宝的hadoop 版本中没有map side hash aggr 的参数吧. 而且写成distinct 还多了一个MR 步骤,不太推荐.
数据倾斜在MPP 中也是一个课题,这也设计到一个数据重分配的问题,但是相对于MPP 中有比较成熟的机制,一个是mpp 在处理数据初始分布的时候总是会指定segmented by 或者distributed by 这种显示分配到不同物理机器上的建表语句. 还有就是统计信息会帮助执行引擎选择合适的重新分布.但是统计信息也不是万能的,比如
1:统计信息的粒度和更新问题.
2: 应用了过滤条件之后的数据也许不符合原始期望的数据分布.
3: 统计信息是基于采样的,总于真实所有数据存在误差.
4: 统计信息是基于partittion 的, 对于查询没有涉及到partition 字段的切分就不能使用各partition 只和来表示总体的统计信息.
5. 临时表或者多步骤查询的中间过程数据没有统计信息的情况.
6. 各种其他的算法优化比如in-mapper combiner 或者google Tenzing 的prev – next combine 都会影响统计信息对于算法选择的不同.
总结:
数据倾斜没有一劳永逸的方式可以解决,了解你的数据集的分布情况,然后了解你所使用计算框架的运行机制和瓶颈,针对特定的情况做特定的优化,做多种尝试,观察是否有效.
- ———————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————HIVE数据倾斜:http://www.cnblogs.com/ggjucheng/archive/2013/01/03/2842860.html