Hive自定义函数(UDF、UDAF)

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数。

UDF

用户自定义函数(user defined function)–针对单条记录。

创建函数流程

1、自定义一个Java类

2、继承UDF类

3、重写evaluate方法

4、打成jar包

6、在hive执行add jar方法

7、在hive执行创建模板函数

8、hql中使用

Demo01:

自定义一个Java类

package UDFDemo;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class UDFTest extends UDF{

    public boolean evaluate(){
        return true;
    }

    public boolean evaluate(int b){
        //int b=Integer.parseInt(a);
        if(b<0){
            return false;
        }
        if(b%2==0){
            return true;
        }else {
            return false;
        }

    }

    public boolean evaluate(String a){
        int b=Integer.parseInt(a);

        if(b<0){
            return false;
        }
        if(b%2==0){
            return true;
        }else {
            return false;
        }

    }

    public boolean evaluate(Text a){
        int b=Integer.parseInt(a.toString());

        if(b<0){
            return false;
        }
        if(b%2==0){
            return true;
        }else {
            return false;
        }

    }
    public boolean evaluate(Text t1,Text t2){
    //public boolean evaluate(String t1, String t2){
         if(t1==null || t2 ==null){
             return false;
         }

         double d1 = Double.parseDouble(t1.toString());
         double d2 = Double.parseDouble(t2.toString());
        /* double d1 = Double.parseDouble(t1);
         double d2 = Double.parseDouble(t2);*/
         if(d1>d2){
             return true;
         }else{
             return false;
         }
    }

    public boolean evaluate(String t1, String t2){
         if(t1==null || t2 ==null){
             return false;
         }

         double d1 = Double.parseDouble(t1);
         double d2 = Double.parseDouble(t2);
         if(d1>d2){
             return true;
         }else{
             return false;
         }
    }
}

打成jar包UDFTest.jar

在hive执行add jar方法

在hive创建一个bigthan的函数,引入的类是UDF.UDFTest

add jar /liguodong/UDFTest.jar;
create temporary function bigthan as ‘UDFDemo.UDFTest‘;

select no,num,bigthan(no,num) from testudf;

UDAF

UDAF(user defined aggregation function)用户自定义聚合函数,针对记录集合

开发UDAF通用有两个步骤

第一个是编写resolver类,resolver负责类型检查,操作符重载。

第二个是编写evaluator类,evaluator真正实现UDAF的逻辑

通常来说,顶层UDAF类继承

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator里面编写嵌套类evaluator实现UDAF的逻辑。

一、实现resolver

resolver通常继承

org.apache.hadoop.hive.ql.udf.GenericUDAFResolver2,但是更建议继承AbstractGenericUDAFResolver,隔离将来hive接口的变化。

GenericUDAFResolver和GenericUDAFResolver2接口的区别是后面的允许evaluator实现可以访问更多的信息,例如DISTINCT限定符,通配符FUNCTION(*)

二、实现evaluator

所有eva1uators必须继承抽象类

org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator。予类必须实现它的一些抽象方法,实现UDAF的逻辑。

Mode

这个类比较重要,它表示了udaf在mapreduce的各个阶段,理解Mode的含义,就理解了hive的UDAF的运行流程。

public static enum Mode{
    PARTIAL1,
    PARTIAL2,
    FINAL,
    COMPLETE
};

PARTIAL1:这个是mapreduce的map阶段:从原始数据到部分数据聚合,将会调用iterate()terminatePartial()

PARTIAL2:这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据;从部分数据聚合到部分数据聚合,将会调用merge()terminatePartial()

FINAL:mapreduce的reduce阶段:从部分数据的聚合到完全聚合,将会调用merge()terminate()

COMPLETE:如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了;从原始数据直接到完全聚合,将会调用iterate()terminate()

流程–无Combiner

流程–有Combiner

mapreduce阶段调用函数

MAP

init()

iterate()

terminatePartial()

Combiner

merge()

terminatePartial()

REDUCE

init()

merge()

terminate()

查看源码路径

apache-hive-1.2.1-src\ql\src\java\org\apache\hadoop\hive\ql\udf\generic

例如:关于count函数的源码

package org.apache.hadoop.hive.ql.udf.generic;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.LongWritable;

/**
 * This class implements the COUNT aggregation function as in SQL.
 */
@Description(name = "count",
    value = "_FUNC_(*) - Returns the total number of retrieved rows, including "
          +        "rows containing NULL values.\n"

          + "_FUNC_(expr) - Returns the number of rows for which the supplied "
          +        "expression is non-NULL.\n"

          + "_FUNC_(DISTINCT expr[, expr...]) - Returns the number of rows for "
          +        "which the supplied expression(s) are unique and non-NULL.")
public class GenericUDAFCount implements GenericUDAFResolver2 {

