Hive Join

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins

LanguageManualJoins

Join Syntax

Hive支持下面的表join语法结构:

join_table:

table_reference JOIN table_factor[join_condition]

| table_reference {LEFT|RIGHT|FULL} [OUTER]JOIN table_reference join_condition

| table_reference LEFT SEMI JOINtable_reference join_condition

| table_reference CROSS JOIN table_reference[join_condition] (as of Hive 0.10)

table_reference:

table_factor

| join_table

table_factor:

tbl_name [alias]

| table_subquery alias

| ( table_references )

join_condition:

ON equality_expression ( ANDequality_expression )*

equality_expression:

expression = expression

注释:在Hive的joins,outerjoins和left
semi joins只支持等式连接,不支持不等式连接,因为不等式连接很难转化成map/reduce的job。

Version 0.13.0+: Implicit joinnotation(隐式连接符号)

从Hive0.13.0开始支持Implicit join notation,允许from子句去join以逗号分隔的表,省略掉join关键字,如下:

SELECT *

FROM table1t1, table2 t2, table3 t3

WHERE t1.id= t2.id AND t2.id = t3.id AND t1.zipcode = ‘02535‘;

Version 0.13.0+: Unqualified columnreferences

从Hive0.13.0开始支持非指定字段的引用,如下:

CREATE TABLE a (k1 string, v1 string);

CREATE TABLE b (k2 string, v2 string);

SELECT k1, v1, k2, v2

FROM a JOIN b ON k1 = k2;

如果一个字段在多个表中出现,则Hive会指出它是一个歧义的引用。

Examples

下面有几点关于Hive的join连接重要的地方:

1)  只支持等式join

SELECTa.* FROM a JOIN b ON (a.id = b.id);

SELECTa.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department);

2)  支持多张表join

SELECTa.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key =b.key2);

3)  生成一个MRJob:多表连接,如果多个表中每个表都使用同一个列进行连接(出现在JOIN子句中),则只会生成一个MR(map/reduce)Job比如:

SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key =
b.key1) JOIN c ON (c.key =
b.key1);

三个表a、b、c都分别使用了同一个字段进行连接,亦即同一个字段同时出现在两个JOIN子句中,从而只生成一个MRJob。

生成多个MRJob:多表连接,如果多表中,其中存在一个表使用了至少2个字段进行连接(同一个表的至少2个列出现在JOIN子句中),则会至少生成2个MRJob,如下的sql将转化为两个map/reduce任务:

SELECTa.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key =b.key2);

三个表基于2个字段进行连接,这两个字段b.key1和b.key2同时出现在b表中。连接的过程是这样的:首先a和b表基于a.key和b.key1进行连接,对应着第一个MRJob;表a和b连接的结果,再和c进行连接,对应着第二个MRJob。

4)   表连接顺序优化

多表连接,会转换成多个MRJob,每一个MR Job在Hive中称为JOIN阶段(Stage)。在每一个Stage,按照JOIN顺序中的最后一个表应该尽量是大表,因为JOIN前一阶段生成的数据会存在于Reducer的buffer中,通过stream最后面的表,直接从Reducer的buffer中读取已经缓冲的中间结果数据(这个中间结果数据可能是JOIN顺序中,前面表连接的结果的Key,数据量相对较小,内存开销就小),这样,与后面的大表进行连接时,只需要从buffer中读取缓存的Key,与大表中的指定Key进行连接,速度会更快,也可能避免内存缓冲区溢出。例如:

SELECTa.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key =b.key1);

这个JOIN语句,会生成一个MRJob,在选择JOIN顺序的时候,数据量相比应该是b< c,表a和b基于a.key= b.key1进行连接,得到的结果(基于a和b进行连接的Key)会在Reducer上缓存在buffer中,在与c进行连接时,从buffer中读取Key(a.key=b.key1)来与表c的c.key进行连接。

另外,也可以通过给出一些Hint信息来启发JOIN操作,这指定了将哪个表作为大表,从而得到优化。例如:

SELECT/*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOINc ON (c.key = b.key1);

上述JOIN语句中,a表被视为大表,则首先会对表b和c进行JOIN,然后再将得到的结果与表a进行JOIN。

如果STREAMTABLE省略掉了,那么Hive会在join中streams最右边的表。

5)  LEFT,RIGHT和FULLOUTER joins的存在是为了提供更多对on语句中没有匹配的控制。

SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key);

6)  基于条件的LEFTOUTER JOIN优化(逻辑同样适合RIGHTand FULL joins)

表的join操作是在where语句之前执行的。

左连接时,左表中出现的字段值都会保留,右表没有连接上的字段值都为空。

例如:

SELECT a.val, b.val FROM a LEFT OUTERJOIN b ON (a.key=b.key)

WHERE a.ds=‘2015-06-21‘ ANDb.ds=‘2015-06-21‘;

执行顺序为:首先a和b表join,然后结果再通过where条件过滤,这样我们会发现在join过程中可能会输出大量结果,再对这些结果进行过滤操作,比较耗时。

进行优化时,可以将where条件放在on语句中,如下:

SELECT a.val, b.val FROM a LEFT OUTERJOIN b

ON (a.key=b.key AND b.ds=‘2015-06-21‘ ANDa.ds=‘2015-06-21‘);

Join 是不能交换位置的。无论是LEFT 还是 RIGHT join,都是左连接的,例如:

SELECT a.val1,a.val2, b.val, c.val

FROM a

JOIN b ON (a.key = b.key)

LEFT OUTER JOIN c ON (a.key = c.key);

先join a表和b表,丢掉所有joinkey中不匹配的记录,然后用这一中间结果和c表做join。当一个key在a表和c表都存在,但是b表中不存在的时候:整个记录在第一次join,即ajoin b的时候都被丢掉了(包括a.val1,a.val2和a.key),然后我们再和c表join的时候,就会得到这样的结果:a.val1,a.val2, b.val, null。

如果使用RIGHT OUTERJOIN代替LEFT,那么我们将得到这样的结果:

NULL,NULL,NULL,c.val

示例如下:

hive (hive)> select * from a;

a.id  a.name

1     jiangshouzhuang

2     zhangyun

hive (hive)> select * from b;

b.id b.name

1     jiangshouzhuang

3     baobao

hive (hive)> select * from c;

c.id  c.name

2     zhangyun

4     xiaosan

hive (hive)> SELECT a.name, b.name, c.name

> FROM a

> JOIN bON (a.id = b.id)

> LEFTOUTER JOIN c ON (a.id = c.id);

jiangshouzhuang  jiangshouzhuang  NULL

hive (hive)> SELECT a.name, b.name, c.name

> FROM a

> JOIN bON (a.id = b.id)

> rightOUTER JOIN c ON (a.id = c.id);

a.name   b.name   c.name

NULL     NULL     zhangyun

NULL     NULL     xiaosan

hive (hive)> SELECT a.name, b.name, c.name

> FROM cLEFT OUTER JOIN a ON (c.id = a.id) LEFT OUTER JOIN b

a.name   b.name   c.name

zhangyun      NULL     zhangyun

NULL     NULL     xiaosan

7)  左半连接(leftsemi join)

左半连接可以更加有效地实现了类似in/exists的查询语义,例如:

SELECTa.key, a.value

FROMa

WHEREa.key in

(SELECT b.key FROM B);

可以用下面的语句替换:

SELECTa.key, a.val

FROMa LEFT SEMI JOIN b ON (a.key = b.key);

需要注意的是,在leftsemi join中,b表只能出现在on子句的后面,不能出现在select和where子句中。

关于子查询,Hive支持情况如下:

·        在0.12版本,只支持FROM子句中的子查询;

·        在0.13版本,也支持WHERE子句中的子查询;

·        在0.13不包,IN/NOTIN/EXISTS/NOT EXISTS支持使用子查询。

8)  MapSide Join

MapSide Join优化的出发点是,Map任务的输出,不需要将数据拷贝到Reduce节点,从而降低了数据在网络节点之间传输的开销。

对于多表连接,如果只有一个表比较大,其他表都很小,则join操作会转化为一个只包含Map的job任务,例如:

SELECT/*+ MAPJOIN(b) */ a.key, a.value

FROMa JOIN b ON a.key = b.key;

对于a表数据的每一个map,都很够完全读取b表的数据。

注意:这里,表a和表b不允许执行FULL/RIGHTOUTER JOIN操作。

补充:

Hive内置提供的优化机制之一就包括MapJoin:

在Hivev0.7之前,需要给出MapJoin的指示(hint),Hive才会提供MapJoin的优化。

Hivev0.7之后的版本已经不需要给出MapJoin的指示就进行优化。

它是通过如下配置参数来控制的:

hive>set hive.auto.convert.join=true;

hive0.11之后,在表的大小符合设置时

hive.auto.convert.join.noconditionaltask=true

hive.auto.convert.join.noconditionaltask.size=10000000

hive.mapjoin.smalltable.filesize=25000000

默认会把join转换为mapjoin(hive.ignore.mapjoin.hint=true,hive.auto.convert.join=true)

Hivev0.12.0版本,缺省状况下MapJoin优化是打开的。

也就是hive.auto.convert.join=true。

Hive还提供另外一个参数--表文件的大小作为开启和关闭MapJoin的阈值。

hive.mapjoin.smalltable.filesize=25000000

9)  BucketMap Side Join

如果表进行join,同时join的列也是bucket列,并且一张表的bucket数是另外一张表的bucket数的倍数,那么表之间的buckets可以进行join。

如果表A有4个buckets,表B也有4个buckets,那么下面的join

SELECT/*+ MAPJOIN(b) */ a.key, a.value

FROMa JOIN b ON a.key = b.key;

只需在mapper阶段完成。默认情况下,对于表a的每一个bucket,都会去获取表b中的每一个bucket来进行join,这回造成一定的开销,因为只有表b中满足join条件的bucket才会真正与表a的bucket进行连接。

可以设置如下参数进行优化:

sethive.optimize.bucketmapjoin=true;

这样,join的过程是:表b的bucket1只会与表b的bucket1进行join,而不再考虑表b中的其他bucket2~4。

示例:

创建表a:

CREATE TABLE a(key INT, value STRING)
CLUSTERED BY(key) INTO 6 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘\001‘
COLLECTION ITEMS TERMINATED BY ‘\002‘
MAP KEYS TERMINATED BY ‘\003‘
STORED AS SEQUENCEFILE;
 
创建表b:
CREATE TABLE b(key INT, value STRING)
CLUSTERED BY(key) INTO 36 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘\001‘
COLLECTION ITEMS TERMINATED BY ‘\002‘
MAP KEYS TERMINATED BY ‘\003‘
STORED AS SEQUENCEFILE;
现在要基于a.key和b.key进行JOIN操作,此时JOIN列同时也是BUCKET列,JOIN语句如下:
SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a JOIN b ON a.key = b.key;
 
JOIN的过程是,表a的BUCKET 1只会与表b中的BUCKET 1进行JOIN,而不再考虑表b中的其他BUCKET 2~36。
如果上述表具有相同的BUCKET,如都是36个,而且还是排序的,即,在表定义中在CLUSTERED BY(key)后面增加如下约束:
SORTED BY(key)
则上述JOIN语句会执行一个Sort-Merge-Bucket (SMB) JOIN,同样需要设置如下参数来改变默认行为,优化JOIN时只遍历相关的BUCKET即可:
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
 
上面三个参数默认值为:
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
hive.optimize.bucketmapjoin=false
hive.optimize.bucketmapjoin.sortedmerge=false;

10)        MapJoin Restrictions

SELECT/*+ MAPJOIN(b) */ a.key, a.value

FROMa JOIN b ON a.key = b.key;

不需要reducer,对于表A的每个mapper,可以完全读取表B。

下面列出的都是MapJoin不支持的:

?UnionFollowed by a MapJoin

?LateralView Followed by a MapJoin

?ReduceSink (Group By/Join/Sort By/Cluster By/Distribute By) Followed by MapJoin

?MapJoinFollowed by Union

?MapJoinFollowed by Join

?MapJoinFollowed by MapJoin

