sparksql 自定义用户函数(UDF)

自定义用户函数有两种方式,区别:是否使用强类型,参考demo:https://github.com/asker124143222/spark-demo

1、不使用强类型,继承UserDefinedAggregateFunction

package com.home.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object Ex_sparkUDAF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setAppName("spark udf").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    //自定义聚合函数
    //创建聚合函数对象
    val myUdaf = new MyAgeAvgFunc

    //注册自定义函数
    spark.udf.register("ageAvg",myUdaf)

    //使用聚合函数
    val frame: DataFrame = spark.read.json("input/userinfo.json")
    frame.createOrReplaceTempView("userinfo")
    spark.sql("select ageAvg(age) from userinfo").show()

    spark.stop()
  }
}

//声明自定义函数
//实现对年龄的平均,数据如:{ "name": "tom", "age" : 20}
class MyAgeAvgFunc extends UserDefinedAggregateFunction {
  //函数输入的数据结构,本例中只有年龄是输入数据
  override def inputSchema: StructType = {
    new StructType().add("age", LongType)
  }

  //计算时的数据结构(缓冲区)
  // 本例中有要计算年龄平均值,必须有两个计算结构,一个是年龄总计(sum),一个是年龄个数(count)
  override def bufferSchema: StructType = {
    new StructType().add("sum", LongType).add("count", LongType)
  }

  //函数返回的数据类型
  override def dataType: DataType = DoubleType

  //函数是否稳定
  override def deterministic: Boolean = true

  //计算前缓冲区的初始化,结构类似数组,这里缓冲区与之前定义的bufferSchema顺序一致
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //sum
    buffer(0) = 0L
    //count
    buffer(1) = 0L
  }

  //根据查询结果更新缓冲区数据,input是每次进入的数据,其数据结构与之前定义的inputSchema相同
  //本例中每次输入的数据只有一个就是年龄
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if(input.isNullAt(0)) return
    //sum
    buffer(0) = buffer.getLong(0) + input.getLong(0)

    //count,每次来一个数据加1
    buffer(1) = buffer.getLong(1) + 1
  }

  //将多个节点的缓冲区合并到一起(因为spark是分布式的)
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    //sum
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

    //count
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  //计算最终结果,本例中就是(sum / count)
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}

2、使用强类型,

package com.home.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregator

object Ex_sparkUDAF2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setAppName("spark udf class").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    //rdd转换成df或者ds需要SparkSession实例的隐式转换
    //导入隐式转换,注意这里的spark不是包名,而是SparkSession的对象名
    import spark.implicits._

    //创建聚合函数对象
    val myAvgFunc = new MyAgeAvgClassFunc
    val avgCol: TypedColumn[UserBean, Double] = myAvgFunc.toColumn.name("avgAge")
    val frame = spark.read.json("input/userinfo.json")
    val userDS: Dataset[UserBean] = frame.as[UserBean]
    //应用函数
    userDS.select(avgCol).show()

    spark.stop()
  }
}

case class UserBean(name: String, age: BigInt)

case class AvgBuffer(var sum: BigInt, var count: Int)

//声明用户自定义函数(强类型方式)
//继承Aggregator,设定泛型
//实现方法
class MyAgeAvgClassFunc extends Aggregator[UserBean, AvgBuffer, Double] {
  //初始化缓冲区
  override def zero: AvgBuffer = {
    AvgBuffer(0, 0)
  }

  //聚合数据
  override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
    if(a.age == null) return b
    b.sum = b.sum + a.age
    b.count = b.count + 1

    b
  }

  //缓冲区合并操作
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count

    b1
  }

  //完成计算
  override def finish(reduction: AvgBuffer): Double = {
    reduction.sum.toDouble / reduction.count
  }

  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

继承Aggregator

原文地址:https://www.cnblogs.com/asker009/p/12092684.html

时间: 2024-10-08 01:04:47

sparksql 自定义用户函数(UDF)的相关文章

java mysql自定义函数UDF之调用c函数

