在Apache Spark中使用UDF

用户自定义函数(UDF)是大多数SQL环境的一个关键特性,其主要用于扩展系统的内置功能。UDF允许开发人员通过抽象其低级语言实现在更高级语言(如SQL)中应用的新函数。Apache Spark也不例外,其为UDF与Spark SQL工作流集成提供了各种选项。

在本篇博文中,我们将回顾Python、Java和Scala上的Apache Spark UDF和UDAF(用户自定义的聚合函数)实现的简单示例。我们还将讨论重要的UDF API功能和集成点,包括各发行版本之间的当前可用性。总而言之,我们将介绍一些重要的性能注意事项,使您对应用程序中利用UDF的选择有所了解。

Spark SQL UDFs

UDF转换了表中单个行的数值,为每行生成单个对应的输出值。例如,大多数的SQL环境都提供了一个UPPER函数,同时返回了一个大写版本的字符串以作为输入。

自定义函数可以在Spark SQL中定义和注册为UDF,并具有可用于SQL查询的关联别名。下面我们将为您介绍一个简单的例子,我们将定义一个UDF将以下JSON数据中的温度值从摄氏度(Celsius)转换为华氏度(Fahrenheit):

下面的示例代码使用SQL别名CTOF注册我们的转换UDF,然后使用它从SQL查询中转换每个城市的温度值。为简洁起见,省略了SQLContext对象和其他样板代码的创建,并在每个代码段下面提供了完整列表的链接。

Python

Scala

Java

请注意,Spark SQL定义了UDF1~UDF22类别,支持包含最多22个输入参数的UDF。上面的例子中使用UDF1处理单个温度值作为输入。如果未能对Apache Spark源代码进行更新,使用数组(arrays)或结构体(structs)作为参数对于需要超过22个输入的应用程序可能很有帮助;从风格的角度来看,如果您发现自己使用的是UDF6或更高版本,这一方案可能是首选。

Spark SQL UDAF函数

用户自定义聚合函数(UDAF)可以同时处理多行,然后返回单个值作为结果,其通常与GROUP BY语句(例如COUNT或SUM)共同使用。为了让示例简单明了,我们将实现一个别名为SUMPRODUCT的UDAF按使用分组、给定价格和库存中的整数数量计算所有车辆的零售价值:

目前,Apache Spark UDAF的实现定义在扩展继承的了UserDefinedAggregateFunction类别中并有由Scala和Java语法支持。一旦定义好之后,我们可以在别名SUMPRODUCT下举例说明并注册我们的SumProductAggregateFunction UDAF对象,并从SQL查询中予以使用,这与前面示例中的CTOF UDF大致相同。

Scala

Apache Spark中的其他UDF支持

Spark SQL支持UDF、UDAF和UDTF等现有Hive(Java或Scala)函数实现的集成。顺便提醒一下,UDTFs(用户自定义表函数)可以返回多个列和行 – 这超出了本文的范围,但是我们可能在以后的博文中涉及。对于使用前面示例中强调的方法重新实现和注册相同逻辑,集成现有的Hive UDF是有价值的一种替代方法;从性能角度来看,该方法对于PySpark也是有帮助的,这将在下一节中讨论。通过包含Hive UDF函数实现的JAR文件利用spark-submit的-jars选项,可以从HiveContext中访问Hive函数;然后使用CREATE TEMPORARY FUNCTION对函数进行声明(如在Hive [1]中所做,包含一个UDF),具体示例如下所述:

Java 中的Hive UDF定义

从Python访问HiveUDF

请注意,如上文中我们实现的UDF和UDAF函数,Hive UDF只能使用Apache Spark的SQL查询语言进行调用 – 也就是说,不能与Dataframe API的域特定语言(DSL)一起使用。

或者,通过包含实现jar文件(使用含有spark-submit的-jars选项),以Scala和Java语言实现的UDF可以从PySpark中进行访问,然后通过SparkContext对象的私有引用执行器JVM、底层Scala或装载在jar文件中的Java UDF实现来访问UDF定义。Holden Karau在一次精彩的演讲中[2]对这种方法进行了探讨。请注意,在此技术中所使用的一些Apache Spark私有变量不是正式面向终端用户的。这样做还带来了额外的好处,允许将UDAF(目前必须在Java和Scala中定义)用于PySpark,下文中的示例中使用了前面在Scala中定义的SUMPRODUCT UDAF进行证明:

Scala UDAF定义

Scala UDAF from PySpark

UDF相关的功能正在连续不断地添加至Apache Spark的每一个版本中。例如2.0版本在R中增加了对UDF的支持。作为参考,下表总结了本文中讨论的各版本的关键特性:



表格中汇总了目前为止本博客中介绍过的相关版本的关键特性。

性能注意事项

了解Apache Spark UDF功能的性能影响是非常重要的。例如,Python UDF(比如我们的CTOF函数)导致数据在运行UDF逻辑的执行器JVM和Python注释器之间被序列化 - 与Java或Scala中的UDF实现相比,这大大降低了性能。缓解这种序列化瓶颈的潜在解决方案包括以下方面:

  1. 如上一节所述,从PySpark中访问Hive UDF。Java UDF实现可以由执行器JVM直接访问。请再次注意,这种方法只用于从Apache Spark的SQL查询语言访问UDF。
  2. 这种方法的使用也可以参考PySpark访问在Java或Scala中执行的UDF,如我们之前定义的Scala UDAF示例所示。

