Hive学习之自定义聚合函数

Hive支持用户自定义聚合函数(UDAF),这种类型的函数提供了更加强大的数据处理功能。Hive支持两种类型的UDAF:简单型和通用型。正如名称所暗示的,简单型UDAF的实现非常简单,但由于使用了反射的原因会出现性能的损耗,并且不支持长度可变的参数列表等特征。而通用型UDAF虽然支持长度可变的参数等特征,但不像简单型那么容易编写。

这篇文章将学习编写UDAF的规则,比如需要实现哪些接口,继承哪些类,定义哪些方法等, 实现通用型UDAF需要编写两个类:解析器和计算器。解析器负责UDAF的参数检查,操作符的重载以及对于给定的一组参数类型查找正确的计算器。计算器实现实际UDAF的计算逻辑。通常解析器可以实现org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2接口,但建议继承org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver抽象类,该类实现了GenericUDAFResolver2接口。计算器需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator抽象类,并做为解析器的内部静态类实现。

解析器的类型检查确保用户传递正确的参数,比如UDAF的参数为Integer类型,那么用户传递Double就需要抛出异常。操作符重载则允许为不同类型的参数定义不同的UDAF逻辑。在编码之前,先了解一下AbstractGenericUDAFResolver类,该类有两个重载的方法public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfoinfo)和public GenericUDAFEvaluatorgetEvaluator(TypeInfo[]
info),其中前者不再建议使用,这样继承该类时只覆盖第二个方法即可。该方法的参数类型为TypeInfo[],返回值为GenericUDAFEvaluator,在该方法中完成参数的检查,不仅包括参数的数量还有参数的类型。TypeInfo位于包org.apache.hadoop.hive.serde2.typeinfo中,该类存储类型信息,Hive目前支持5种类型:基本类型(String,Number等)、List对象、Map对象、Struct对象和Union对象。该类的getCategory()方法返回类型信息的类别,具体为枚举类ObjectInspector.Category,该枚举类包含了对应上述5种类型的枚举常量,分别为:PRIMITIVE、LIST、MAP、STRUCT和UNION。getEvaluator(TypeInfo[]
info)的具体实现如下:

@Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
    throws SemanticException {
    if (parameters.length != 1) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Exactly one argument is expected.");
    }
    ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
    if (!ObjectInspectorUtils.compareSupported(oi)) {
      throw new UDFArgumentTypeException(parameters.length - 1,
          "Cannot support comparison of map<> type or complex type containing map<>.");
    }
    return new GenericUDAFMaxEvaluator();
  }

如果想实现操作符重载,需要创建与操作符数目相同的计算器内部类,比如有两个重载方法,那么需要创建两个计算器,然后根据输入参数的不同返回不同的计算器。

正如上面提到的计算器需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator抽象类,该类提供了几个需要被子类实现的抽象方法,这些方法建立了处理UDAF语义的过程。在具体学习如何编写计算器之前,先了解一下计算器的4种模式,这些模式由枚举类GenericUDAFEvaluator.Mode定义:

public static enum Mode {
    PARTIAL1,
    PARTIAL2,
    FINAL,
    COMPLETE
  };

PARTIAL1模式是从原始数据到部分聚合数据的过程,将调用方法iterate() 和terminatePartial()。PARTIAL2模式是从部分聚合数据到部分聚合数据的过程,将调用方法merge() 和terminatePartial()。FINAL模式是从部分聚合到全部聚合的过程,将调用merge()和 terminate()。最后一种模式为COMPLETE,该模式为从原始数据直接到全部聚合的过程,将调用merge() 和 terminate()。

