Hive性能优化

架构层面优化:

l  分表

l  合理利用中间结果集,避免查过就丢的资源浪费,减低Hadoop的IO负载

l  常用复杂或低效函数尽量不用或拆分成其他实现方式,如count(distinct)

l  合理设计表分区,静态分区和动态分区

l  优化时一定要把握整体,单个作业最优不如整个作业最优。

l  文件存储格式和压缩方式

l  Hadoop本身的优化

l  有些逻辑,使用系统函数可能比较复杂,可能涉及多层嵌套,建议使用自定义函数实现。

架构层面优化,我这里不做过多介绍了,写HQL时要时常考虑按照map-reduce执行方式来写,平时多注意一下,很多问题都可以避免的。下面的介绍的优化中,或多或少对架构层面的优化都有涉及。

参数层面优化手段

l  合理控制map和reduce数

l  合并小文件

l  避免数据倾斜,解决数据倾斜问题

l  减少job数(合并Job,大Job拆分)

l  Hive Job优化

合理控制mappers和reducers数

Mappers数

Mappers过多情况下:

l  Map阶段输出文件太小,产生大量小文件

l  初始化和创建Mappers进程的开销很大

Mappers太少情况下:

l  文件处理或查询并发度小,Job执行时间过长

l  大量作业时,容易堵塞集群

通常情况下,Job会通过输入文件产生一个或多个mapper数,

主要的决定因素有两个:输入的文件数,输入的文件大小。

 

举例:

a)      假设输入只有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个block(6个128M的block和1个12M的block,dfs.block.size是128M),从而产生7个mappers数。

b)      假设输入有3个文件a、b、c,大小分别为10m,20m,130m,那么hadoop会将其分隔为4个block(10m,20m,128m,2m),从而产生4个mappers数。

注释:以上两种情况均不考虑文件合并的情况。

两种方式控制Mapper数:即减少mapper数和增加mapper数

l  减少mapper数可以通过合并小文件来实现,这点是对文件源处理。

l  增加mapper数可以通过控制上一个job的reducer数来控制(比如:一个sql中多表join会产生多个Map-Reduce任务)。

比如增大mapred.reduce.tasks数值。

下面介绍map端的几个控制参数:

l  set mapred.map.tasks=10;

此参数直接设置,有时并不生效,其实它是hadoop的参考数值。

下面我说一下直接设置不生效的原因:

默认mapper个数计算为:

# total_size为输入文件总大小,dfs_block_size为HDFS设置的数据块大小(一般为128MB)

default_mapper_num=total_size/dfs_block_size;

我们通过参数直接设置的期望mapper个数为:

# setmapred.map.tasks=10;

#这个参数设置只有在大于default_mapper_num的时候,才会生效

goal_mapper_num=mapred.map.tasks;

下面我们来计算一下,经过map端split处理的文件大小和个数:

#mapred.min.split.size(数据的最小分割单元大小)

#mapred.min.split.size 设置每个task处理的文件大小,只有在大于dfs_block_size值时才会生效

split_size=max(mapred.min.split.size,dfs_block_size);

split_num=total_size/split_size;

 

最终计算的mapper个数:

compute_mapper_num=min(split_num,max(default_mapper_num,goal_mapper_num))

        

         总结:

         其实根据我自己的实践,调整mapper数之前,我们一定要确定处理的文件大概大小以及文件的存在形式(很多小文件,还是单个大文件以及其他形式),然后合理地调整mapred.min.split.size和mapred.max.split.size的值。

比如,如果想减少mapper个数,则需要增大mapred.min.split.size的值(因为dfs_block_size一般不变)。

示例:

情况1:输入文件很大,但不是小文件组成的

增大mapred.min.split.size的值。

情况2:输入文件数量很多,且都是小文件,同时每个文件都小于dfs_block_size。

这种情况下通过增大mapred.min.split.size不可行。

原因:增大mapred.min.split.size会造成小文件在网络上来回传输,造成网络负载很大。

