hive.optimize.cp=true:列裁剪,取数只取
在读数据的时候,只读取查询中需要用到的列,而忽略其他列。例如,对于查询:SELECT a,b FROM T WHEREe < 10;其中,T 包含 5 个列 (a,b,c,d,e),列 c,d 将会被忽略,只会读取a, b, e 列
hive.optimize.prunner:分区裁剪
LIMIT
hive.limit.optimize.enable=true:优化LIMIT n语句
使用简单limit抽样数据时是否开启优化选项,默认是false,关于limit的优化问题,在hive programming书中解释的是这个feature有drawback,对于抽样的不确定性给出了风险提示;
hive.limit.row.max.size=1000000:
hive.limit.optimize.limit.file=10:最大文件数
hive.limit.optimize.fetch.max:使用简单limit抽样数据允许的最大行数,默认50000,查询query受限,insert不受影响;
1. 本地模式(小任务): 在单台机器上处理所有的任务
需要满足以下条件:
1.job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
2.job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
3.job的reduce数必须为0或者1
hive.exec.mode.local.auto.inputbytes.max=134217728
hive.exec.mode.local.auto.tasks.max=4
hive.exec.mode.local.auto=true
hive.mapred.local.mem:本地模式启动的JVM内存大小
2. 并发执行: 参数控制在同一个sql中的不同的job是否可以同时运行,默认为false.
http://blog.csdn.net/jiedushi/article/details/7965604
hive.exec.parallel=true ,默认为false
hive.exec.parallel.thread.number=8 控制对于同一个sql来说同时可以运行的job的最大值,该参数默认为8.此时最大可以同时运行8个job.
3.Strict Mode:nonstrict,strict,默认是nonstrict
hive.mapred.mode=true,严格模式不允许执行以下查询:
分区表上没有指定了分区
没有limit限制的order by语句
笛卡尔积:JOIN时没有ON语句
4.动态分区:
hive.exec.dynamic.partition.mode=strict:该模式下必须指定一个静态分区
hive.exec.max.dynamic.partitions=1000 默认的可以创建的分区数
hive.exec.max.dynamic.partitions.pernode=100:在每一个mapper/reducer节点允许创建的最大分区数
DATANODE:dfs.datanode.max.xceivers=8192:允许DATANODE打开多少个文件
5.推测执行:
mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true
hive.mapred.reduce.tasks.speculative.execution=true;
6.Single MapReduce MultiGROUP BY
hive.multigroupby.singlemar=true:当多个GROUP BY语句有相同的分组列,则会优化为一个MR任务
7. hive.exec.rowoffset:是否提供虚拟列
hive提供了三个虚拟列:INPUT__FILE__NAME,BLOCK__OFFSET__INSIDE__FILE和ROW__OFFSET__INSIDE__BLOCK。但ROW__OFFSET__INSIDE__BLOCK默认是不可用的,需要设置hive.exec.rowoffset为true才可以。可以用来排查有问题的输入数据。
8. 分组
两个聚集函数不能有不同的DISTINCT列,以下表达式是错误的:
INSERT OVERWRITE TABLE pv_gender_agg SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip) FROM pv_users GROUP BY pv_users.gender;
SELECT语句中只能有GROUP BY的列或者聚集函数。
9. 数据倾斜
关键词
情形
后果
Join
其中一个表较小,
但是key集中
分发到某一个或几个Reduce上的数据远高于平均值
大表与大表,但是分桶的判断字段0值或空值过多
这些空值都由一个reduce处理,灰常慢
group by
group by 维度过小,
某值的数量过多
处理某值的reduce灰常耗时
Count Distinct
某特殊值过多
处理此特殊值的reduce耗时
Select *
from log a
left outer join bmw_users b
on case when a.user_id is null then concat(‘dp_hive’,rand() ) else a.user_id end = b.user_id;
10
hive.map.aggr=true;在map中会做部分聚集操作,效率更高但需要更多的内存。
hive.map.aggr控制如何聚合,默认是false,如果设置为true,Hive将会在map端做第一级的聚合。这通常提供更好的效果,但是要求更多的内存才能运行成功。
set hive.map.aggr=true;
SELECT COUNT(*) FROM table2;
set hive.exec.reducers.max=200;
set mapred.reduce.tasks= 200;---增大Reduce个数
set hive.groupby.mapaggr.checkinterval=100000 ;--这个是group的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000; --这个是join的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.optimize.skewjoin=true;--如果是join 过程出现倾斜 应该设置为true
hive.groupby.skewindata=true:数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,
Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key
有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到
Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中),最后完成最终的聚合操作。
Multi-Group-By Inserts
multi insert适合基于同一个源表按照不同逻辑不同粒度处理插入不同表的场景,做到只需要扫描源表一次,job个数不变,减少源表扫描次数
union all用好,可减少表的扫描次数,减少job的个数,通常预先按不同逻辑不同条件生成的查询union all后,再统一group by计算,不同表的union all相当于multiple inputs,同一个表的union all,相当map一次输出多条
简单查询或者聚合的输出,可以发送到多个表甚至是hadoop的dfs文件(可以使用hdfs工具管理)。例子,如果伴随着性别的细分,按年龄细分,人们需要根据这些,找到每个页面的访问量,使用下面的查询,可以做到这一点:
FROM pv_users
INSERT OVERWRITE TABLE pv_gender_sum
SELECT pv_users.gender, count(DISTINCT pv_users.userid)
GROUP BY pv_users.gender
INSERT OVERWRITE DIRECTORY ‘/user/facebook/tmp/pv_age_sum‘
SELECT pv_users.age, count(DISTINCT pv_users.userid)
GROUP BY pv_users.age;
12.排序
ORDER BY colName ASC/DESC
hive.mapred.mode=strict时需要跟limit子句
hive.mapred.mode=nonstrict时使用单个reduce完成排序
SORT BY colName ASC/DESC :每个reduce内排序
DISTRIBUTE BY(子查询情况下使用 ):控制特定行应该到哪个reducer,并不保证reduce内数据的顺序
CLUSTER BY :当SORT BY 、DISTRIBUTE BY使用相同的列时。
13.合并小文件
hive.merg.mapfiles=true:合并map输出
hive.merge.mapredfiles=false:合并reduce输出
hive.merge.size.per.task=256*1000*1000:合并文件的大小
hive.mergejob.maponly=true:如果支持CombineHiveInputFormat则生成只有Map的任务执行merge
hive.merge.smallfiles.avgsize=16000000:文件的平均大小小于该值时,会启动一个MR任务执行merge。
14.map/reduce数目
减少map数目:
set mapred.max.split.size
set mapred.min.split.size
set mapred.min.split.size.per.node
set mapred.min.split.size.per.rack
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
增加map数目:
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
假设有这样一个任务:
select data_desc, count(1), count(distinct id),sum(case when …),sum(case when ...),sum(…) from a group by data_desc
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。
set mapred.reduce.tasks=10;
create table a_1 as select * from a distribute by rand(123);
这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。
reduce数目设置:
参数1:hive.exec.reducers.bytes.per.reducer=1G:每个reduce任务处理的数据量
参数2:hive.exec.reducers.max=999(0.95*TaskTracker数):每个任务最大的reduce数目
reducer数=min(参数2,总输入数据量/参数1)
set mapred.reduce.tasks:每个任务默认的reduce数目。典型为0.99*reduce槽数,hive将其设置为-1,自动确定reduce数目。
hive.exec.reducers.max:reducer的最大个数,如果在mapred.reduce.tasks设置为负值,那么hive将取该值作为reducers的最大可能值。当然还要依赖(输入文件大小/hive.exec.reducers.bytes.per.reducer)所得出的大小,取其小值作为reducer的个数,hive默认是999;
15.使用索引:
hive.optimize.index.filter:自动使用索引
hive.optimize.index.groupby:使用聚合索引优化GROUP BY操作
压缩:
hive.exec.compress.output:一个查询的最后一个map/reduce任务输出是否被压缩的标志,默认为false,但是一般会开启为true,好处的话,节省空间不说,在不考虑cpu压力的时候会提高io;
hive.exec.compress.intermediate:类似上个,在一个查询的中间的map/reduce任务输出是否要被压缩,默认false,
尽量尽早地过滤数据,减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段
尽量原子化操作,尽量避免一个SQL包含复杂逻辑,可以使用中间表来完成复杂的逻辑
单个SQL所起的JOB个数尽量控制在5个以下
慎重使用mapjoin,一般行数小于2000行,大小小于1M(扩容后可以适当放大)的表才能使用,小表要注意放在join的左边(目前TCL里面很多都小表放在join的右边)。
否则会引起磁盘和内存的大量消耗
SQL要先了解数据本身的特点,如果有join ,group操作的话,要注意是否会有数据倾斜
如果union all的部分个数大于2,或者每个union部分数据量大,应该拆成多个insert into 语句,实际测试过程中,执行时间能提升50%