Spark(十八)SparkSQL的自定义函数UDF

在Spark中,也支持Hive中的自定义函数。自定义函数大致可以分为三种:

  • UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等
  • UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等
  • UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap

自定义一个UDF函数需要继承UserDefinedAggregateFunction类,并实现其中的8个方法

示例

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

object GetDistinctCityUDF extends UserDefinedAggregateFunction{
  /**
    * 输入的数据类型
    * */
  override def inputSchema: StructType = StructType(
    StructField("status",StringType,true) :: Nil
  )
  /**
    * 缓存字段类型
    * */
  override def bufferSchema: StructType = {
    StructType(
      Array(
        StructField("buffer_city_info",StringType,true)
      )
    )
  }
/**
  * 输出结果类型
  * */
  override def dataType: DataType = StringType
/**
  * 输入类型和输出类型是否一致
  * */
  override def deterministic: Boolean = true
/**
  * 对辅助字段进行初始化
  * */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0,"")
  }
/**
  *修改辅助字段的值
  * */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    //获取最后一次的值
    var last_str = buffer.getString(0)
    //获取当前的值
    val current_str = input.getString(0)
    //判断最后一次的值是否包含当前的值
    if(!last_str.contains(current_str)){
      //判断是否是第一个值,是的话走if赋值,不是的话走else追加
      if(last_str.equals("")){
        last_str = current_str
      }else{
        last_str += "," + current_str
      }
    }
    buffer.update(0,last_str)

  }
/**
  *对分区结果进行合并
  * buffer1是机器hadoop1上的结果
  * buffer2是机器Hadoop2上的结果
  * */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var buf1 = buffer1.getString(0)
    val buf2 = buffer2.getString(0)
    //将buf2里面存在的数据而buf1里面没有的数据追加到buf1
    //buf2的数据按照,进行切分
    for(s <- buf2.split(",")){
      if(!buf1.contains(s)){
        if(buf1.equals("")){
          buf1 = s
        }else{
          buf1 += s
        }
      }
    }
    buffer1.update(0,buf1)
  }
/**
  * 最终的计算结果
  * */
  override def evaluate(buffer: Row): Any = {
    buffer.getString(0)
  }
}

注册自定义的UDF函数为临时函数

def main(args: Array[String]): Unit = {
    /**
      * 第一步 创建程序入口
      */
    val conf = new SparkConf().setAppName("AralHotProductSpark")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)  //注册成为临时函数
    hiveContext.udf.register("get_distinct_city",GetDistinctCityUDF)
  //注册成为临时函数
    hiveContext.udf.register("get_product_status",(str:String) =>{
      var status = 0
      for(s <- str.split(",")){
        if(s.contains("product_status")){
          status = s.split(":")(1).toInt
        }
      }
    })
}

原文地址:https://www.cnblogs.com/frankdeng/p/9301783.html

时间: 2024-09-29 09:18:15

Spark(十八)SparkSQL的自定义函数UDF的相关文章

Spark学习之路 (十九)SparkSQL的自定义函数UDF

讨论QQ:1586558083 在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像str

Spark学习之路 (十九)SparkSQL的自定义函数UDF[转]

在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap 自定

SparkSQL 如何自定义函数

1. SparkSql如何自定义函数 2. 示例:Average 3. 类型安全的自定义函数 1. SparkSql如何自定义函数? spark中我们定义一个函数,需要继承 UserDefinedAggregateFunction这个抽象类,实现这个抽象类中所定义的方法,这是一个模板设计模式? 我只要实现抽象类的中方法,具体的所有的计算步骤由内部完成.而我们可以看一下UserDefinedAggregateFunction这个抽象类. package org.apache.spark.sql.e

T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst、语言版本影响!

原文:T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响! CSDN 的 Blog 太滥了!无时不刻地在坏! 开始抢救性搬家 ... ... 到这里重建家园 /* T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响 都是从老文章里收集或提炼出来的! 提示: (@@Datefirst + datepart(weekday,@Date)) % 7 判

全栈JavaScript之路(十八)HTML5 自定义数据属性

HTML5 规范规定,用户可以为元素 自定义非标准属性, 但是要添加 data- 前缀. 目的是为元素提供与页面渲染无关的信息.或者语义信息.这些属性名可以随意添加,只要带上前缀 data- 开头就可以. <div id="myid" data-appid="1" data-appname="csdn"></div> 添加属性这后可以通过 dataset 访问自定义属性,dataset 属性 是DOMStringMap  

第二十四节,自定义函数

第二十四节,自定义函数函数是将要实现的功能写在函数里,在要使用此功能的地方调用此函数即可实现功能,这样大大减少编程重复书写同样的代码,在多个要使用同样功能的地方调用函数即可不需要重复写同样的代码函数式编程最重要的是增强代码的重用性和可读性 函数的定义主要有如下要点: def:表示函数的关键字函数名:函数的名称,日后根据函数名调用函数函数体:函数中进行一系列的逻辑计算,如:发送邮件.计算出 [11,22,38,888,2]中的最大数等...参数:为函数体提供数据return:返回值:当函数执行完毕

hive自定义函数UDF UDTF UDAF

Hive 自定义函数 UDF UDTF UDAF 1.UDF:用户定义(普通)函数,只对单行数值产生作用: UDF只能实现一进一出的操作. 定义udf 计算两个数最小值 public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } else { return a

java mysql自定义函数UDF之调用c函数

正如sqlite可以定义自定义函数,它是通过API定义c函数的,不像其他,如这里的mysql.sqlite提供原生接口就可以方便的调用其他语言的方法,同样的mysql也支持调用其它语言的方法. google "mysql call c function"发现一片文章 MySQL User Defined Functions  This tutorial explains what an User Defined Function (UDF) is, what it does and w

Hive自定义函数UDF示例

简单自定义函数只需继承UDF类,然后重构evaluate函数即可 LowerCase.java: package com.example.hiveudf; import org.apache.hadoop.hive.ql.exec.UDF; public final class LowerCase extends UDF { public String evaluate(final String s) { if (s == null) { return null; } return new St