解决办法:需要设置下面参数,使用合并小文件方法,将多个输入文件合并后送给mapper处理,从而减少mapper的数量。

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

(下个小章节会介绍小文件合并的优化)

l  Map端的聚合,减少Reduce处理负担:

sethive.map.aggr=true;

l  推测执行:

set mapred.map.tasks.speculative.execution=true;

(reduce端也有类似的参数:mapred.reduce.tasks.speculative.execution)

所谓的推测执行,就是当所有的task都开始运行之后,Job Tracker会统计所有任务的平均进度,如果某个task所在的task node节点配置比较低或者CPU负载很大,导致任务执行比总体任务的平均执行要慢,此时Job Tracker就会在其他节点启动一个新的任务(duplicatetask),原有任务和新任务哪个先执行完就把其他节点的另外一个任务kill掉。这也是我们经常在Job Tracker页面看到,虽然任务执行成功了,但是发现一些任务被kill掉了,就是这个原因。

reduce数

l  Reducers数过多的情况:

生成了很多个小文件(最终输出文件由reducer决定,一个reducer输出一个文件),那么如果这些小文件作为下一个Job输入,则会出现小文件过多需要进行合并的问题。而且启动和初始化reducer需要耗费时间和资源。

l  Reducers数过少:

执行耗时,并且可能出现数据倾斜

l  Reducer个数的决定:

默认情况下,Hive分配reducer个数由下列参数决定:

参数1:hive.exec.reducers.bytes.per.reducer(默认为1G)

参数2:hive.exec.reducers.max(默认为999)

计算reducer数的公式:

N=min(参数2,总输入数据量/参数1)

即默认一个reduce处理1G数据量。

注意:与mapred.map.tasks参数不同,如果设置了setmapred.reduce.tasks参数的数值,忽略上述计算,reducer个数可以由mapred.reduce.tasks直接指定。

l  以下情况只有一个reducer:

某些情况下我们会发现任务中不管数据量多大,不管怎么调整reducer相关的的参数,任务中一直都只有一个reducer任务:

1、  除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外

2、  用了group by的汇总

3、  用了order by

l  Reduce数决定中间或落地文件数,文件大小和HDFS的block大小无关。

l  使用场景描述:

当某个任务有多个Job时,其中某个Job的结果被后面Job多次引用时,设大该参数,以便增加后面访问的Mapper数。

比如,如果一个Job的输出被另外多个Job调用,假如最前面的Job只生成1G的一个文件,那么后面Job也只会有一个Map来处理,效率明显低很多。

l  推测执行

setmapred.reduce.tasks.speculative.execution =true;

sethive.mapred.reduce.tasks.speculative.execution =true;

可以看到除了Map-Reduce侧提供推测执行参数,hive侧也提供了推测执行的参数。

合并小文件

Map阶段Hive自动对小文件进行合并

参数控制:

#Map任务结束时就会合并小文件(Map-Only)

set hive.merge.mapfiles=true;

#在Map-Reduce的任务结束时合并小文件

set hive.merge.mapredfiles=true;

#合并文件的大小(256MB)

set hive.merge.size.per.task=256000000;

#每个mapper最大分隔大小(输入大小)

#结合上面块大小(dfs.block.size=128MB),决定拆分几个mapper数

set mapred.max.split.size=256000000;

#一个节点上split至少的大小

set mapred.min.split.size.per.node=100000000;

#执行Map前进行小文件合并

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

在开启了org.apache.hadoop.hive.ql.io.CombineHiveInputFormat之后,一个datanode节点上多个小文件会进行合并,合并文件数由mapred.max.split.size限制的大小决定。

mapred.min.split.size.per.node决定多个datanode上的文件是否需要合并,即多个节点上的文件也可以合并,大小由此决定。

l  Job合并输入小文件

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

多个小文件由一个map执行。

合并文件数由mapred.max.split.size限制的大小决定。

l  Job合并输出小文件

sethive.merge.smallfiles.avgsize=256000000;

当输出文件平均大小小于该值,启动新job用于合并文件。

