Hive Join Strategies hive的连接策略

Common Join

最为普通的join策略,不受数据量的大小影响,也可以叫做reduce side join ,最没效率的一种join 方式. 它由一个mapreduce job 完成.

首先将大表和小表分别进行map 操作, 在map shuffle 的阶段每一个map output key 变成了table_name_tag_prefix + join_column_value , 但是在进行partition 的时候它仍然只使用join_column_value 进行hash.

每一个reduce 接受所有的map 传过来的split , 在reducce 的shuffle 阶段,它将map output key 前面的table_name_tag_prefix 给舍弃掉进行比较. 因为reduce 的个数可以由小表的大小进行决定,所以对于每一个节点的reduce 一定可以将小表的split 放入内存变成hashtable. 然后将大表的每一条记录进行一条一条的比较.

真正的Join在reduce阶段。

MapJoin

Map Join 的计算步骤分两步,将小表的数据变成hashtable广播到所有的map 端,将大表的数据进行合理的切分,然后在map 阶段的时候用大表的数据一行一行的去探测(probe) 小表的hashtable. 如果join key 相等,就写入HDFS.

map join 之所以叫做map join 是因为它所有的工作都在map 端进行计算.

hive 在map join 上做了几个优化:

hive 0.6 的时候默认认为写在select 后面的是大表,前面的是小表, 或者使用 /*+mapjoin(map_table) */ 提示进行设定. hive 0.7 的时候这个计算是自动化的,它首先会自动判断哪个是小表,哪个是大表,这个参数由(hive.auto.convert.join=true)来控制. 然后控制小表的大小由(hive.smalltable.filesize=25000000L)参数控制(默认是25M),当小表超过这个大小,hive 会默认转化成common join. 你可以查看HIVE-1642.

首先小表的Map 阶段它会将自己转化成MapReduce Local Task ,然后从HDFS 取小表的所有数据,将自己转化成Hashtable file 并压缩打包放入DistributedCache 里面.

目前hive 的map join 有几个限制,一个是它打算用BloomFilter 来实现hashtable , BloomFilter 大概比hashtable 省8-10倍的内存, 但是BloomFilter 的大小比较难控制.

现在DistributedCache 里面hashtable默认的复制是3份,对于一个有1000个map 的大表来说,这个数字太小,大多数map 操作都等着DistributedCache 复制.

优化后的map-join

Converting Common Join into Map Join

判断谁是大表谁是小表(小表的标准就是size小于hive.mapjoin.smalltable.filesize的值)

Hive在Compile阶段的时候对每一个common join会生成一个conditional task,并且对于每一个join table,会假设这个table是大表,生成一个mapjoin task,然后把这些mapjoin tasks装进

conditional task(List<Task<? extends Serializable>> resTasks),同时会映射大表的alias和对应的mapjoin task。在runtime运行时,resolver会读取每个table alias对应的input file size,如果小表的file size比设定的threshold要低 (hive.mapjoin.smalltable.filesize,默认值为25M),那么就会执行converted mapjoin task。对于每一个mapjoin task同时会设置一个backup task,就是先前的common join task,一旦mapjoin task执行失败了,则会启用backup task

Performance Bottleneck

性能瓶颈

1、Distributed Cache is the potential performance bottleneck

分布式缓存是一个潜在的性能瓶颈

A、Large hashtable file will slow down the propagation of Distributed Cache

大的hashtable文件将会减速分布式缓存的传播

B、Mappers are waiting for the hashtables file from Distributed Cache

Mapper排队等待从分布式缓存获取hashtables(因为默认一个hashtable缓存是三份,如果mappers数量太多需要一个一个的等待)

2、Compress and archive all the hashtable file into a tar file.

压缩和归档所有的hashtable文件为一个tar文件。

Bucket Map Join

Why:

Total table/partition size is big, not good for mapjoin.

How:

set hive.optimize.bucketmapjoin = true;

1. Work together with map join

2. All join tables are bucketized, and each small table?s bucket number can be divided by big table?s bucket number.

所有join的表是bucketized并且小表的bucket数量是大表bucket数量的整数倍

3. Bucket columns == Join columns

hive 建表的时候支持hash 分区通过指定clustered by (col_name,xxx ) into number_buckets buckets 关键字.

当连接的两个表的join key 就是bucket column 的时候,就可以通过

hive.optimize.bucketmapjoin= true

来控制hive 执行bucket map join 了, 需要注意的是你的小表的number_buckets 必须是大表的倍数. 无论多少个表进行连接这个条件都必须满足.(其实如果都按照2的指数倍来分bucket, 大表也可以是小表的倍数,不过这中间需要多计算一次,对int 有效,long 和string 不清楚)

Bucket Map Join 执行计划分两步,第一步先将小表做map 操作变成hashtable 然后广播到所有大表的map端,大表的map端接受了number_buckets 个小表的hashtable并不需要合成一个大的hashtable,直接可以进行map 操作,map 操作会产生number_buckets 个split,每个split 的标记跟小表的hashtable 标记是一样的, 在执行projection 操作的时候,只需要将小表的一个hashtable 放入内存即可,然后将大表的对应的split 拿出来进行判断,所以其内存限制为小表中最大的那个hashtable 的大小.

Bucket Map Join 同时也是Map Side Join 的一种实现,所有计算都在Map 端完成,没有Reduce 的都被叫做Map Side Join ,Bucket 只是hive 的一种hash partition 的实现,另外一种当然是值分区.

create table a (xxx) partition by (col_name)