正如sqlite可以定义自定义函数,它是通过API定义c函数的,不像其他,如这里的mysql.sqlite提供原生接口就可以方便的调用其他语言的方法,同样的mysql也支持调用其它语言的方法. google "mysql call c function"发现一片文章 MySQL User Defined Functions  This tutorial explains what an User Defined Function (UDF) is, what it does and w

hive自定义函数UDF UDTF UDAF

Hive 自定义函数 UDF UDTF UDAF 1.UDF:用户定义(普通)函数,只对单行数值产生作用: UDF只能实现一进一出的操作. 定义udf 计算两个数最小值 public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } else { return a

Hive自定义函数(UDF、UDAF)

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. UDF 用户自定义函数(user defined function)–针对单条记录. 创建函数流程 1.自定义一个Java类 2.继承UDF类 3.重写evaluate方法 4.打成jar包 6.在hive执行add jar方法 7.在hive执行创建模板函数 8.hql中使用 Demo01: 自定义一个Java类 package UDFDemo; import org.apache.hadoop.hive.

T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst、语言版本影响!

原文:T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响! CSDN 的 Blog 太滥了!无时不刻地在坏! 开始抢救性搬家 ... ... 到这里重建家园 /* T-SQL: 17 个与日期时间相关的自定义函数(UDF),周日作为周的最后一天,均不受 @@DateFirst.语言版本影响 都是从老文章里收集或提炼出来的! 提示: (@@Datefirst + datepart(weekday,@Date)) % 7 判

SparkSQL自定义无类型聚合函数

准备数据: Michael,3000 Andy,4500 Justin,3500 Betral,4000 一.定义自定义无类型聚合函数 想要自定义无类型聚合函数,那必须得继承org.spark.sql.expressions.UserDefinedAggregateFunction,然后重写父类得抽象变量和成员方法. package com.cjs import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{M

Oracle自定义聚集函数

今天工作中看见别人写的自定义聚集函数,所以一门心思的想搞懂,就在网上找资料了. 自定义聚集函数 自定义聚集函数接口简介 Oracle提供了很多预定义好的聚集函数,比如Max(), Sum(), AVG(), 但是这些预定义的聚集函数基本上都是适应于标量数据(scalar data), 对于复杂的数据类型,比如说用户自定义的Object type, Clob等, 是不支持的. 但是,幸运的是, 用户可以通过实现Oracle的Extensibility Framework中的ODCIAggregat

在spring security手动 自定义 用户认证 SecurityContextHolder

1.Spring Security 目前支持认证一体化如下认证技术: HTTP BASIC authentication headers (一个基于IEFT  RFC 的标准) HTTP Digest authentication headers (一个基于IEFT  RFC 的标准) HTTP X.509 client certificate exchange  (一个基于IEFT RFC 的标准) LDAP (一个非常常见的跨平台认证需要做法,特别是在大环境) Form-based auth

SQL Server如何定位自定义标量函数被那个SQL调用次数最多浅析

前阵子遇到一个很是棘手的问题,监控系统DPA发现某个自定义标量函数被调用的次数非常高,高到一个离谱的程度.然后在Troubleshooting这个问题的时候,确实遇到了一些问题让我很是纠结,下文是解决问题过程的一点思索和尝试,如果你有更好的思路和解决方法,也请多多指教. DPA可以监控到该函数每小时被调用的次数,如下截图所示: 那么第一个问题来了. DPA如何监控获取这个函数每小时执行多少次呢? 其实这个很简单, sys.dm_exec_query_stats视图里面有个字段execution_

ashx ajax 与 自定义javascript函数

1.getUserPower为自定义javascript函数 获取权限  (1).ashx 处理程序的相对地址(必须是相对地址)  (2).au 权限名称  (3).classname 类名  (4).funsuccess 成功时做的事,函数. */ $.fn.getUserPower = function (operate, mdlName, funsuccess) { $(this).click(function (event) { if (cookie == "admin") {