一般来说,UDF逻辑应尽可能的精简,因为可能每一行都会被调用。例如,在扩展到10亿行时,UDF逻辑中的一个步骤需要耗费100毫秒的时间才能完成,从而很快就会导致重大的要性能问题。

Spark SQL的另一个重要组成部分是Catalyst查询优化器。这一功能随着每个版本而扩展,通常可以为Spark SQL查询提供显著的性能改进;然而,任意UDF实现代码对于Catalyst而言可能不是很好理解(虽然分析字节码的未来功能[3]被认为可以解决这一问题)。因此,使用Apache Spark内置SQL查询函数功能通常可以带来最佳性能,并且应该是在避免引入UDF时考虑的第一种方法。高级用户寻求利用Catalyst与其代码更紧密地结合,可以参考以下Chris Fregly的演讲[4],该演讲人使用Expression.genCode优化UDF代码,并且使用了新的Apache Spark 2.0实验功能[5],其为定制Catalyst优化程序规则提供了一个可即插即用的API。

结论

当Spark SQL的内置功能需要扩展时,UDF是一个非常有用的工具。本篇博文中提供了一次UDF和UDAF实现的演练,并讨论了其集成步骤,以在Spark SQL中利用Spark SQL中现有的Java Hive UDF。UDF可以在Python、Scala、Java和(在Spark 2.0中)R中实现,同时UDAF 可以在以及Scala和Java的UDAF中实现。当在PySpark中使用UDF时,必须考虑数据序列化成本,并且应该考虑采用上文所讨论的两个策略来解决这个问题。最后,我们探讨了Spark SQL的Catalyst优化器,以及基于性能考虑的因素,在解决方案中引入UDF之前坚持使用内置SQL函数的性能考虑因素。

代码https://github.com/curtishoward/sparkudfexamples
CDH版本:5.8.0(Apache Spark 1.6.0)

原文地址:https://blog.51cto.com/14086291/2408886

时间: 2024-10-29 00:36:09

在Apache Spark中使用UDF的相关文章

全面解析Apache Spark中的决策树

Apache Spark中的决策树 决策树是在顺序决策问题进行分类,预测和促进决策的有效方法.决策树由两部分组成: 决策(Desion) 结果(Outcome) 决策树包含三种类型的节点: 根节点(Root node):包含所有数据的树的顶层节点. 分割节点(Splitting node):将数据分配给子组(subgroup)的节点. 终端节点(Terminal node):最终决定(即结果). 分割节点(Splitting node),仅就离散数学中的树的概念而言,就是指分支节点. 为了抵达终

新手福利:Apache Spark入门攻略

新手福利:Apache Spark入门攻略 作者Ashwini Kuntamukkala  出处:CSDN 本文聚焦Apache Spark入门,了解其在大数据领域的地位,覆盖Apache Spark的安装及应用程序的建立,并解释一些常见的行为和操作. 一. 为什么要使用Apache Spark 时下,我们正处在一个"大数据"的时代,每时每刻,都有各种类型的数据被生产.而在此紫外,数据增幅的速度也在显著增加.从广义上看,这些数据包含交易数据.社交媒体内容(比如文本.图像和视频)以及传感

Apache spark入门精品文章

本文聚焦 Apache Spark 入门,了解其在大数据领域的地位,覆盖 Apache Spark 的安装及应用程序的建立,并解释一些常见的行为和操作. 一. 为什么要使用 Apache Spark 时下,我们正处在一个“大数据”的时代,每时每刻,都有各种类型的数据被生产.而在此紫外,数据增幅的速度也在显著增加.从广义上看,这些数据包含交易数据.社交媒体内容(比如文本.图像和视频)以及传感器数据.那么,为什么要在这些内容上投入如此多精力,其原因无非就是从海量数据中提取洞见可以对生活和生产实践进行

2018年前100名Apache Spark面试问题和解答(上)

我们知道Apache Spark现在是一项蓬勃发展的技术.因此,了解Apache Spark的各个方面以及Spark面试问题非常重要.我将介绍Spark的每个方面,这也可能是经常被问到的Spark面试问题.此外,我将尽力提供每个问题,从现在开始,您搜索最佳和所有Spark面试问题将在此结束. Apache Spark面试问题答案 一,什么是Apache Spark? Apache Spark是一个功能强大的开源灵活数据处理框架,围绕速度,易用性和复杂的分析而构建.Apache Spark在集群计

Apache Spark 2.2.0 中文文档 - SparkR (R on Spark) | ApacheCN

SparkR (R on Spark) 概述 SparkDataFrame 启动: SparkSession 从 RStudio 来启动 创建 SparkDataFrames 从本地的 data frames 来创建 SparkDataFrames 从 Data Sources(数据源)创建 SparkDataFrame 从 Hive tables 来创建 SparkDataFrame SparkDataFrame 操作 Selecting rows(行), columns(列) Groupin

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Apache Spark 2.2.0 中文文档 - 集群模式概述 | ApacheCN

集群模式概述 该文档给出了 Spark 如何在集群上运行.使之更容易来理解所涉及到的组件的简短概述.通过阅读 应用提交指南 来学习关于在集群上启动应用. 组件 Spark 应用在集群上作为独立的进程组来运行,在您的 main 程序中通过 SparkContext 来协调(称之为 driver 程序). 具体的说,为了运行在集群上,SparkContext 可以连接至几种类型的 Cluster Manager(既可以用 Spark 自己的 Standlone Cluster Manager,或者

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset