SparkSQL 如何自定义函数

1. SparkSql如何自定义函数

2. 示例:Average

3. 类型安全的自定义函数

1. SparkSql如何自定义函数?

  spark中我们定义一个函数,需要继承 UserDefinedAggregateFunction这个抽象类,实现这个抽象类中所定义的方法,这是一个模板设计模式? 我只要实现抽象类的中方法,具体的所有的计算步骤由内部完成。而我们可以看一下UserDefinedAggregateFunction这个抽象类。

package org.apache.spark.sql.expressions
@org.apache.spark.annotation.InterfaceStability.Stable
abstract class UserDefinedAggregateFunction() extends scala.AnyRef with scala.Serializable {
  def inputSchema : org.apache.spark.sql.types.StructType
  def bufferSchema : org.apache.spark.sql.types.StructType
  def dataType : org.apache.spark.sql.types.DataType
  def deterministic : scala.Boolean
  def initialize(buffer : org.apache.spark.sql.expressions.MutableAggregationBuffer) : scala.Unit
  def update(buffer : org.apache.spark.sql.expressions.MutableAggregationBuffer, input : org.apache.spark.sql.Row) : scala.Unit
  def merge(buffer1 : org.apache.spark.sql.expressions.MutableAggregationBuffer, buffer2 : org.apache.spark.sql.Row) : scala.Unit
  def evaluate(buffer : org.apache.spark.sql.Row) : scala.Any
  @scala.annotation.varargs
  def apply(exprs : org.apache.spark.sql.Column*) : org.apache.spark.sql.Column = { /* compiled code */ }
  @scala.annotation.varargs
  def distinct(exprs : org.apache.spark.sql.Column*) : org.apache.spark.sql.Column = { /* compiled code */ }
}

  也就是说对于这几个函数,我们只要依次实现他们的功能,其余的交给spark就可以了。

  

2. 自定义Average函数

  首先新建一个Object类MyAvage类,继承UserDefinedAggregateFunction。下面对每一个函数的实现进行解释。

  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)

  这个规定了输入数据的数据结构

def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }

  这个规定了缓存区的数据结构

  def dataType: DataType = DoubleType

  这个规定了返回值的数据类型

def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }  

进行初始化,这里要说明一下,官网中提到:

// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.

这里翻译一下:

我们为我们的缓冲区设置初始值,我们不仅可以设置数字,还可以使用index getBoolen等去改变他的值,但是我们需要知道的是,在这个缓冲区中,数组和map依然是不可变的。

其实最后一句我也是不太明白,等我以后如果能研究并理解这句话,再回来补充吧。

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  这个是重要的update函数,对于平均值,我们可以不断迭代输入的值进行累加。buffer(0)统计总和,buffer(1)统计长度。

def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  在做完update后spark 需要将结果进行merge到我们的区域,因此有一个merge 进行覆盖buffer

  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)

  这是将最终的结果进行计算。

在写完这个类以后我们在我们的sparksession里面进行编写测试案例。

spark.sparkContext.textFile("file:///Users/4pa/Desktop/people.txt")
      .map(_.split(","))
      .map(agg=>Person(agg(0),agg(1).trim.toInt))
      .toDF().createOrReplaceTempView("people")
spark.udf.register("myAverage",Myaverage)
val udfRes = spark.sql("select name,myAverage(age) as avgAge from people group by name")
udfRes.show()

  

3. 类型安全的自定义函数

从上面我们可以看出来,这种自定义函数不是类型安全的,因此能否实现一个安全的自定义函数呢?

个人觉得最好的例子还是官网给的例子,具体的解释都已经给了出来,思路其实和上面是一样的,只不过定义了两个caseclass,用于类型的验证。

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // 初始化
  def zero: Average = Average(0L, 0L)
  // 这个其实有点map-reduce的意思,只不过是对一个类的reduce,第一个值是和,第二个是总数
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // 实现缓冲区的一个覆盖
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // 计算最终数值
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // 指定返回类型
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

  

 

原文地址:https://www.cnblogs.com/tjpeng/p/12261901.html

时间: 2024-11-09 03:08:35

SparkSQL 如何自定义函数的相关文章

Spark(十八)SparkSQL的自定义函数UDF

在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap 自定

Spark学习之路 (十九)SparkSQL的自定义函数UDF

讨论QQ:1586558083 在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像str

Spark学习之路 (十九)SparkSQL的自定义函数UDF[转]

在Spark中,也支持Hive中的自定义函数.自定义函数大致可以分为三种: UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等 UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等 UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap 自定

Oracle自定义函数1

用户定义函数是存储在数据库中的代码块,可以把值返回到调用程序.调用时如同系统函数一样,如max(value)函数,其中,value被称为参数.函数参数有3种类型. IN 参数类型:表示输入给函数的参数. OUT 参数类型:表示参数在函数中被赋值,可以传给函数调用程序. IN OUT参数类型:表示参数既可以传值也可以被赋值. 1.语法格式: SQL语法方式创建的语法格式为: CREATE OR REPLACE FUNCTION function_name         /*函数名称*/(Para

一个可以使用多个正则表达式进行多次尝试匹配,并进行替换的Excel VBA自定义函数(UFD)

以下代码可使用多个正则表达式对目标单元格进行多次匹配尝试,如匹配成功,将停止尝试匹配其他正则表达式,并且使用该正则表达式相对应的替换表达式进行替换,返回替换结果. 使用前需要做Early Binding.即在VBE编辑器中,选择菜单栏中的Tool - Reference,如图: 弹出如下图的对话框后,选择Microsoft VBSscript Regular Expression 5.5,打钩,点OK. 此UDF的使用方法为: Text参数:需要进行处理的原始文字或单元格. MatchPatte

MySQL学习笔记-自定义函数

MySQL学习笔记-自定义函数 1.自定义函数简介 自定义函数:用户自定义函数(user-defined function,UDF)是一种对MySQL扩展的途径,其用法与内置函数相同 自定义函数的两个必要条件:(1)参数  (2)返回值 自定义函数: 创建自定义函数 CREATE FUNCTION function_name RETURNS {STRING|INTEGER|REAL|DECIMAL} routine_body 关于函数体: 1.函数体可以由合法的SQL语句构成: 2.函数体可以是

FastReport调用Delphi中的自定义函数

//定义一个函数: function SmallToMoney(akey: real): string; begin   //'1234500' end; //此处为fastreport加载自定义函数以便引用 procedure Tprint_from.FormCreate(Sender: TObject);begin  frxReport1.AddFunction('function SmallToMoney(akey: real): string;', 'Myfunction', '函数功能

SQL 自定义函数(Function)——参数默认值

sql server 自定义函数分为三种类型:标量函数(Scalar Function).内嵌表值函数(Inline Function).多声明表值函数(Multi-Statement Function) 标量函数:标量函数是对单一值操作,返回单一值. 内嵌表值函数:内嵌表值函数的功能相当于一个参数化的视图.它返回的是一个表,内联表值型函数没有由BEGIN-END 语句括起来的函数体. 多声明表值函数:它的返回值是一个表,但它和标量型函数一样有一个用BEGIN-END 语句括起来的函数体,返回值

Oracle自定义函数

核心提示:函数用于返回特定数据.执行时得找一个变量接收函数的返回值; 语法如下: create or replace function function_name ( argu1 [mode1] datatype1, argu2 [mode2] datatype2, ........ ) return datatype is begin end; 执行 var v1 varchar2(100) exec :v1:=function_na 函数用于返回特定数据.执行时得找一个变量接收函数的返回值;