(转)Hive自定义UDAF详解

UDAF有两种,第一种是比较简单的形式,利用抽象类UDAF和UDAFEvaluator,暂不做讨论。主要说一下第二种形式,利用接口GenericUDAFResolver2(或者抽象类AbstractGenericUDAFResolver)和抽象类GenericUDAFEvaluator。
        这里用AbstractGenericUDAFResolver做说明。

public abstract class AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

  @SuppressWarnings("deprecation")
  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
    throws SemanticException {

    if (info.isAllColumns()) {
      throw new SemanticException(
          "The specified syntax for UDAF invocation is invalid.");
    }

    return getEvaluator(info.getParameters());
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] info)
    throws SemanticException {
    throw new SemanticException(
          "This UDAF does not support the deprecated getEvaluator() method.");
  }
}

可以看到,该抽象类有两个方法,其中一个已经被弃用,所以只需要实现参数类型为TypeInfo的getEvaluator方法即可。

该方法其实相当于一个工厂,TypeInfo表示在使用时传入该UDAF的参数的类型。该方法主要做的工作有:

  • 检查参数长度和类型
  • 根据参数返回对应的实际处理对象

返回的对象类型为GenericUDAFEvaluator,这是一个抽象类:

public abstract class GenericUDAFEvaluator implements Closeable {

    ......

    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        // This function should be overriden in every sub class
        // And the sub class should call super.init(m, parameters) to get mode set.
        mode = m;
        return null;
    }

    public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

    public abstract void reset(AggregationBuffer agg) throws HiveException;

    public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

    public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;

    public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

    public abstract Object terminate(AggregationBuffer agg) throws HiveException;
    ......
}

说明上述方法的之前,需要提一个GenericUDAFEvaluator的内部枚举类Mode

public static enum Mode {
    /**
     * 相当于map阶段,调用iterate()和terminatePartial()
     */
    PARTIAL1,
    /**
     * 相当于combiner阶段,调用merge()和terminatePartial()
     */
    PARTIAL2,
    /**
     * 相当于reduce阶段调用merge()和terminate()
     */
    FINAL,
    /**
     * COMPLETE: 相当于没有reduce阶段map,调用iterate()和terminate()
     */
    COMPLETE
  };

可以看到,UDAF将任务分成了几种类型,PARTIAL1相当于MR程序的map阶段,负责迭代处理记录并返回该阶段的中间结果。PARTIAL2相当于Combiner,对map阶段的结果进行一次聚合。FINAL是reduce阶段,进行整体聚合以及返回最终结果。COMPLETE有点特殊,是一个没有reduce阶段的map过程,所以在进行记录迭代之后,直接返回最终结果。
        再来看GenericUDAFEvaluator中的各方法

public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {...}

初始化方法,在Mode的每一个阶段启动时会执行init方法。该方法有两个参数,第一个参数是Mode,可以根据此参数判断当前执行的是哪个阶段,进行该阶段相应的初始化工作。ObjectInspector是一个抽象的类型描述,例如:当参数类型是原生类型时,可以转化为PrimitiveObjectInspector,除此之外还有StructObjectInspector等等。ObjectInspector只是描述类型,并不存储实际数据。后面的具体例子中会有一些使用说明。

ObjectInspector[]的长度不是固定的,要看当前是处于哪个阶段。如果是PARTIAL1,那么与使用时传入该UDAF的参数个数一致;如果是FINAL阶段,长度就是1了,因为map阶段返回的结果只有一个对象。

public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

public abstract void reset(AggregationBuffer agg) throws HiveException;

AggregationBuffer是一个标识接口,没有任何需要实现的方法。实现该接口的类被用于暂存中间结果。reset是为了重置AggregationBuffer,但是在实际应用场景中没有发现单独调用该方法进行重置,有可能是聚合key的数据量还不够大,在后面会再说一下这个问题。

public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;

public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

public abstract Object terminate(AggregationBuffer agg) throws HiveException;

iterate方法存在于MR的M阶段,用于处理每一条输入记录。Object[]作为输入传入UFAF,AggregationBuffer作为中间缓存暂存结果。需要注意的是,每次调用iterate传入的AggregationBuffer并不一定是同一个对象。Hive调用UDAF的时候会用一个Map来管理AggregationBuffer,Map的key即为需要聚合的key。就通过实际运行过程来看,在每一次iterate调用之前,会根据聚合key从Map中查找对应的AggregationBuffer,若能找到则直接返回AggregationBuffer对象,找不到则调用getNewAggregationBuffer方法新建并插入Map中并返回结果。

terminatePartial方法在iterate处理完所有输入后调用,用于返回初步的聚合结果。

merge方法存在于MR的R阶段(也同样存在于Combine阶段),用于最后的聚合。Object类型的partial参数与terminatePartial返回值一致,AggregationBuffer参数与上述一致。         terminate方法在merge方法执行完毕之后调用,用于进行最后的处理,并返回最后结果。

像上面提到的Mode一样,这些方法并不一定都会被调用,与Hive解析成的MR程序类型有关。例如解析后的MR程序只有M阶段,则只会调用iterate和terminate。实际使用过程中,由于聚合key数据量有限,内存可以承载,所以没有发现reset单独调用的情况。每次遇到一个不同的key,则新建一个AggregationBuffer,没有看源码,不知道当聚合key很大的时候,是否会调用reset进行对象重用。

转载地址:http://paddy-w.iteye.com/blog/2081409

时间: 2024-10-26 12:05:30

(转)Hive自定义UDAF详解的相关文章

Hadoop Hive sql语法详解

Hive 是基于Hadoop 构建的一套数据仓库分析系统,它提供了丰富的SQL查询方式来分析存储在Hadoop 分布式文件系统中的数据,可以将结构化的数据文件映射为一张数据库表,并提供完整的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行,通过自己的SQL 去查询分析需要的内容,这套SQL 简称Hive SQL,使不熟悉mapreduce 的用户很方便的利用SQL 语言查询,汇总,分析数据.而mapreduce开发人员可以把己写的mapper 和reducer 作为插件来支持

自定义View详解

自定义View详解 虽然之前也分析过View回执过程,但是如果让我自己集成ViewGroup然后自己重新onMeasure,onLayout,onDraw方法自定义View我还是会头疼.今天索性来系统的学习下. onMeasure /** * <p> * Measure the view and its content to determine the measured width and the * measured height. This method is invoked by {@l

(转)Excel自定义格式详解

”G/通用格式”:以常规的数字显示,相当于”分类”列表中的”常规”选项.例:代码:”G/通用格式”.10显示为10:10.1显示为10.1. 2. “#”:数字占位符.只显有意义的零而不显示无意义的零.小数点后数字如大于”#”的数量,则按”#”的位数四舍五入.例:代码:”###.##”,12.1显示为12.10;12.1263显示为:12.13 3.”0”:数字占位符.如果单元格的内容大于占位符,则显示实际数字,如果小于点位符的数量,则用0补足.例:代码:”00000”.1234567显示为12

Hive的配置详解和日常维护

Hive的配置详解和日常维护 一.Hive的参数配置详解 1>.mapred.reduce.tasks  默认为-1.指定Hive作业的reduce task个数,如果保留默认值,则Hive 自己决定应该使用多少个task. 2>.hive.mapred.mode  2.x下的默认值为strict,1.x以及之前的版本默认值为nonstrict.如果 设为strict,Hive将禁止一些危险的查询:分区表未用分区字段筛选: order by语句后未跟limit子句:join后没有on语句从而形

Hive的基本操作详解

一 Hive数据类型 1.1 基本数据类型 Hive数据类型 Java数据类型 长度 例子 TINYINT byte 1byte有符号整数 20 SMALINT short 2byte有符号整数 20 INT int 4byte有符号整数 20 BIGINT long 8byte有符号整数 20 BOOLEAN boolean 布尔类型,true或者false TRUE  FALSE FLOAT float 单精度浮点数 3.14159 DOUBLE double 双精度浮点数 3.14159

angular 自定义指令详解 Directive

在angular中,Directive,自定义指令的学习,可以更好的理解angular指令的原理,当angular的指令不能满足你的需求的时候,嘿嘿,你就可以来看看这篇文章,自定义自己的指令,可以满足你的各种需求的指令. 本篇文章的参考来自  AngularJS权威指南 , 文章中主要介绍指令定义的选项配置 废话不多说,下面就直接上代码 //angular指令的定义,myDirective ,使用驼峰命名法 angular.module('myApp', []) .directive('myDi

[Hive] - Hive参数含义详解

hive中参数分为三类,第一种system环境变量信息,是系统环境变量信息:第二种是env环境变量信息,是当前用户环境变量信息:第三种是hive参数变量信息,是由hive-site.xml文件定义的以及当前hive会话定义的环境变量信息.其中第三种hive参数变量信息中又由hadoop hdfs参数(直接是hadoop的).mapreduce参数.metastore元数据存储参数.metastore连接参数以及hive运行参数构成. Hive-0.13.1-cdh5.3.6参数变量信息详解 参数

AngularJS自定义指令详解(有分页插件代码)

前言 除了 AngularJS 内置的指令外,我们还可以创建自定义指令. 通过 .directive() 函数来添加自定义的指令. 调用自定义指令时,需要在HTMl 元素上添加自定义指令名. 自定义指令命名规则:使用驼峰命名法来命名,即除第一个单词外的首字母需大写.如: myDirective. 在html页面调用该指令时需要以 - 分割,如: my-directive.示例代码: <body ng-app="myApp"> <my-directive><

Android Gradle 自定义Task 详解

转载请标明出处:http://blog.csdn.net/zhaoyanjun6/article/details/76408024 本文出自[赵彦军的博客] 一:Gradle 是什么 Gradle是一个基于Apache Ant和Apache Maven概念的项目自动化构建工具. 它使用一种基于Groovy的特定领域语言(DSL)来声明项目设置,抛弃了基于XML的各种繁琐配置.面向Java应用为主. 当前其支持的语言限于Java.Groovy.Kotlin和Scala,计划未来将支持更多的语言.基