目前 Spark SQL 不支持自定义UDF ,底层 SQL 引擎用的 catalyst 。
在SqlContext 中 有一个 Analyzer给的一个EmptyFunctionRegistry ,如果 SQL 引擎函数中找不到了,会到这个FunctionRegistry 中找
EmptyFunctionRegistry 中lookup 只是抛出一个异常。
所以自定义了一个 FunctionRegistry ,SqlContext
@transient protected[sql]lazyval analyzer:Analyzer = newAnalyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)
class UDFRgistry extends FunctionRegistry { def lookupFunction(name: String, children: Seq[Expression]): Expression = { name.toLowerCase match { case "col_set" =>Collect(children(0)) case "array" =>Array(children(0)) case "contains" =>Contains(children) case _ => throw new UnsupportedOperationException } } } class SparkSqlContext(val spctx: SparkContext) extends SQLContext(spctx) { @transient override lazy val analyzer: Analyzer = new Analyzer(catalog, new UDF.UDFRgistry, caseSensitive = true) }
这样就可以找到自定义的函数了。
时间: 2024-12-18 03:25:02