Spark SQL UDF使用

Spark1.1推出了Uer Define Function功能,用户可以在Spark SQL 里自定义实际需要的UDF来处理数据。

因为目前Spark SQL本身支持的函数有限,一些常用的函数都没有,比如len, concat...etc 但是使用UDF来自己实现根据业务需要的功能是非常方便的。

Spark SQL UDF其实是一个Scala函数,被catalyst封装成一个Expression结点,最后通过eval方法计根据当前Row计算UDF的结果,源码分析见:Spark SQL源码分析之UDF

Spark SQL UDF使用起来非常方便,分2个步骤:

一、注册

当我们导入了SQLContext或者HiveContext,即有注册UDF的功能。

registerFunction(udfName : String, func : FunctionN)

由于scala语言的限制,这里UDF的参数仅支持22个。

二、使用

select udfName(param1, param2....) from tableName

三、示例

我们这里创建2张表:

第一张dual会从README.md读取记录,里面仅有一个字段line : String

第二张表src,有2个字段key,value,数据是spark sql自带的测试数据。

我们使用 sbt/sbt hive/console进入测试环境:

1、字符串取长度 len()

创建table dual:

scala> sql("create table dual(line string)").collect()
14/09/19 17:41:34 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:dual, dbName:default, owner:root, createTime:1411119694, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:line, type:string, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))
14/09/19 17:41:34 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr      cmd=create_table: Table(tableName:dual, dbName:default, owner:root, createTime:1411119694, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:line, type:string, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))

载入README.md数据:

sql("load data local inpath 'README.md' into table dual ").collect()

