hive------ Group by、join、distinct等实现原理

1. Hive 的 distribute by

Order by 能够预期产生完全排序的结果,但是它是通过只用一个reduce来做到这点的。所以对于大规模的数据集它的效率非常低。在很多情况下,并不需要全局排序,此时可以换成Hive的非标准扩展sort by。Sort by为每个reducer产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个reducer,通常是为了进行后续的聚集操作。Hive的distribute by 子句可以做这件事。

// 根据年份和气温对气象数据进行排序,以确保所有具有相同年份的行最终都在一个reducer分区中

from record2

select year, temperature

distribute by year

sort by year asc, temperature desc;

2. Distinct 的实现

准备数据

语句

SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;
hive> SELECT * FROM logs;
OK
a	苹果	3
a	橙子	3
a	烧鸡	1
b	烧鸡	3

hive> SELECT COUNT, COUNT(DISTINCT uid) FROM logs GROUP BY COUNT;

根据count分组,计算独立用户数。

计算过程

1. 第一步先在mapper计算部分值,会以count和uid作为key,如果是distinct并且之前已经出现过,则忽略这条计算。第一步是以组合为key,第二步是以count为key.
2. ReduceSink是在mapper.close()时才执行的,在GroupByOperator.close()时,把结果输出。注意这里虽然key是count和uid,但是在reduce时分区是按count来的!
3. 第一步的distinct计算的值没用,要留到reduce计算的才准确。这里只是减少了key组合相同的行。不过如果是普通的count,后面是会合并起来的。
4. distinct通过比较lastInvoke判断要不要+1(因为在reduce是排序过了的,所以判断distict的字段变了没有,如果没变,则不+1)

Operator

Explain

hive> explain select count, count(distinct uid) from logs group by count;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL count)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL uid)))) (TOK_GROUPBY (TOK_TABLE_OR_COL count))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs
          TableScan //表扫描
            alias: logs
            Select Operator//列裁剪,取出uid,count字段就够了
              expressions:
                    expr: count
                    type: int
                    expr: uid
                    type: string
              outputColumnNames: count, uid
              Group By Operator //先来map聚集
                aggregations:
                      expr: count(DISTINCT uid) //聚集表达式
                bucketGroup: false
                keys:
                      expr: count
                      type: int
                      expr: uid
                      type: string
                mode: hash //hash方式
                outputColumnNames: _col0, _col1, _col2
                Reduce Output Operator
                  key expressions: //输出的键
                        expr: _col0 //count
                        type: int
                        expr: _col1 //uid
                        type: string
                  sort order: ++
                  Map-reduce partition columns: //这里是按group by的字段分区的
                        expr: _col0 //这里表示count
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
      Reduce Operator Tree:
        Group By Operator //第二次聚集
          aggregations:
                expr: count(DISTINCT KEY._col1:0._col0) //uid:count
          bucketGroup: false
          keys:
                expr: KEY._col0 //count
                type: int
          mode: mergepartial //合并
          outputColumnNames: _col0, _col1
          Select Operator //列裁剪
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator //输出结果到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1

3.Group By 的实现
数据准备SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
hive> SELECT * FROM logs;
a	苹果	5
a	橙子	3
a      苹果   2
b	烧鸡	1

hive> SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
a	10
b	1

计算过程


默认设置了hive.map.aggr=true,所以会在mapper端先group by一次,最后再把结果merge起来,为了减少reducer处理的数据量。注意看explain的mode是不一样的。mapper是hash,reducer是mergepartial。如果把hive.map.aggr=false,那将groupby放到reducer才做,他的mode是complete.

Operator

Explain