对于多个job时,前一个job输出很多大小不均匀的数据文件,对后续的job处理会造成数据倾斜的问题。

如果输出文件大小均匀,则后续处理的mapper数比较合理。

sethive.merge.size.per.task=64000000;

合并之后的文件大小。

案例(那我们自己的开发环境来测试,我们的环境dfs.block.size为64MB):

环境如下(默认配置):

set dfs.block.size=134217728;

sethive.merge.mapfiles=true;

sethive.merge.mapredfiles=false;

sethive.merge.size.per.task=256000000;

setmapred.max.split.size=256000000;

setmapred.min.split.size.per.node=256000000;

setmapred.min.split.size.per.rack=256000000;

sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

我构造的表如下:

hive (annuity_safe)> desc formattedtest_data;

OK

col_name        data_type       comment

# col_name              data_type               comment

deptno                  string

polno                   string

certno                  string

brno                    decimal(2,0)

………………………………

set_of_books_id         string

is_return               string

pk_serial               string

op_month                string

# Detailed Table Information

Database:               annuity_safe

Owner:                  hduser0103

CreateTime:             Tue Jul 28 11:41:20 CST 2015

LastAccessTime:         UNKNOWN

Protect Mode:           None

Retention:              0

Location:               hdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data

Table Type:             MANAGED_TABLE

Table Parameters:

COLUMN_STATS_ACCURATE   true

numFiles                5

numRows                 0

rawDataSize             0

totalSize               279752108

transient_lastDdlTime   1438055637

# Storage Information

SerDe Library:         org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

InputFormat:           org.apache.hadoop.mapred.TextInputFormat

OutputFormat:          org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Compressed:             No

Num Buckets:            -1

Bucket Columns:         []

Sort Columns:           []

Storage Desc Params:

serialization.format    1

Time taken: 0.083 seconds, Fetched: 95row(s)

构造的表对应的文件路径下有5个文件,文件大小除了xae文件为11MB,其他都大概为64MB。

hive (annuity_safe)> ! hadoop fs -lsrhdfs://dev-l002781.app.paic.com.cn:9000/user/hive/warehouse/annuity_safe.db/test_data;

-rw-r-----   3hduser0103 hduser0103   671088652015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xaa

-rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xab

-rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xac

-rw-r-----   3hduser0103 hduser0103   671088642015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xad

-rw-r-----   3hduser0103 hduser0103   113166512015-07-28 11:53 /user/hive/warehouse/annuity_safe.db/test_data/xae

执行查询(未优化参数情况下):

hive (annuity_safe)> selectdeptno,count(1),min(fcd),max(fcd) from test_data group by deptno;

Total jobs = 1

Launching Job 1 out of 1

Number of reduce tasks not specified.Estimated from input data size: 1

In order to change the average load for areducer (in bytes):

sethive.exec.reducers.bytes.per.reducer=<number>

In order to limit the maximum number ofreducers:

sethive.exec.reducers.max=<number>

In order to set a constant number ofreducers:

setmapred.reduce.tasks=<number>

Starting Job = job_201507191627_38369,Tracking URL =http://dev-l002781.app.paic.com.cn:50030/jobdetails.jsp?user.name=hadoop&jobid=job_201507191627_38369

Kill Command =/appcom/HadoopInstall/hadoop-1.2.1/libexec/../bin/hadoop job  -kill job_201507191627_38369

Hadoop job information for Stage-1: number of mappers:4; number of reducers: 1

2015-07-28 13:44:01,643 Stage-1 map =0%,  reduce = 0%

2015-07-28 13:44:13,786 Stage-1 map =50%,  reduce = 0%, Cumulative CPU 12.15sec

2015-07-28 13:44:14,800 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU 13.89 sec

2015-07-28 13:44:37,214 Stage-1 map =100%,  reduce = 0%, Cumulative CPU 18.61sec

2015-07-28 13:44:38,228 Stage-1 map =100%,  reduce = 33%, Cumulative CPU 18.61sec

2015-07-28 13:44:40,254 Stage-1 map =100%,  reduce = 100%, Cumulative CPU22.29 sec

MapReduce Total cumulative CPU time: 22seconds 290 msec

Ended Job = job_201507191627_38369

MapReduce Jobs Launched:

Job 0: Map: 4  Reduce: 1  Cumulative CPU: 22.29 sec   HDFSRead: 279753293 HDFS Write: 6927 SUCCESS

Total MapReduce CPU Time Spent: 22 seconds290 msec

OK

deptno _c1     _c2     _c3

-08 01:20:15.0  1      NULL    NULL

0000   1       NULL    NULL

01000592864     1      G000000000      G000000000

71679  1       G000000000      G000000000

G01    1       2001-07-19 15:46:42.0   2001-07-19 15:46:42.0

G010103 9       2002-09-17 14:08:45.0   2002-09-17 14:10:44.0

G0123  6161    2006-08-16 14:59:06.0   2010-01-18 15:06:14.0

G014205 51893   2010-10-08 01:05:01.0   2010-10-08 01:23:55.0

G014302 4       2011-09-26 19:12:36.0   2011-09-26 19:12:36.0

G02    32      2000-07-18 09:53:52.0   2002-06-10 13:14:24.0

G020105 10      2000-06-20 14:08:23.0   2000-08-08 10:37:10.0

G020301 5       2000-08-15 10:35:00.0   2000-08-21 11:29:07.0

G020302 2       2000-08-17 09:31:00.0   2000-09-08 10:45:39.0

…………………………………………………

可以发现,出现了4个mappers来处理数据文件。

我们查看页面查看4个task的Counters发现,4个task读取的文件字节数为:

67,109,107

201,327,048

11,316,894

244

加起来为268MB左右,但是每个task处理的数据不均匀,其中有一个task处理了约200MB的数据,一个task处理了244字节的数据,便会出现木桶效应。

案例优化:

1、 环境的参数优化:

set hive.merge.mapfiles=true;

set hive.merge.mapredfiles=true;

sethive.merge.size.per.task=64000000;

set mapred.min.split.size=64000000;

set mapred.max.split.size=64000000;

setmapred.min.split.size.per.node=64000000;

set mapred.min.split.size.per.rack=64000000;

sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

因为合并小文件默认为true,而dfs.block.size与hive.merge.size.per.task的搭配使得合并后的绝大部分文件都在64MB左右。

我们不用执行上面的查询语句,就大概可以分析如下:

对于上面表对应的5个数据文件:4个为64MB,1个为11MB,那么上面的查询,会有5个mapper,其中4个mapper分布处理64MB的数据,其他1个mapper分布处理11MB的数据。

2、  运行上面的HQL,出现的结果与我们上面分析的一致。

 

总结:这里我只是提供了一种优化的思路,其实这里还不是最优,我们可以实际的Hadoop环境,hdfs的block大小以及表对应的数据文件,来调整上面的参数(比如将64MB全部修改为128MB,或许更快一点),最终每个task处理的数据大致相同,均衡IO负载,以达到资源最佳使用。

数据倾斜

1、   什么是数据倾斜?

Hadoop框架的特性决定最怕数据倾斜。

JobTracker和TaskTracker关系相当于老师和学生的关系,JobTracker分发任务给TaskTracker去处理各自的工作,但是不是平均分配的,还得根据TaskTracker本地的数据量多少去做判断。如果每个节点数据分配不均匀,势必造成有的TaskTracker处理的数据量大,有的处理的数据量小。

由于数据分布不均匀,可能会造成数据大量集中到一个节点或极少数个节点,造成数据热点。

2、   数据倾斜造成的症状:

map阶段快,reduce阶段非常慢,

某些mapper很快,某些mapper很慢,

某些reducer很快,某些reducer奇慢。

3、   数据倾斜可能在如下场景中出现:

A、 数据在节点上分布不均匀(无法避免)

B、 join时on关键词中个别值量很大(如null值)

C、 count(distinct)在数据量大的情况下,容易数据倾斜,因为count(distinct)是按group by字段分组,按distinct字段排序(有时无法避免)。

其中A无法避免,B见后边介绍的Join优化部分,C语法上有时无法避免。


关键词


情形


后果


Join


其中一个表较小,

但是key集中


分发到某一个或几个Reducer上的数据远高于平均值


大表与大表,但是分桶的判断字段0值或空值过多


这些空值都由一个reducer处理,非常慢


group by


group by 维度过小,

某值的数量过多


处理某值的reducer灰常耗时


Count Distinct


某特殊值过多


处理此特殊值的reducer耗时

减少job数(合并Job,大Job拆分)

1、   减少Job数

当源表相同时,如下可以合并Job,从而减少job数:

l  Join时,On 字段相同

多表join on条件相同时,合并为一个Map-Reduce。

select pt.page_id,count(t.url) pv

from rpt_page_type pt

join

(select refer_page_id,url_page_id,url from trackinfo where ds = ‘2013-10-11’)t

on pt.page_id = t.url_page_id

join

(select page_id from rpt_page_kpi_new where ds = ‘2013-10-11’)r

on t.url_page_id= r.page_id

group by  pt.page_id;

利用这个特性,可以把相同join on条件的放在一个job处理。

l  union all

对同一个表的union all只查询一次源表,Hive本身对这种union all做过优化。

selecturl,session_id from

(selecturl,session_id

from trackinfowhere ds=’2013-11-01’

union all

selecturl,session_id

from trackinfowhere ds=’2013-11-02’

)t;

l  Multi-insert(Multi-group by一定会和Multi-insert一起使用,同一源表,可按照不同where、不同group by进行计算)

条件:源表相同,上方的SQL等同于:

from trackinfo

insert overwrite table tmp_testpartition(step=1)

select url,session_id where ds=’2013-11-01’

insert overwrite table tmp_test partition(step=2)

selecturl,session_id where ds=’2013-11-02’;

Hive Job优化

l  并行化执行

每个查询被hive转化成多个阶段,有些阶段关联性不大,则可以并行化执行,减少执行时间。

sethive.exec.parallel=true;

sethive.exec.parallel.thread.number=15;

实例:

select num

from

(selectcount(city) as num

from city

union all

selectcount(province) as num

fromprovince)tmp;

union all两侧的查询语句会同时执行。

l  本地化执行(感觉生产环境用处不大)

sethive.exec.mode.local.auto=true;

当一个job满足如下条件才能真正使用本地模式:

a、   job的输入数据大小必须小于参数:

hive.exec.mode.local.auto.inputbytes.max(默认128MB)

b、   job的map数必须小于参数:

hive.exec.mode.local.auto.tasks.max(默认为4)

c、  job的reducer数必须为0或者1

如果你的环境不满足上述条件时,执行过程会提示原因,如Input size大于hive.exec.mode.local.auto.inputbytes.max值,或input files个数大于hive.exec.mode.local.auto.tasks.max值,同时会取消本地化执行,改为其他方式执行。

l  JVM重利用

set mapred.job.reuse.jvm.num.tasks=15;

JVM重利用可以是Job长时间保留slot,直到作业结束,这对于有较多任务和较多小文件的任务是非常有意义的,因为减少了JVM的启动和初始化时间,从而减少执行时间。当然这个值不能设置过大,因为有些作业会有reduce任务,如果reduce任务没有完成,则map任务占用的slot不能释放,其他作业可能就需要等待。

l  Hive压缩数据

中间压缩就是处理hive查询的多个job之间的数据,对于中间压缩,最好选择一个节省CPU耗时的压缩方式(即压缩率比较适中)。

set hive.exec.compress.intermediate=true;

set hive.intermediate.compression.codec=

org.apache.hadoop.io.compress.SnappyCodec;

sethive.intermediate.compression.type=BLOCK;

Hive查询最终的输出结果文件采用压缩(落地文件的压缩率可以选择较高的压缩率)

set hive.exec.compress.output=true;

set mapred.output.compression.codec=

org.apache.hadoop.io.compress.GzipCodec;