配置参数hive.auto.convert.join=true,如果可能的话,自动将joins转换为mapjoins,它应该取代使用mapjoinhint。

下面的查询应该使用mapjoinhint:

如果所有的inputs都被bucketed或者sorted,并且join应该被转换为bucketizedmap-size join或者bucketized-mergejoin。

 

参考内容

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries

https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior

时间: 2024-08-08 11:13:45

Hive Join的相关文章

Hive JOIN使用详解

转自http://shiyanjun.cn/archives/588.html Hive是基于Hadoop平台的,它提供了类似SQL一样的查询语言HQL.有了Hive,如果使用过SQL语言,并且不理解Hadoop MapReduce运行原理,也就无法通过编程来实现MR,但是你仍然可以很容易地编写出特定查询分析的HQL语句,通过使用类似SQL的语法,将HQL查询语句提交Hive系统执行查询分析,最终Hive会帮你转换成底层Hadoop能够理解的MR Job. 对于最基本的HQL查询我们不再累述,这

hive join 空指针异常

2015-07-28 10:03:21,557 Stage-2 map = 100%, reduce = 0% Ended Job = job_1437720498561_1035 with errors Error during job, obtaining debugging information... Examining task ID: task_1437720498561_1035_m_000000 (and more) from job job_1437720498561_1035

hive join 优化 --小表join大表

1.小.大表 join 在小表和大表进行join时,将小表放在前边,效率会高,hive会将小表进行缓存. 2.mapjoin 使用mapjoin将小表放入内存,在map端和大表逐一匹配,从而省去reduce. 例子: select /*+MAPJOIN(b)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1 在0.7版本后,也可以用配置来自动优化 set hive.auto.convert.join=true;

一例 Hive join 优化实战

由于 hive 与传统关系型数据库面对的业务场景及底层技术架构都有着很大差异,因此,传统数据库领域的一些技能放到 Hive 中可能已不再适用.关于 hive 的优化与原理.应用的文章,前面也陆陆续续的介绍了一些,但大多都偏向理论层面,本文就介绍一个实例,从实例中一步步加深对 hive 调优的认识与意识. 1.需求 需求我做了简化,很简单,两张表做个 join,求指定城市,每天的 pv,用传统的 RDBMS SQL 写出来就这样的: SELECT t.statdate, c.cname, coun

hive join on和where条件之间的区别

hive> select ljn001.*,ljn002.* > from ljn001 left outer join ljn002   > on (ljn001.a = ljn002.a and ljn001.b = ljn002.b and ljn001.b = 2); OK a       b       a       b 1       2       1       2从执行计划中可以看出Hive在扫描ljn001表的map操作时就已经对b = 2进行了过滤.可见Hive把

hive join详解

语法 join_table: table_referenceJOIN table_factor [join_condition] | table_reference{LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition | table_reference[url=]LEFT SEMIJOIN[/url]  table_reference join_condition table_reference: table_factor |

Hive Join Strategies hive的连接策略

Common Join 最为普通的join策略,不受数据量的大小影响,也可以叫做reduce side join ,最没效率的一种join 方式. 它由一个mapreduce job 完成. 首先将大表和小表分别进行map 操作, 在map shuffle 的阶段每一个map output key 变成了table_name_tag_prefix + join_column_value , 但是在进行partition 的时候它仍然只使用join_column_value 进行hash. 每一个

Hive中JOIN操作

1. 只支持相等JOIN. 2. 多表连接当使用不同的列进行JOIN时,会产生多个MR作业. 3. 最后的表的数据是从流中读取,而前面的会在内存中缓存,因此最好把最大的表放在最后. SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)//暗示 4. JOIN在WHERE子句前进行处理. SELECT a.val, b.val FRO

Hive入门到剖析(二)

5 Hive参数 hive.exec.max.created.files 说明:所有hive运行的map与reduce任务可以产生的文件的和 默认值:100000 hive.exec.dynamic.partition 说明:是否为自动分区 默认值:false hive.mapred.reduce.tasks.speculative.execution 说明:是否打开推测执行 默认值:true hive.input.format 说明:Hive默认的input format 默认值: org.a