hive> explain SELECT uid, sum(count) FROM logs group by uid;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL uid)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL count)))) (TOK_GROUPBY (TOK_TABLE_OR_COL uid))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs
          TableScan // 扫描表
            alias: logs
            Select Operator //选择字段
              expressions:
                    expr: uid
                    type: string
                    expr: count
                    type: int
              outputColumnNames: uid, count
              Group By Operator //这里是因为默认设置了hive.map.aggr=true,会在mapper先做一次聚合,减少reduce需要处理的数据
                aggregations:
                      expr: sum(count) //聚集函数
                bucketGroup: false
                keys: //键
                      expr: uid
                      type: string
                mode: hash //hash方式,processHashAggr()
                outputColumnNames: _col0, _col1
                Reduce Output Operator //输出key,value给reducer
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator

          aggregations:
                expr: sum(VALUE._col0)
//聚合
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial //合并值
          outputColumnNames: _col0, _col1
          Select Operator //选择字段
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator //输出到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
4. join原理 

准备数据

语句
SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
我们希望的结果是把users表join进来获取age字段。

hive> SELECT * FROM logs;
OK
a	苹果	5
a	橙子	3
b	烧鸡	1

hive> SELECT * FROM users;
OK
a	23
b	21

hive> SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
a	苹果	23
a	橙子	23
b	烧鸡	21

计算过程

  1. key这里后面的数字是tag,后面在reduce阶段用来区分来自于那个表的数据。tag是附属在key后面的。那为什么会把a(0)和a(1)汇集在一起了呢,是因为对先对a求了hashcode,设在了HiveKey上,所以同一个key还是在一起的。
  2. Map阶段只是拆分key和value。
  3. reduce阶段主要看它是如何把它合并起来了,从图上可以直观的看到,其实就是把tag=1的内容,都加到tag=0的后面,就是这么简单。
  4. 代码实现上,就是先临时用个变量把值存储起来在storage里面, storage(0) = [{a, 苹果}, {a, 橙子}] storage(1) = [{23}],当key变化(如a变为b)或全部结束时,会调用endGroup()方法,把内容合并起来。变成[{a,苹果,23}, {a, 橙子,23}]

Operator

Explain

hive> explain SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
OK

//语法树
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME logs) a) (TOK_TABREF (TOK_TABNAME users) b) (= (. (TOK_TABLE_OR_COL a) uid) (. (TOK_TABLE_OR_COL b) uid)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) uid)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) name)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) age)))))

//阶段
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree: //mapper阶段
        a
          TableScan //扫描表, 就只是一行一行的传递下去而已
            alias: a
            Reduce Output Operator //输出给reduce的内容
              key expressions: // key啦,这里的key是uid,就是我们写在ON子句那个,你可以试试加多几个条件
                    expr: uid
                    type: string
              sort order: + //排序
              Map-reduce partition columns://分区字段,貌似是和key一样的
                    expr: uid
                    type: string
              tag: 0 //用来区分这个key是来自哪个表的
              value expressions: //reduce用到的value字段
                    expr: uid
                    type: string
                    expr: name
                    type: string
        b
          TableScan //扫描表, 就只是一行一行的传递下去而已
            alias: b
            Reduce Output Operator //输出给reduce的内容
              key expressions: //key
                    expr: uid
                    type: string
              sort order: +
              Map-reduce partition columns: //分区字段
                    expr: uid
                    type: string
              tag: 1 //用来区分这个key是来自哪个表的
              value expressions: //值
                    expr: age
                    type: int
      Reduce Operator Tree: // reduce阶段
        Join Operator // JOIN的Operator
          condition map:
               Inner Join 0 to 1 // 内连接0和1表
          condition expressions: // 第0个表有两个字段,分别是uid和name, 第1个表有一个字段age
 {VALUE._col0} {VALUE._col1}
 {VALUE._col1}
          handleSkewJoin: false //是否处理倾斜join,如果是,会分为两个MR任务
          outputColumnNames: _col0, _col1, _col6 //输出字段
          Select Operator //列裁剪(我们sql写的select字段)
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col6
                  type: int
            outputColumnNames: _col0, _col1, _col2
            File Output Operator //把结果输出到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1

可以看到里面都是一个个Operator顺序的执行下来

 
时间: 2024-08-25 12:19:06

hive------ Group by、join、distinct等实现原理的相关文章