在了解了计算器的模式后,具体看看计算器必须实现的方法。GenericUDAFEvaluator类提供了下面几个抽象方法:

  • getNewAggregationBuffer():用于返回存储临时聚合结果的 GenericUDAFEvaluator.AggregationBuffer对象。
  • reset(GenericUDAFEvaluator.AggregationBuffer agg):重置聚合,该方法在重用相同的聚合时很有用。
  • iterate(GenericUDAFEvaluator.AggregationBuffer agg,Object[] parameters):迭代parameters表示的原始数据并保存到agg中。
  • terminatePartial(GenericUDAFEvaluator.AggregationBuffer agg):以持久化的方式返回agg表示部分聚合结果,这里的持久化意味着返回值只能Java基础类型、数组、基础类型包装器、Hadoop的Writables、Lists和Maps。即使实现了java.io.Serializable,也不要使用自定义的类。
  • merge(GenericUDAFEvaluator.AggregationBuffer agg,Object partial):合并由partial表示的部分聚合结果到agg中。
  • terminate(GenericUDAFEvaluator.AggregationBuffer agg):返回由agg表示的最终结果。

除了上述抽象方法,GenericUDAFEvaluato还有一个虽然不是抽象方法但通常也需要覆盖的方法ObjectInspector 
init(GenericUDAFEvaluator.Mode m,ObjectInspector[] parameters),该方法用于初始化计算器,在不同的模式下第二参数的含义是不同的,比如m为PARTIAL1 和 COMPLETE时,第二个参数为原始数据,m为PARTIAL2 和 FINAL时,该参数仅为部分聚合数据(该数组总是只有一个元素)。在PARTIAL1和PARTIAL2模式下,ObjectInspector 用于terminatePartial方法的返回值,在FINAL和COMPLETE模式下ObjectInspector 用于terminate方法的返回值。

上述这些方法基本按照init、getNewAggregationBuffer、iterate、terminatePartial、merge、terminate的顺序调用。还有一点需要明确的是聚合计算必须在数据上是任意可分的。

可以参考Hive自带的聚合函数,比如求最大值的max函数,其计算器的源代码如下所示。在计算器中必须注意的是ObjectInspector及其子类的使用,该类表示特定的类型及如何在内存中存储该类型的数据,具体的使用方法可以参考API。

public static class GenericUDAFMaxEvaluator extends GenericUDAFEvaluator {
    private transient ObjectInspector inputOI;
    private transient ObjectInspector outputOI;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
        throws HiveException {
      assert (parameters.length == 1);
      super.init(m, parameters);
      inputOI = parameters[0];
      // Copy to Java object because that saves object creation time.
      // Note that on average the number of copies is log(N) so that's not
      // very important.
      outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
          ObjectInspectorCopyOption.JAVA);
      return outputOI;
    }
    /** class for storing the current max value */
    static class MaxAgg extends AbstractAggregationBuffer {
      Object o;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      MaxAgg result = new MaxAgg();
      return result;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      MaxAgg myagg = (MaxAgg) agg;
      myagg.o = null;
    }

    boolean warned = false;

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters)
        throws HiveException {
      assert (parameters.length == 1);
      merge(agg, parameters[0]);
    }
    @Override
public Object terminatePartial(AggregationBuffer agg)
throws HiveException {
      return terminate(agg);
    }
    @Override
    public void merge(AggregationBuffer agg, Object partial)
        throws HiveException {
      if (partial != null) {
        MaxAgg myagg = (MaxAgg) agg;
        int r = ObjectInspectorUtils.compare(myagg.o, outputOI, partial, inputOI);
        if (myagg.o == null || r < 0) {
          myagg.o = ObjectInspectorUtils.copyToStandardObject(partial, inputOI,ObjectInspectorCopyOption.JAVA);
        }
      }
    }
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      MaxAgg myagg = (MaxAgg) agg;
      return myagg.o;
    }
  }
时间: 2024-10-14 04:07:48

Hive学习之自定义聚合函数的相关文章

Hive通用型自定义聚合函数(UDAF)

在使用hive进行数据处理时,经常会用到group by语法,但对分组的操作,hive没有mysql支持得好: group_concat([DISTINCT] 要连接的字段 [Order BY ASC/DESC 排序字段] [Separator '分隔符']) hive只有一个collect_set内置函数,返回去重后的元素数组,但我们可以通过编写UDAF,来实现想要的功能. 编写通用型UDAF需要两个类:解析器和计算器.解析器负责UDAF的参数检查,操作符的重载以及对于给定的一组参数类型来查找

SQL Server 自定义聚合函数

说明:本文依据网络转载整理而成,因为时间关系,其中原理暂时并未深入研究,只是整理备份留个记录而已. 目标:在SQL Server中自定义聚合函数,在Group BY语句中 ,不是单纯的SUM和MAX等运算,可以加入拼接字符串. 环境: 1:Sqlserver 2008 R2 2:Visual Studio 2013 第一部分: .net代码: using System; using System.Data; using Microsoft.SqlServer.Server; using Syst

pandas rolling对象的自定义聚合函数

pandas rolling对象的自定义聚合函数 计算标准差型的波动率剪刀差 利用自定义的聚合函数, 把它应用到pandas的滚动窗长对象上, 可以求出 标准差型的波动率剪刀差 代码 def volat_diff(roc1_rolling, center=-0.001, nSD=5): '''计算: 标准差型波动率剪刀差 参数: roc1_rolling: 滚动窗长里的roc1 center: roc1(1日波动率)的平均值 nSD: 求标准差时用的窗长 用法: 1. rolling.apply

sql server 2012 自定义聚合函数(MAX_O3_8HOUR_ND) 计算最大的臭氧8小时滑动平均值

采用c#开发dll,并添加到sql server 中. 具体代码,可以用visual studio的向导生成模板. using System; using System.Collections; using System.Data; using Microsoft.SqlServer.Server; using System.Data.SqlTypes; using System.IO; using System.Text; [Serializable] [Microsoft.SqlServer

SQL SERVER 2005允许自定义聚合函数-表中字符串分组连接

不多说了,说明后面是完整的代码,用来将字符串型的字段的各行的值拼成一个大字符串,也就是通常所说的Concat 例如有如下表dict  ID  NAME  CATEGORY  1 RED  COLOR   2 BLUE COLOR  3 APPLE  FRUIT  4 ORANGE FRUIT 执行SQL语句:select category,dbo.concatenate(name) as names from dict group by category. 得到结果表如下  category  

数据库学习笔记4---MySQL聚合函数、控制流程函数(含navicat软件的介绍)

[声明] 欢迎转载,但请保留文章原始出处→_→ 生命壹号:http://www.cnblogs.com/smyhvae/ 文章来源:http://www.cnblogs.com/smyhvae/p/4030506.html [正文] 一.navicat的引入:(第三方可视化的客户端,方便MySQL数据库的管理和维护) NavicatTM是一套快速.可靠并价格相宜的数据库管理工具,专为简化数据库的管理及降低系统管理成本而设.它的设计符合数据库管理员.开发人员及中小企业的需要.Navicat 是以直

hive grouping sets 等聚合函数

函数说明: grouping sets 在一个 group by 查询中,根据不同的维度组合进行聚合,等价于将不同维度的 group by 结果集进行 union allcube 根据 group by 的维度的所有组合进行聚合rollup 是 cube 的子集,以最左侧的维度为主,从该维度进行层级聚合. -- grouping sets select order_id, departure_date, count(*) as cnt from ord_test where order_id=4

Hive Sum MAX MIN聚合函数

数据准备cookie1,2015-04-10,1cookie1,2015-04-11,5cookie1,2015-04-12,7cookie1,2015-04-13,3cookie1,2015-04-14,2cookie1,2015-04-15,4cookie1,2015-04-16,4创建数据库及表create database if not exists cookie;use cookie;drop table if exists cookie1;create table cookie1(c

Oracle 自定义聚合函数

create or replace type str_concat_type as object ( cat_string varchar2(4000), static function ODCIAggregateInitialize(cs_ctx In Out str_concat_type) return number, member function ODCIAggregateIterate(self In Out str_concat_type,value in varchar2) re