setmapred.output.compression.type=BLOCK;

语法(包含参数)层面优化

l  Join

l  Mapjoin

l  Bucket join

l  Group by

l  Count(distinct)

l  笛卡尔积

l  提前裁剪数据,避免资源浪费

l  Hive表的优化

Join优化

l  数据按照join的key进行分发,而在join左边的表的数据会首先部分或全部读入内存,如果左边表的key相对分散(单个key值数据量小,或者说相同key的数据量小),读入内存的数据会比较小,join任务执行会比较快,而如果左边的表key比较集中,而这张表的数据量又很大,那么数据倾斜就会比较严重。

Map阶段同一key数据会分发给同一个reducer计算。

l  join原则:

1)       小表join大表

在join操作的Reduce阶段(不是map阶段),位于join左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生内存进出的几率。

解决办法:

多个表关联时,最好分拆成几个小段,避免大sql(无法控制中间job)。

2)       大表join大表

大表关联中,如果join的key中含有大量null,在使用key进行hash分发时,会将数据文件中key为空的数据都分到一个节点,造成了数据倾斜。

解决办法:

把空值的key变成一个字符串加上随机数,把倾斜的数据分发到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

l  Join中对join key存在大量空值的优化演示:

end_user_id中存在大量null值。

原始HQL:

select u.id, t.url, t.track_time

from end_user u

join

(select end_user_id,url,track_time from trackinfo where ds= ‘2013-12-01‘)t

on u.id = t.end_user_id limit 2;

优化后HQL为:

select u.id, t.url, t.track_time

from end_user u

join

(select case when end_user_id = ‘null‘ or end_user_id is null

then cast(concat(‘00000000’,floor(rand()*1000000))as bigint)

else end_user_id endend_user_id,

url,track_time

from trackinfo where ds= ‘2013-12-01‘) t

on u.id = t.end_user_id limit 2;

Join对数据倾斜的参数优化:

set hive.optimize.skewjoin=true;

如果在join过程中出现倾斜,参数值应该设置为true。

set hive.skewjoin.key=1000000;

这个是join的键(key)对应的记录条数超过这个值则会进行join自动优化。

上面两个参数设置后的优化原理是:

没优化之前,join会启动一个job,但是设置优化参数后,会启动两个job。

第一个job会将键(key)超过hive.skewjoin.key记录的键加上一些随机数等,将这些相同的key打乱,然后跑到不同的节点上面进行计算(reduce阶段)。然后再启动一个job,在第一个job处理的基础上(即第一个job的reduce输出结果)再进行计算,将相同的key分发到相同的节点上处理。

l  Join时的关联键key的数据类型一定要相同,否则会产生数据倾斜问题

由于test_a表中的id为字符串型,所以我们将test_b表数字类型转换成字符串类型

select a.* fromtest_a a

left outer jointest_b b

On a.id =cast(b.id as string);

Mapjoin

Mapjoin(map端执行join操作):

mapjoin的计算原理:

mapjoin会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配。

l  Join 操作在Map阶段完成,如果需要的数据在Map的过程中可以处理掉,则不再需要Reduce阶段,加快了执行效率。

小表关联一个超大表时,容易发生数据倾斜,可以用Mapjoin把小表全部加载到内存,并在map端进行join操作,避免reducer处理。

如:

insert overwrite table page_pv

select /*+ MapJoin(pt)*/

pt.page_id,count(t.url) pv

from rpt_page_type pt

join

(select url_page_id, url from trackinfo where ds = ‘2013-10-11‘) t

on pt.page_id = t.url_page_id;

 

l  mapjoin的使用场景:

1)      关联操作中有一张表非常小

2)      不等值的连接操作

 

l  Mapjoin两种使用方式:

1)      通过参数设置,Hive自动选择执行Mapjoin操作

hive.auto.convert.join=true;

                      hive.mapjoin.smalltable.filesize=25000000;-------默认为25MB

原理:将小于hive.mapjoin.smalltable.filesize数值的表加载到分布式缓存中,这样整个集群节点上map端任务都可以访问缓存中的数据。

                           