mysql ORDER BY,GROUP BY 和DISTINCT原理

前言 除了常规的Join语句之外,还有一类Query语句也是使用比较频繁的,那就是ORDERBY,GROUP BY以及DISTINCT这三类查询.考虑到这三类查询都涉及到数据的排序等操作,所以我将他们放在了一起,下面就针对这三类Query语句做基本的分析. ORDER BY 的实现与优化 在MySQL中,ORDERBY的实现有如下两种类型: 一种是通过有序索引而直接取得有序的数据,这样不用进行任何排序操作即可得到满足客户端要求的有序数据返回给客户端: 另外一种则需要通过MySQL的排序算法将存储

Group by 和distinct对比

** Group by 和distinct对比** CREATE TABLE sbtest1 (id int(11) NOT NULL AUTO_INCREMENT,k int(11) NOT NULL DEFAULT '0',c char(120) NOT NULL DEFAULT '',pad char(60) NOT NULL DEFAULT '',PRIMARY KEY (id),KEY k_1 (k)) ENGINE=InnoDB AUTO_INCREMENT=10000001 DEF

hive regex insert join group cli

1.insert Insert时,from子句既能够放在select子句后,也能够放在insert子句前,以下两句是等价的 hive> FROM invites a INSERT OVERWRITE TABLE eventsSELECT a.bar, count(*) WHERE a.foo > 0 GROUP BY a.bar; hive> INSERT OVERWRITE TABLE events SELECTa.bar, count(*) FROM invites a WHERE

hive 配置文件以及join中null值的处理

一.Hive的参数设置 1.  三种设定方式:配置文件 ·   用户自定义配置文件:$HIVE_CONF_DIR/hive-site.xml ·   默认配置文件:$HIVE_CONF_DIR/hive-default.xml 用户自定义配置会覆盖默认配置.另外,Hive也会读入Hadoop的配置,因为Hive是作为Hadoop的客户端启动的,Hadoop的配置文件包括 ·   $HADOOP_CONF_DIR/hive-site.xml ·   $HADOOP_CONF_DIR/hive-de

Java Thread.join的作用和原理

很多人对Thread.join的作用以及实现了解得很少,毕竟这个api我们很少使用.这篇文章仍然会结合使用及原理进行深度分析 内容导航 Thread.join的作用 Thread.join的实现原理 什么时候会使用Thread.join Thread.join的作用 之前有人问过我一个这样的面试题 Java中如何让多线程按照自己指定的顺序执行? 这个问题最简单的回答是通过Thread.join来实现,久而久之就让很多人误以为Thread.join是用来保证线程的顺序性的. 下面这段代码演示了Th

hive概念、架构、部署及原理介绍

转:https://www.aboutyun.com/thread-21544-1-1.html 问题导读: 1.Hive 是什么? 2.Hive 架构分哪几部分? 3.Hive 文件格式是怎样的? 一.Hive是什么? Hive 是基于 Hadoop 构建的一套数据仓库分析系统,它提供了丰富的 SQL 查询方式来分析存储在 Hadoop 分布式文件系统中的数据, 可以将结构化的数据文件映射为一张数据库表,并提供完整的 SQL 查询功能,可以将 SQL 语句转换为 MapReduce 任务进行运

Hive 的 map join

学习自 http://blog.csdn.net/xqy1522/article/details/6699740 1. Map Join 的使用场景: 关联操作中有一张表非常小 不等值的链接操作 2. 语法: 使用 hint 的方式指定join时使用mapjoin. select /*+ mapjoin(c)*/ -- hint c.tag,b.yemaozi_pre from (select row_number() over(partition by 1 order by yemaozi_p

lambda Group by Join

1.Group By 2.Join

SQL join中级篇--hive中 mapreduce join方法分析

1. 概述. 本文主要介绍了mapreduce框架上如何实现两表JOIN. 2. 常见的join方法介绍 假设要进行join的数据分别来自File1和File2. 2.1 reduce side join reduce side join是一种最简单的join方式,其主要思想如下: 在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签 (tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2.