scala> sql("select * from dual").collect()
res4: Array[org.apache.spark.sql.Row] = Array([# Apache Spark], [], [Spark is a fast and general cluster computing system for Big Data. It provides], [high-level APIs in Scala, Java, and Python, and an optimized engine that], [supports general computation graphs for data analysis. It also supports a], [rich set of higher-level tools including Spark SQL for SQL and structured], [data processing, MLLib for machine learning, GraphX for graph processing,], [and Spark Streaming.], [], [<http://spark.apache.org/>], [], [], [## Online Documentation], [], [You can find the latest Spark documentation, including a programming], [guide, on the project webpage at <http://spark.apache.org/documentation.html>.], [This README file only contains basic setup instructions.], [], [## Building Spark], [], ...

编写len函数并,注册函数:

scala> registerFunction("len",(x:String)=>x.length)

测试:

scala> sql("select len(line) from dual").collect()
14/09/19 17:45:07 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:85, took 0.072239295 s
res6: Array[org.apache.spark.sql.Row] = Array([14], [0], [78], [72], [73], [73], [73], [20], [0], [26], [0], [0], [23], [0], [68], [78], [56], [0], [17], [0], [75], [0], [22], [0], [67], [0], [26], [0], [64], [0], [21], [0], [52], [0], [44], [0], [27], [0], [66], [0], [17], [4], [61], [0], [43], [0], [19], [0], [74], [74], [0], [29], [0], [32], [0], [75], [63], [67], [74], [72], [22], [0], [54], [0], [69], [0], [16], [0], [84], [17], [0], [19], [0], [31], [0], [77], [76], [77], [77], [0], [67], [27], [0], [25], [45], [0], [42], [58], [0], [91], [29], [0], [31], [58], [0], [42], [61], [0], [35], [52], [0], [77], [79], [74], [22], [0], [51], [0], [90], [0], [16], [42], [44], [30], [17], [0], [0], [56], [0], [46], [86], [78], [0], [30], [0], [16], [0], [97], [70], [0], [0], [24], [0], [78]...

2、字符串连接concat_str

这里为了简单起见,就根据src表的key value类型 Int, String来做例子:

scala> sql("desc src").collect()
res8: Array[org.apache.spark.sql.Row] = Array([key,int,null], [value,string,null])
scala> sql("select * from src limit 10").collect()
res7: Array[org.apache.spark.sql.Row] = Array([238,val_238], [86,val_86], [311,val_311], [27,val_27], [165,val_165], [409,val_409], [255,val_255], [278,val_278], [98,val_98], [484,val_484])

编写并注册concat_str函数:

scala> registerFunction("concat_str",(a:Int, b:String)=>a.toString+b)

测试concat函数

scala> sql("select concat_str(key,value) from src ").collect()
14/09/19 18:17:22 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:85, took 0.082076377 s
res28: Array[org.apache.spark.sql.Row] = Array([238val_238], [86val_86], [311val_311], [27val_27], [165val_165], [409val_409], [255val_255], [278val_278], [98val_98], [484val_484], [265val_265], [193val_193], [401val_401], [150val_150], [273val_273], [224val_224], [369val_369], [66val_66], [128val_128], [213val_213], [146val_146], [406val_406], [429val_429], [374val_374], [152val_152], [469val_469], [145val_145], [495val_495], [37val_37], [327val_327], [281val_281], [277val_277], [209val_209], [15val_15], [82val_82], [403val_403], [166val_166], [417val_417], [430val_430], [252val_252], [292val_292], [219val_219], [287val_287], [153val_153], [193val_193], [338val_338], [446val_446], [459val_459], [394val_394], [237val_237], [482val_482], [174val_174], [413val_413], [494val_494], [207val_...
scala> 

——EOF——

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/39401391

时间: 2025-01-08 01:10:08

Spark SQL UDF使用的相关文章

Spark SQL UDF

目前 Spark SQL 不支持自定义UDF ,底层 SQL 引擎用的 catalyst . 在SqlContext 中 有一个 Analyzer给的一个EmptyFunctionRegistry ,如果 SQL 引擎函数中找不到了,会到这个FunctionRegistry 中找 EmptyFunctionRegistry 中lookup 只是抛出一个异常. 所以自定义了一个 FunctionRegistry ,SqlContext @transient protected[sql]lazyva

Spark SQL UDF示例

UDF即用户自定函数,注册之后,在sql语句中使用. 基于scala-sdk-2.10.7,Spark2.0.0. package UDF_UDAF import java.util import org.apache.spark.sql.{RowFactory, SparkSession} import org.apache.spark.SparkConf import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql

详解Spark sql用户自定义函数:UDF与UDAF

UDAF = USER DEFINED AGGREGATION FUNCTION Spark sql提供了丰富的内置函数供猿友们使用,辣为何还要用户自定义函数呢?实际的业务场景可能很复杂,内置函数hold不住,所以Spark sql提供了可扩展的内置函数接口:哥们,你的业务太变态了,我满足不了你,自己按照我的规范去定义一个sql函数,该怎么折腾就怎么折腾! 例如,MySQL数据库中有一张task表,共两个字段taskid (任务ID)与taskParam(JSON格式的任务请求参数).简单起见,

第八篇:Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准. 在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能.但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: sp

Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准. 在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能.但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: sp

转】Spark SQL 之 DataFrame

原博文出自于: http://www.cnblogs.com/BYRans/p/5003029.html 感谢! Spark SQL 之 DataFrame 转载请注明出处:http://www.cnblogs.com/BYRans/ 概述(Overview) Spark SQL是Spark的一个组件,用于结构化数据的计算.Spark SQL提供了一个称为DataFrames的编程抽象,DataFrames可以充当分布式SQL查询引擎. DataFrames DataFrame是一个分布式的数据

前世今生:Hive、Shark、spark SQL

Hive (http://en.wikipedia.org/wiki/Apache_Hive )(非严格的原文顺序翻译)  Apache Hive是一个构建在Hadoop上的数据仓库框架,它提供数据的概要信息.查询和分析功能.最早是Facebook开发的,现在也被像Netflix这样的公司使用.Amazon维护了一个为自己定制的分支.   Hive提供了一个类SQL的语音--HiveQL,它将对关系数据库的模式操作转换为Hadoop的map/reduce.Apache Tez和Spark 执行引

Spark SQL笔记——技术点汇总

目录 · 概述 · 原理 · 组成 · 执行流程 · 性能 · API · 应用程序模板 · 通用读写方法 · RDD转为DataFrame · Parquet文件数据源 · JSON文件数据源 · Hive数据源 · 数据库JDBC数据源 · DataFrame Operation · 性能调优 · 缓存数据 · 参数调优 · 案例 · 数据准备 · 查询部门职工数 · 查询各部门职工工资总数,并排序 · 查询各部门职工考勤信息 概述 1. Spark SQL是Spark的结构化数据处理模块.

spark sql简单示例

运行环境 集群环境:CDH5.3.0 具体JAR版本如下: spark版本:1.2.0-cdh5.3.0 hive版本:0.13.1-cdh5.3.0 hadoop版本:2.5.0-cdh5.3.0 spark sql的JAVA版简单示例 spark sql直接查询JSON格式的数据 spark sql的自定义函数 spark sql查询hive上面的表 import java.util.ArrayList; import java.util.List; import org.apache.sp