1.hive中的四种排序
1.1 order by :对全局进行排序,只能有一个reduce
select * from hive.employee order by id;
1.2 sort by :对每一个reduce内部数据进行排序,全局结果集没有排序
set mapreduce.job.reduces=3;设置reduce的个数为3
insert overwrite local directory ‘/opt/data/employee_sort_by‘
row format delimited fields terminated by ‘\t‘ collection items terminated by ‘\n‘
select * from hive.employee sort by dept_id;
1.3 distribute by :对数据进行分区,结合sort by进行合并使用,类似于mapreduce中的mapreduce中的partition,必须在sort by 之前
insert overwrite local directory ‘/opt/data/employee_distribute_by‘
row format delimited fields terminated by ‘\t‘ collection items terminated by ‘\n‘
select * from hive.employee distribute by dept_id sort by id asc;
1.4 cluster by:当distribute by 和 sort by 的字段相同时,可以使用cluster by 代替
2.使用udf自定义函数
2.1 编写udf函数
继承extends UDF
编写evaluate 方法
2.2 导入自定义函数到hive函数库
方法一:
add jar /opt/data/jars/my_lower.jar;
create temporary function my_lower as "com.ibeifeng.hive.udf.LowerUdf";
方法二:
create function self_lower as ‘com.ibeifeng.hive.udf.LowerUdf‘ using jar ‘hdfs://life-hadoop.life.com:8020/user/yanglin/data/jars/my_lower.jar‘;
3.hiveserver2的使用
3.1 启动hiveserver2 bin/hiveserver2
3.2 使用beeline进行连接
!connect jdbc:hive2://life-hadoop.life.com:10000 yanglin life@one
4.数据压缩
4.1 map 输出结果的压缩
set mapreduce.map.output.compress =true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
4.2 reduce 输出结果的压缩
set mapreduce.output.fileoutputformat.compress=true
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
4.3 map 输入数据的压缩
以压缩格式的文件存储数据(例如:orc,parquet)
create table if not exists hive.employee_orc_snappy (id int,name string,job string,manager_id int,apply_date string,salary double,
reward double,dept_id int)
row format delimited fields terminated by ‘\t‘
stored as orc tblproperties("orc.compress"="SNAPPY");
其中该表的数据存储格式为orc,文件压缩格式为snappy
5.hive调优
5.1 修改 hive.fetch.task.conversion参数,使尽可能少用mapreduce
<!--尽可能的少用mapreduce-->
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
</property>
5.2 使用大表拆分为小表和子表
5.3 使用外部表分区表
5.4 对表的数据的存储格式使用orc和parquet,并使用snappy压缩
5.5 对sql进行优化
common join / shuffle join / reduce join : 连接发生在reduce task 阶段
使用于大表和大表之间,每个表中的数据都从文件中读取
map join : 连接发生在map task 阶段
使用于小表和大表之间,大表的数据从文件中读取,小表的数据通过distributedCache加载到内存中
注:可以通过设置 hive.auto.convert.join = true 让程序自动识别使用map join还是reduce join。
SMB join :sort-merge-bucket join 是对reduce join 的一种优化
在创建表时声明[CLUSTERED BY (col_name, col_name, ...) [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS],
且两个表的分区字段要一致。
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
5.6 设置job并行执行
set hive.exec.parallel = true
set hive.exec.parallel.thread.number = 8 建议10~20,一般不用超过20
5.7 设置jvm重用
set mapreduce.job.jvm.numtasks = 1 一般不用超过9
5.8 设置reduce的个数
set mapreduce.job.reduces = 1
5.9 设置推测执行
set hive.mapred.reduce.tasks.speculative.execution = true
set mapreduce.map.speculative = true
set mapreduce.reduce.speculative = true
5.10 设置map的个数
set hive.merge.size.per.task = 256000000