2)      另外一种方式,可以不设置参数,通过hint方式指定:

select /*+ mapjoin(test_b) */a.key,a.value fromtest_a a join test_b b on a.key = b.key;

l  Mapjoin其他参数设置

set hive.mapjoin.cache.numrows=25000;

说明:mapjoin存在内存里的数据量。

set hive.mapjoin.followby.gby.localtask.max.memory.usage=0.55;

说明:map join做group by操作时,可以使用多大的内存来存储数据,如果数据太大,则不会保存在内存里。

set hive.mapjoin.localtask.max.memory.usage=0.90;

说明:本地任务可以使用内存的百分比

bucket join

l  使用bucket join需要满足下面两个条件

(1)      两个表以相同方式(key)划分桶

(2)      两个表的桶个数是倍数关系

create table order(cidint,price decimal(18,2)) clustered by (cid) into 32 buckets;

create tablecustomer(id int,first string,last string) clustered by (id) into 32(or 64……)buckets;

select pricefrom order o join customer c on o.cid = c.id;

说明:

查询语法与普通表一样,但是底层执行却不一样。根据key只会查找对应的桶即可,比如:如果cid=1,那么只会从customer中查找id=1的数据,这些数据都位于一个桶中,所以只需访问一个桶即可。

group by

l  Map端部分聚合:

并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。

l  Map端部分聚合参数:

#是否在map端进行聚合

hive.map.aggr=true;

l  有数据倾斜的时候进行负载均衡

#如果group by过程中出现倾斜,应该设置为true

hive.groupby.skewindata=true;

#在map端进行聚合操作的条目数目

#这个是group by的键对应的记录条数超过这个值则会进行优化

hive.groupby.mapaggr.checkinterval=100000;

和mapjoin类似,group by优化后也会启动两个Job。

当选项设为true时,生成的查询计划会有两个MR job,第一个MR job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group by key有可能被分发到不同的Reduce中,从而达到负载均衡的目的。

第二个MR job再根据预处理的数据结果按照group bykey分布到Reduce中(这个过程可以保证相同的group by 被分发到同一个Reduce中),最后完成最终的聚合操作。

Count(distinct)

l  当该字段存在大量值为null或空的记录时容易造成倾斜。

解决思路:

1)  count(distinct)时,将值为空的数据在where里过滤掉,在最后结果中加1。

2)  如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union

3)  如果group by维度过小,则可以  采用count和group by的方式来替换count(distinct)完成计算

l  特殊情况特殊处理:

在业务逻辑优化效果不大情况下,有些时候是可以将倾斜的数据单独拿出来处理,最后union回去。

countdistinct优化:

实例1:

优化前:

selectcount(distinct id) from student;

只有一个job任务,而且只有一个reduce,处理的工作量比较大。

优化后:

selectcount(1) from (select distinct id from student) tmp;

selectcount(1) from (select id from student group by id) tmp;

可以通过设置set mapred.reduce.tasks的值,加快(select distinct id from student) tmp部分的处理。

实例2:

优化前:

selecta,sum(b),count(distinct c),count(distinct d)

from test

group by a;

优化后:

select a,sum(b)as b,count(c) as c,count(d) as d

from (

select a,0 as b,c,null as d from test group by a,c

union all

select a,0 as b,null as c,d from test group by a,d

union all

select a,b,null as c,null as d from test

)tmp group bya;

笛卡尔积

尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。

之前我有遇到过一种情况,不得不使用笛卡尔积,表关联的条件为不等式,还好两张表不大。如果你不得不使用笛卡尔积,那么一定要看一下其中的表是否符合Mapjoin的要求,如果符合,那么一定要使用Mapjoin。

提前裁剪数据,避免资源浪费

join优化前:

select o.cid,c.id

from order o

join customer c

on o.cid = c.id

where o.dt = ‘2015-07-26‘;

join优化后:

select o.cid,c.id

from

(select cid

from order

where dt = ‘2015-07-26‘

)o

join customer c

on o.cid = c.id;

对一些过滤条件,能尽早过滤的就尽早过滤,减少IO资源浪费。

这个需要个人工作中注意就好了。

Hive表的优化

分区:

1)      静态分区

2)      动态分区

set hive.exec.dynamic.partition=true;

set hive.exec.dynamic.partition.mode=nonstrict;

分桶

sethive.enforce.bucketing=true;

sethive.enforce.sorting=true;

数据

相同数据尽量聚集在一起,和分桶原理类似,尽量减少网络数据传输

时间: 2024-10-14 05:28:30

Hive性能优化的相关文章

【转】Hive性能优化

1.概述 继续<那些年使用Hive踩过的坑>一文中的剩余部分,本篇博客赘述了在工作中总结Hive的常用优化手段和在工作中使用Hive出现的问题.下面开始本篇文章的优化介绍. 2.介绍 首先,我们来看看Hadoop的计算框架特性,在此特性下会衍生哪些问题? 数据量大不是问题,数据倾斜是个问题. jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,耗时很长.原因是map reduce作业初始化的时间是比较长的. sum,count,max,mi

Hive性能优化(新手重新标注版)

以下是一个技术小白根据自己的理解能力在别人整理的基础上进行了一些重点标识和归纳. 一个Hive查询生成多个Map Reduce Job,一个Map Reduce Job又有Map,Spill,Shuffle,Sort,Reduce等多个阶段,所以针对Hive查询的优化可以大致分为针对MR中单个步骤的优化(其中又会有细分),针对MR全局的优化,和针对整个查询(多MR Job)的优化,下文会分别阐述. 在开始之前,先把MR的流程图帖出来(摘自Hadoop权威指南),方便后面对照.另外要说明的是,这个

数据仓库中的 SQL 性能优化(Hive篇)

一个Hive查询生成多个map reduce job,一个map reduce job又有map,reduce,spill,shuffle,sort等多个阶段,所以针对hive查询的优化可以大致分为针对MR中单个步骤的优化(其中又会有细分),针对MR全局的优化,和针对整个查询(多MR job)的优化,下文会分别阐述. 在开始之前,先把MR的流程图帖出来(摘自Hadoop权威指南),方便后面对照.另外要说明的是,这个优化只是针对Hive 0.9版本,而不是后来Hortonwork发起Stinger

Spark性能优化指南——高级篇

Spark性能优化指南--高级篇 [TOC] 前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为<Spark性能优化指南>的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能. 数据倾斜发生时的现象 绝大多数tas

Spark性能优化指南——基础篇

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar

App的网络环境测试和性能优化

1. 网络环境测试一般是先用网络损伤模拟仪或mock工具模拟常见的七种损伤和5种网络环境,然后再国内外城市采样的方式(带宽和延时)组合测试生成报告, 下面是一些统计图 2. 采样点的选择一般都是根据自己server收集的用户信息.如果新app就要参考近品/竞品或第三方的统计数据拍脑袋 3. 从测试的角度,应该建立实时监控的web portal.其实测试的目的除了保证产品发布的质量.更重要的是为优化提供依据,所以report最后一部分都是issue list 和optmize advice,当然测

美团Spark性能优化指南——基础篇

http://tech.meituan.com/spark-tuning-basic.html 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团?大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性

【转载】 Spark性能优化指南——基础篇

前言 开发调优 调优概述 原则一:避免创建重复的RDD 原则二:尽可能复用同一个RDD 原则三:对多次使用的RDD进行持久化 原则四:尽量避免使用shuffle类算子 原则五:使用map-side预聚合的shuffle操作 原则六:使用高性能的算子 原则七:广播大变量 原则八:使用Kryo优化序列化性能 原则九:优化数据结构 资源调优 调优概述 Spark作业基本运行原理 资源参数调优 写在最后的话 前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的

Spark性能优化指南——基础篇转

前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛.在美团•大众点评,已经有很多同学在各种项目中尝试使用Spark.大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快.性能更高. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spar