  private static final Log LOG = LogFactory.getLog(GenericUDAFCount.class.getName());

  @Override
  public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
      throws SemanticException {
    // This method implementation is preserved for backward compatibility.
    return new GenericUDAFCountEvaluator();
  }

  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo)
  throws SemanticException {

    TypeInfo[] parameters = paramInfo.getParameters();

    if (parameters.length == 0) {
      if (!paramInfo.isAllColumns()) {
        throw new UDFArgumentException("Argument expected");
      }
      assert !paramInfo.isDistinct() : "DISTINCT not supported with *";
    } else {
      if (parameters.length > 1 && !paramInfo.isDistinct()) {
        throw new UDFArgumentException("DISTINCT keyword must be specified");
      }
      assert !paramInfo.isAllColumns() : "* not supported in expression list";
    }

    return new GenericUDAFCountEvaluator().setCountAllColumns(
        paramInfo.isAllColumns());
  }

  /**
   * GenericUDAFCountEvaluator.
   *
   */
  public static class GenericUDAFCountEvaluator extends GenericUDAFEvaluator {
    private boolean countAllColumns = false;
    private LongObjectInspector partialCountAggOI;
    private LongWritable result;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
    throws HiveException {
      super.init(m, parameters);
      if (mode == Mode.PARTIAL2 || mode == Mode.FINAL) {
        partialCountAggOI = (LongObjectInspector)parameters[0];
      }
      result = new LongWritable(0);
      return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }

    private GenericUDAFCountEvaluator setCountAllColumns(boolean countAllCols) {
      countAllColumns = countAllCols;
      return this;
    }

    /** class for storing count value. */
    @AggregationType(estimable = true)
    static class CountAgg extends AbstractAggregationBuffer {
      long value;
      @Override
      public int estimate() { return JavaDataModel.PRIMITIVES2; }
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      CountAgg buffer = new CountAgg();
      reset(buffer);
      return buffer;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
      ((CountAgg) agg).value = 0;
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters)
      throws HiveException {
      // parameters == null means the input table/split is empty
      if (parameters == null) {
        return;
      }
      if (countAllColumns) {
        assert parameters.length == 0;
        ((CountAgg) agg).value++;
      } else {
        boolean countThisRow = true;
        for (Object nextParam : parameters) {
          if (nextParam == null) {
            countThisRow = false;
            break;
          }
        }
        if (countThisRow) {
          ((CountAgg) agg).value++;
        }
      }
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial)
      throws HiveException {
      if (partial != null) {
        long p = partialCountAggOI.get(partial);
        ((CountAgg) agg).value += p;
      }
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
      result.set(((CountAgg) agg).value);
      return result;
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      return terminate(agg);
    }
  }
}

Demo02:

执行过程与UDF类似,该Java、类的功能是第一列的值大于第二列计数加1。

package UDAFDemo;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.LongWritable;

public class UDAFTest extends AbstractGenericUDAFResolver{
    //判断
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info)//字段的描述信息参数parameters
            throws SemanticException {
        if(info.length !=2){
            throw new UDFArgumentTypeException(info.length-1,
                    "Exactly two argument is expected.");
        }   

        //返回处理逻辑的类
        return new GenericEvaluate();
    }

    public static class GenericEvaluate extends GenericUDAFEvaluator{

        private LongWritable result;
        private PrimitiveObjectInspector inputIO1;
        private PrimitiveObjectInspector inputIO2;

        //这个方法map与reduce阶段都需要执行
        /**
         * map阶段:parameters长度与udaf输入的参数个数有关
         * reduce阶段:parameters长度为1
         */
        //初始化
        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);

            //返回最终的结果
            result = new LongWritable(0);

            inputIO1 = (PrimitiveObjectInspector) parameters[0];
            if (parameters.length>1) {
                inputIO2 = (PrimitiveObjectInspector) parameters[1];
            }

            return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
        }

        //map阶段
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)//agg缓存结果值
                throws HiveException {

            assert(parameters.length==2);

            if(parameters==null || parameters[0]==null ||  parameters[1]==null){
                return;
            }

            double base = PrimitiveObjectInspectorUtils.getDouble(parameters[0], inputIO1);
            double tmp = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputIO2);

            if(base > tmp){
                ((CountAgg)agg).count++;
            }

        }

        //获得一个聚合的缓冲对象,每个map执行一次
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {

            CountAgg agg = new CountAgg();

            reset(agg);

            return agg;
        }

        //自定义类用于计数
        public static class CountAgg implements AggregationBuffer{
            long count;//计数,保存每次临时的结果
        }

        //重置
        @Override
        public void reset(AggregationBuffer countagg) throws HiveException {
            CountAgg agg = (CountAgg)countagg;
            agg.count=0;
        }

        //该方法当做iterate执行后,部分结果返回。
        @Override
        public Object terminatePartial(AggregationBuffer agg)
                throws HiveException {

            result.set(((CountAgg)agg).count);

            return result;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if(partial != null){
                long p = PrimitiveObjectInspectorUtils.getLong(partial, inputIO1);
                ((CountAgg)agg).count += p;
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            result.set(((CountAgg)agg).count);
            return result;
        }
    }
}

