Common Join
最为普通的join策略,不受数据量的大小影响,也可以叫做reduce side join ,最没效率的一种join 方式. 它由一个mapreduce job 完成.
首先将大表和小表分别进行map 操作, 在map shuffle 的阶段每一个map output key 变成了table_name_tag_prefix + join_column_value , 但是在进行partition 的时候它仍然只使用join_column_value 进行hash.
每一个reduce 接受所有的map 传过来的split , 在reducce 的shuffle 阶段,它将map output key 前面的table_name_tag_prefix 给舍弃掉进行比较. 因为reduce 的个数可以由小表的大小进行决定,所以对于每一个节点的reduce 一定可以将小表的split 放入内存变成hashtable. 然后将大表的每一条记录进行一条一条的比较.
真正的Join在reduce阶段。
MapJoin
Map Join 的计算步骤分两步,将小表的数据变成hashtable广播到所有的map 端,将大表的数据进行合理的切分,然后在map 阶段的时候用大表的数据一行一行的去探测(probe) 小表的hashtable. 如果join key 相等,就写入HDFS.
map join 之所以叫做map join 是因为它所有的工作都在map 端进行计算.
hive 在map join 上做了几个优化:
hive 0.6 的时候默认认为写在select 后面的是大表,前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示进行设定. hive 0.7 的时候这个计算是自动化的,它首先会自动判断哪个是小表,哪个是大表,这个参数由(hive.auto.convert.join=true)来控制. 然后控制小表的大小由(hive.smalltable.filesize=25000000L)参数控制(默认是25M),当小表超过这个大小,hive 会默认转化成common join. 你可以查看HIVE-1642.
首先小表的Map 阶段它会将自己转化成MapReduce Local Task ,然后从HDFS 取小表的所有数据,将自己转化成Hashtable file 并压缩打包放入DistributedCache 里面.
目前hive 的map join 有几个限制,一个是它打算用BloomFilter 来实现hashtable , BloomFilter 大概比hashtable 省8-10倍的内存, 但是BloomFilter 的大小比较难控制.
现在DistributedCache 里面hashtable默认的复制是3份,对于一个有1000个map 的大表来说,这个数字太小,大多数map 操作都等着DistributedCache 复制.
优化后的map-join
Converting Common Join into Map Join
判断谁是大表谁是小表(小表的标准就是size小于hive.mapjoin.smalltable.filesize的值)
Hive在Compile阶段的时候对每一个common join会生成一个conditional task,并且对于每一个join table,会假设这个table是大表,生成一个mapjoin task,然后把这些mapjoin tasks装进
conditional task(List<Task<? extends Serializable>> resTasks),同时会映射大表的alias和对应的mapjoin task。在runtime运行时,resolver会读取每个table alias对应的input file size,如果小表的file size比设定的threshold要低 (hive.mapjoin.smalltable.filesize,默认值为25M),那么就会执行converted mapjoin task。对于每一个mapjoin task同时会设置一个backup task,就是先前的common join task,一旦mapjoin task执行失败了,则会启用backup task
Performance Bottleneck
性能瓶颈
1、Distributed Cache is the potential performance bottleneck
分布式缓存是一个潜在的性能瓶颈
A、Large hashtable file will slow down the propagation of Distributed Cache
大的hashtable文件将会减速分布式缓存的传播
B、Mappers are waiting for the hashtables file from Distributed Cache
Mapper排队等待从分布式缓存获取hashtables(因为默认一个hashtable缓存是三份,如果mappers数量太多需要一个一个的等待)
2、Compress and archive all the hashtable file into a tar file.
压缩和归档所有的hashtable文件为一个tar文件。
Bucket Map Join
Why:
Total table/partition size is big, not good for mapjoin.
How:
set hive.optimize.bucketmapjoin = true;
1. Work together with map join
2. All join tables are bucketized, and each small table?s bucket number can be divided by big table?s bucket number.
所有join的表是bucketized并且小表的bucket数量是大表bucket数量的整数倍
3. Bucket columns == Join columns
hive 建表的时候支持hash 分区通过指定clustered by (col_name,xxx ) into number_buckets buckets 关键字.
当连接的两个表的join key 就是bucket column 的时候,就可以通过
hive.optimize.bucketmapjoin= true
来控制hive 执行bucket map join 了, 需要注意的是你的小表的number_buckets 必须是大表的倍数. 无论多少个表进行连接这个条件都必须满足.(其实如果都按照2的指数倍来分bucket, 大表也可以是小表的倍数,不过这中间需要多计算一次,对int 有效,long 和string 不清楚)
Bucket Map Join 执行计划分两步,第一步先将小表做map 操作变成hashtable 然后广播到所有大表的map端,大表的map端接受了number_buckets 个小表的hashtable并不需要合成一个大的hashtable,直接可以进行map 操作,map 操作会产生number_buckets 个split,每个split 的标记跟小表的hashtable 标记是一样的, 在执行projection 操作的时候,只需要将小表的一个hashtable 放入内存即可,然后将大表的对应的split 拿出来进行判断,所以其内存限制为小表中最大的那个hashtable 的大小.
Bucket Map Join 同时也是Map Side Join 的一种实现,所有计算都在Map 端完成,没有Reduce 的都被叫做Map Side Join ,Bucket 只是hive 的一种hash partition 的实现,另外一种当然是值分区.
create table a (xxx) partition by (col_name)
不过一般hive 中两个表不一定会有同一个partition key, 即使有也不一定会是join key. 所以hive 没有这种基于值的map side join, hive 中的list partition 主要是用来过滤数据的而不是分区. 两个主要参数为(hive.optimize.cp = true 和 hive.optimize.pruner=true)
hadoop 源代码中默认提供map side join 的实现, 你可以在hadoop 源码的src/contrib/data_join/src 目录下找到相关的几个类. 其中TaggedMapOutput 即可以用来实现hash 也可以实现list , 看你自己决定怎么分区. Hadoop Definitive Guide 第8章关于map side join 和side data distribution 章节也有一个例子示例怎样实现值分区的map side join.
上图解释:b表是大表,a,c是小表并且都是整数倍,将a,c表加入内存先join然后到每个b表的map去做匹配。
Sort Merge Bucket Map Join
Why:
No limit on file/partition/table size.
How:
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
1.Work together with bucket map join
将bucket加入到map join中
2.Bucket columns == Join columns == sort columns
Bucket Map?Join?并没有解决map?join 在小表必须完全装载进内存的限制, 如果想要在一个reduce 节点的大表和小表都不用装载进内存,必须使两个表都在join?key 上有序才行,你可以在建表的时候就指定sorted byjoin?key?或者使用index 的方式.
做法还是两边要做hash bucket,而且每个bucket内部要进行排序。这样一来当两边bucket要做局部join的时候,只需要用类似merge sort算法中的merge操作一样把两个bucket顺序遍历一遍即可完成,这样甚至都不用把一个bucket完整的加载成hashtable,这对性能的提升会有很大帮助。
set hive.optimize.bucketmapjoin?= true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
Bucket columns ==?Join?columns == sort columns
这样小表的数据可以每次只读取一部分,然后还是用大表一行一行的去匹配,这样的join?没有限制内存的大小. 并且也可以执行全外连接.
Skew Join
Join bottlenecked on the reducer who gets the
skewed key
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold