【Spark调优】大表join大表,少数key导致数据倾斜解决方案

【使用场景】  

  两个RDD进行join的时候,如果数据量都比较大,那么此时可以sample看下两个RDD中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD中的少数几个key的数据量过大,而另一个RDD中的所有key都分布比较均匀,此时可以考虑采用本解决方案。

【解决方案】  

  1. 对有数据倾斜那个RDD,使用sample算子采样出一份样本,统计下每个key的数量,看看导致数据倾斜数据量最大的是哪几个key。
  2. 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀;不会导致倾斜的大部分key形成另外一个RDD。
  3. 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
  4. 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,这样就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
  5. 而另外两个普通的RDD就照常join即可。
  6. 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。

【方案优点】

  对于两个大RDD进行join时的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容,避免了占用过多内存。

 

【方案局限】

  如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,就不能使用本解决方案了。

【代码实现】

  我对上述方案做了代码实现,见我的github:https://github.com/wwcom614/Spark

  Java版实现

  上一篇:【Spark调优】小表join大表数据倾斜解决方案

原文地址:https://www.cnblogs.com/wwcom123/p/10597825.html

时间: 2024-11-03 05:29:51

【Spark调优】大表join大表,少数key导致数据倾斜解决方案的相关文章

【Spark调优】小表join大表数据倾斜解决方案

[使用场景] 对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(例如几百MB或者1~2GB),比较适用此方案.. [解决方案] 小表join大表转为小表broadcast+map大表实现.具体为: 普通的join是会shuffle的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join,此时如果发生数据倾斜,影响处理性能,而此时恰好

大数据-spark理论(3)sparkSql,sparkStreaming,spark调优

导读目录 第一节:sparksql 1:简介 2:核心 3:与hive整合 4:dataFrame 5:函数 第二节:spark Streaming 1:对比strom 2:DStream的算子 3:代码 4:driver HA 5:读取数据 第三节:spark调优 第一节:sparksql (1)简介: Shark:shark是sparksql的前身,hive是shark的前身 快的原因:不仅是内存,还有谓词下移(减少一定量的数据IO) 正常 谓词下移 (先关联表在切割) (先将表中的字段过滤

大数据开发实战:Hive优化实战3-大表join大表优化

5.大表join大表优化 如果Hive优化实战2中mapjoin中小表dim_seller很大呢?比如超过了1GB大小?这种就是大表join大表的问题.首先引入一个具体的问题场景,然后基于此介绍各自优化方案. 5.1.问题场景 问题场景如下: A表为一个汇总表,汇总的是卖家买家最近N天交易汇总信息,即对于每个卖家最近N天,其每个买家共成交了多少单,总金额是多少,假设N取90天,汇总值仅取成交单数.A表的字段有:buyer_id. seller_id.pay_cnt_90day. B表为卖家基本信

【Spark学习】Apache Spark调优

Spark调优 本文系根据官方文档翻译而来,转载请注明本文链接 http://www.oschina.net/translate/spark-tuning?print 数据序列化 内存优化 确定内存用量 调整数据结构 序列化RDD存储 垃圾收集调整 其他考虑因素 并行化水平 Reduce任务的内存用量 Broadcasting large variables 总结 因为大部分Spark程序都具有“内存计算”的天性,所以集群中的所有资源:CPU.网络带宽或者是内存都有可能成为Spark程序的瓶颈.

spark调优之开发调优

(1)避免重复的RDD 案例: val rdd1 = sc.textFile("hdfs://zzy/hello.txt") rdd1.map(...) val rdd2 = sc.textFile("hdfs://zzy/hello.txt") rdd2.reduce(...) 这里条用了两次textFile,并且读取的是同一个文件,造成了多次的磁盘读取,如果是hi同一个文件,读取一次即可. (2)尽可能多的复用一个RDD 错误演示: //由于业务需要,对rdd1

【Spark调优】Broadcast广播变量

[业务场景] 在Spark的统计开发过程中,肯定会遇到类似小维表join大业务表的场景,或者需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时应该使用Spark的广播(Broadcast)功能来提升性能. [原理说明] 在算子函数中使用到外部变量或两表join时,默认情况下,Spark会将该变量或小维表复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本.如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能

spark调优(二):调节并行度

并行度:其实就是指的是,Spark作业中,各个stage的task数量,也就代表了Spark作业的在各个阶段(stage)的并行度. 如果不调节并行度,导致并行度过低,会怎么样? 假设,现在已经在spark-submit脚本里面,给我们的spark作业分配了足够多的资源,比如50个executor,每个executor有10G内存,每个executor有3个cpu core.基本已经达到了集群或者yarn队列的资源上限. task没有设置,或者设置的很少,比如就设置了,100个task.50个e

spark调优经验(待续)

spark调优是须要依据业务须要调整的,并非说某个设置是一成不变的,就比方机器学习一样,是在不断的调试中找出当前业务下更优的调优配置.以下零碎的总结了一些我的调优笔记. spark 存储的时候存在严重的分配不均的现象,有几台机器在过渡使用, 有几台机器却非常少被使用.有几台机器缓存了几十个上百个RDD blocks  有的机器一个RDD blocks 都没有.这样存储有RDD blocks 的能够进行运算.运算的tasks 最多为该机器core数. spark.storage.memoryFra

hive join 优化 --小表join大表

1.小.大表 join 在小表和大表进行join时,将小表放在前边,效率会高,hive会将小表进行缓存. 2.mapjoin 使用mapjoin将小表放入内存,在map端和大表逐一匹配,从而省去reduce. 例子: select /*+MAPJOIN(b)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1 在0.7版本后,也可以用配置来自动优化 set hive.auto.convert.join=true;