永久函数

方式1、如果希望在hive中自定义一个函数,且能永久使用,

则修改源码添加相应的函数类,然后在修改ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java类,添加相应的注册函数代码registerUDF("parse_url",UDFParseUrl.class,false);

方式2、hive -i ‘file’

方式3、新建hiverc文件

1、jar包放到安装日录下或者指定目录下

2、${HIVE_HOME}/bin目录下有个.hiverc文件,它是隐藏文件。

3、把初始化语句加载到文件中

vi .hiverc
add jar /liguodong/UDFTest.jar;
create temporary function bigthan as ‘UDFDemo.UDFTest‘;

然后打开hive时,它会自动执行.hiverc文件。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-12-29 23:55:57

Hive自定义函数(UDF、UDAF)的相关文章

Hive 自定义函数 UDF UDAF UDTF

UDF:用户定义(普通)函数,只对单行数值产生作用: 继承UDF类,添加方法 evaluate() /** * @function 自定义UDF统计最小值 * @author John * */ public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } el

hive自定义函数UDF UDTF UDAF

Hive 自定义函数 UDF UDTF UDAF 1.UDF:用户定义(普通)函数,只对单行数值产生作用: UDF只能实现一进一出的操作. 定义udf 计算两个数最小值 public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } else { return a

[Hive]Hive自定义函数UDF

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数 用户自定义函数(user defined function),针对单条记录. 编写一个UDF,需要继承UDF类,并实现evaluate()函数.在查询执行过程中,查询中对应的每个应用到这个函数的地方都会对这个类进行实例化.对于每行输入都会调用到evaluate()函数.而evaluate()函数处理的值会返回给Hive.同时用户是可以重载evaluate方法的.Hive会像Java的方法重载一样,自动选择匹配的

Hive自定义函数UDF示例

简单自定义函数只需继承UDF类,然后重构evaluate函数即可 LowerCase.java: package com.example.hiveudf; import org.apache.hadoop.hive.ql.exec.UDF; public final class LowerCase extends UDF { public String evaluate(final String s) { if (s == null) { return null; } return new St

Hive自定义函数UDF和UDTF

UDF(user defined functions) 用于处理单行数据,并生成单个数据行. PS: l 一个普通UDF必须继承自"org.apache.hadoop.hive.ql.exec.UDF".l 一个普通UDF必须至少实现一个evaluate()方法,evaluate函数支持重载. 主要步骤如下: 步骤1 把以上程序打包成AddDoublesUDF.jar,并上传到HDFS指定目录下(如" /user/hive_examples_jars/" )且创建函

Hive自定义函数UDAF开发

Hive支持自定义函数,UDAF是接受多行,输出一行. 通常是group by时用到这种函数. 其实最好的学习资料就是官方自带的examples了. 我这里用的是0.10版本hive,所以对于的examples在 https://github.com/apache/hive/tree/branch-0.10/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example 我这里的功能需求是: actionCount(act_code,ac

hive自定义函数(UDF)

首先什么是UDF,UDF的全称为user-defined function,用户定义函数,为什么有它的存在呢?有的时候 你要写的查询无法轻松地使用Hive提供的内置函数来表示,通过写UDF,Hive就可以方便地插入用户写的处理代码并在查询中使用它们,相当于在HQL(Hive SQL)中自定义一些函数,首先UDF必须用java语言编写,Hive本身就是用java写的. 编写UDF需要下面两个步骤: 1.继承org.apache.hadoop.hive.ql.UDF 2.实现evaluate函数,这

Hive自定义函数的学习笔记(1)

前言: hive本身提供了丰富的函数集, 有普通函数(求平方sqrt), 聚合函数(求和sum), 以及表生成函数(explode, json_tuple)等等. 但不是所有的业务需求都能涉及和覆盖到, 因此hive提供了自定义函数的接口, 方便用户扩展. 自己好像很久没接触hadoop了, 也很久没博客了, 今天趁这个短期的项目, 对hive中涉及的自定义函数做个笔记. 准备: 编写hive自定义函数前, 需要了解下当前线上hive的版本. hive --vesion 比如作者使用到的hive

T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst、语言版本影响!

原文:T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响! CSDN 的 Blog 太滥了!无时不刻地在坏! 开始抢救性搬家 ... ... 到这里重建家园 /* T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响 都是从老文章里收集或提炼出来的! 提示: (@@Datefirst + datepart(weekday,@Date)) % 7 判