不过一般hive 中两个表不一定会有同一个partition key, 即使有也不一定会是join key. 所以hive 没有这种基于值的map side join, hive 中的list partition 主要是用来过滤数据的而不是分区. 两个主要参数为(hive.optimize.cp = true 和 hive.optimize.pruner=true)

hadoop 源代码中默认提供map side join 的实现, 你可以在hadoop 源码的src/contrib/data_join/src 目录下找到相关的几个类. 其中TaggedMapOutput 即可以用来实现hash 也可以实现list , 看你自己决定怎么分区. Hadoop Definitive Guide 第8章关于map side join 和side data distribution 章节也有一个例子示例怎样实现值分区的map side join.

上图解释:b表是大表,a,c是小表并且都是整数倍,将a,c表加入内存先join然后到每个b表的map去做匹配。

Sort Merge Bucket Map Join

Why:

No limit on file/partition/table size.

How:

set hive.optimize.bucketmapjoin = true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

1.Work together with bucket map join

将bucket加入到map join中

2.Bucket columns == Join columns == sort columns

Bucket Map?Join?并没有解决map?join 在小表必须完全装载进内存的限制, 如果想要在一个reduce 节点的大表和小表都不用装载进内存,必须使两个表都在join?key 上有序才行,你可以在建表的时候就指定sorted byjoin?key?或者使用index 的方式.

做法还是两边要做hash bucket,而且每个bucket内部要进行排序。这样一来当两边bucket要做局部join的时候,只需要用类似merge sort算法中的merge操作一样把两个bucket顺序遍历一遍即可完成,这样甚至都不用把一个bucket完整的加载成hashtable,这对性能的提升会有很大帮助。

set hive.optimize.bucketmapjoin?= true;

set hive.optimize.bucketmapjoin.sortedmerge = true;

set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;

Bucket columns ==?Join?columns == sort columns

这样小表的数据可以每次只读取一部分,然后还是用大表一行一行的去匹配,这样的join?没有限制内存的大小. 并且也可以执行全外连接.

Skew Join

Join bottlenecked on the reducer who gets the

skewed key

set hive.optimize.skewjoin = true;

set hive.skewjoin.key = skew_key_threshold

时间: 2024-08-01 10:22:56

Hive Join Strategies hive的连接策略的相关文章

Hive JOIN使用详解

转自http://shiyanjun.cn/archives/588.html Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL.有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job. 对于最基本的HQL查询我们不再累述,这

Hive Join

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins LanguageManualJoins Join Syntax Hive支持下面的表join语法结构: join_table: table_reference JOIN table_factor[join_condition] | table_reference {LEFT|RIGHT|FULL} [OUTER]JOIN table_reference jo

hive工作中的一些优化策略

1.hive抓取策略 hive.fetch.task.conversion = more/none more不走mr,none走mr 2.explain 显示执行计划 3.设置本地运行模式 set hive.exec.mode.local.auto = true hive.exec.mode.local.inputbytes.max 默认128M,表示加载文件的最大值,若大于该配置仍会以集群方式运行 4.并行计算 Set hive.exec.parallel = true/falses Set

安装Hive(独立模式 使用mysql连接)

安装Hive(独立模式 使用mysql连接) 1.默认安装了java+hadoop 2.下载对应hadoop版本的安装包 3.解压安装包 tar zxvf apache-hive-1.2.1-bin.tar.gz 4.安装mysql yum -y install mysql-server mysql mysqldev //需要以root身份运行 另外可能需要配置yum源 mysql常用命令: service mysqld start/stop chkconfig mysqld on //加入开机

hive join 空指针异常

2015-07-28 10:03:21,557 Stage-2 map = 100%, reduce = 0% Ended Job = job_1437720498561_1035 with errors Error during job, obtaining debugging information... Examining task ID: task_1437720498561_1035_m_000000 (and more) from job job_1437720498561_1035

全网最详细的hive-site.xml配置文件里如何添加达到Hive与HBase的集成,即Hive通过这些参数去连接HBase(图文详解)

不多说,直接上干货! 一般,普通的情况是 <configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://master:3306/metastore?createDatabaseIfNotExist=true</value> </property> <property> <name>

Hive 12、Hive优化

要点:优化时,把hive sql当做map reduce程序来读,会有意想不到的惊喜. 理解hadoop的核心能力,是hive优化的根本. 长期观察hadoop处理数据的过程,有几个显著的特征: 1.不怕数据多,就怕数据倾斜. 2.对jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,没半小时是跑不完的.map reduce作业初始化的时间是比较长的. 3.对sum,count来说,不存在数据倾斜问题. 4.对count(distinct )

Hive(四)hive函数与hive shell

一.hive函数 1.hive内置函数 (1)内容较多,见< Hive 官方文档>            https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF        (2)详细解释:            http://blog.sina.com.cn/s/blog_83bb57b70101lhmk.html (3) 测试内置函数的快捷方式: 1.创建一个 dual 表 create table dual

Hive学习心得&amp;Hive的UDF

一:Hive基本介绍 Hive是建立在Hadoop上的数据仓库基础构架,它提供了一系列工具可以用来进行数据提取.转化.加载,这是一种可以存储.查询和分析存储在Hadoop中的大规模的数据机制. 使用语言:QL语言(类SQL查询语言).能处理内建的mapper和reducer无法完成的复杂分析工作. 1.Hive是SQL解析引擎,它将SQL语句转译成M/R Job然后再Hadoop执行. 2.Hive的表其实是HDFS的目录(/文件夹),按表名把文件夹区分开.如果是分区表,则分区值是子文件夹,可以