由于 hive 与传统关系型数据库面对的业务场景及底层技术架构都有着很大差异,因此,传统数据库领域的一些技能放到 Hive 中可能已不再适用。关于 hive 的优化与原理、应用的文章,前面也陆陆续续的介绍了一些,但大多都偏向理论层面,本文就介绍一个实例,从实例中一步步加深对 hive 调优的认识与意识。
1、需求
需求我做了简化,很简单,两张表做个 join,求指定城市,每天的 pv,用传统的 RDBMS SQL 写出来就这样的:
SELECT t.statdate, c.cname, count(t.cookieid) FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON (t.area1= c.cname OR t.area2 =c.cname OR t.area3 = c.cname) WHERE t.statdate>=‘20140818‘ and t.statdate<=‘20140824‘ AND platform=‘pc‘ GROUP BY t.statdate, c.cname;
怎么样?根据 SQL 看懂需求没问题吧?
2、非等值 join 问题
然后把这条 SQL 贴到 hive 中去执行,然后你会发现报错了:
FAILED: SemanticException [Error 10019]: Line 5:32 OR not supported in JOIN currently ‘cname‘
这是因为 hive 受限于 MapReduce 算法模型,只支持 equi-joins(等值 join),要实现上述的非等值 join,你可以采用笛卡儿积( full Cartesian product )来实现:
SELECT t.statdate, c.cname, count(t.cookieid) FROM tmpdb.city c JOIN ecdata.ext_trackflow t WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘ AND (t.area1= c.cname OR t.area2 =c.cname OR t.area3 = c.cname) GROUP BY t.statdate, c.cname;
然后再拿着这条语句执行下。
3、优化:reduce side join VS Cartesian product
如果你真的把这条语句放到 Hive 上执行,然后恰好你有张表还非常大,那么恭喜你。。。集群管理员估计会找你的麻烦了。。。
友情提示:笛卡儿积这种语句在 Hive 下慎用,大数据场景下的 m * n 映射结果你懂的。。。对此,Hive 特意提供了一个环境变量:hive.mapred.mode=strict; 防止笛卡儿积的执行:
FAILED: SemanticException [Error 10052]: In strict mode, cartesian product is not allowed. If you really want to perform the operation, set hive.mapred.mode=nonstrict
从 2 中的观察得知我们在 on 后面跟 join 条件,走的是 reduce side join,如果你在 where 后跟则是走 Cartesian product,但是这里单条 sql 又没法实现 reduce side join,还有没有其它办法呢?
4、改写非等值 join:union all
既然不允许非等值 join,那我们换一下思路,多个子查询 union all,然后汇总:
SELECT dt, name, count(cid) FROM (SELECT t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area1 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘ UNION ALL SELECT t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area2 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘ UNION ALL SELECT t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area3 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘) tmp_trackflow GROUP BY dt, name;
5、优化:map side join
上述语句走的是 reduce side join,从我们的需求及业务得知,tmpdb.city 是一张字典表,数据量很小,因此我们可以试试把上述的语句改写成 mapjoin:
SELECT dt, name, count(cid) FROM (SELECT /*+ MAPJOIN(c) */ t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area1 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘ UNION ALL SELECT /*+ MAPJOIN(c) */ t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area2 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘ UNION ALL SELECT /*+ MAPJOIN(c) */ t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area3 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘) tmp_trackflow GROUP BY dt, name;
6、优化无极限:开启 parallel 和 控制 reduce 个数
上述语句执行时,你可以看到执行计划和状态信息,以及结合你的 union all 语句可知,三个 union 语句之间没有依赖关系,其实是可以并行执行的:
explain SQL... ... STAGE DEPENDENCIES: Stage-11 is a root stage Stage-1 depends on stages: Stage-11 Stage-2 depends on stages: Stage-1 Stage-3 depends on stages: Stage-2, Stage-6, Stage-9 Stage-12 is a root stage Stage-5 depends on stages: Stage-12 Stage-6 depends on stages: Stage-5 Stage-13 is a root stage Stage-8 depends on stages: Stage-13 Stage-9 depends on stages: Stage-8 Stage-0 is a root stage ...
我们在 SQL 前加上如下环境变量选项:
set mapred.reduce.tasks=60; set hive.exec.parallel=true;
让执行计划中的 Stage-11、Stage-12、Stage-13 并行执行,并控制好 reduce task 个数。
完整的语句如下:
hive -e " SET mapred.reduce.tasks=60; SET hive.exec.parallel=TRUE; SELECT dt, name, count(cid) FROM (SELECT /*+ MAPJOIN(c) */ t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area1 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘ UNION ALL SELECT /*+ MAPJOIN(c) */ t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area2 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘ UNION ALL SELECT /*+ MAPJOIN(c) */ t.statdate dt, c.cname name, t.cookieid cid FROM tmpdb.city c JOIN ecdata.ext_trackflow t ON t.area3 =c.cname WHERE t.statdate>=‘20140818‘ AND t.statdate<=‘20140824‘ AND platform=‘pc‘) tmp_trackflow GROUP BY dt, name; " > a1.txt
最后的优化效果是:2 中的语句三个小时没出结果。。。5 比 4 快 8 倍左右,6 比 5 快 2 倍左右。
7、最后的问题:
在 6 的语句执行的时候你会发现,其扫描了 三遍 源文件。而 hive 本身是对 union all 的 join 做了优化的,当多个 union all 子查询同一张表时,只扫描一次源文件,但这里为什么会三个子查询各扫描一次呢?
可能是这里的 union all 子查询使用了 join 的缘故,导致 hive 的 union all 执行计划优化失效了。
关于这块怎么能优化成只扫描一次源文件,或者你有更好的优化方案,欢迎留言交流。
8、Refer:
[1] Hive Query- Joining two tables on three joining conditions with OR operator
[2] LanguageManual JoinOptimization
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization
[3] hive 执行计划
http://yychao.iteye.com/blog/1749562
[4] Hive SQL解析/执行计划生成流程分析
http://yanbohappy.sinaapp.com/?p=265
[5] 数据仓库中的SQL性能优化(Hive篇)
http://www.zihou.me/html/2014/02/12/9207.html
[6] Hive优化以及执行原理
http://www.smartcitychina.cn/upload/2014-01/14012015376829.pdf
[7] Hive作业优化总结
http://my.oschina.net/yangzhiyuan/blog/262910