Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/

在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准。

在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst
Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能。但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF:

spark1.0及以前的实现:

  protected[sql] lazy val catalog: Catalog = new SimpleCatalog
  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) //EmptyFunctionRegistry空实现
  @transient
  protected[sql] val optimizer = Optimizer

  Spark1.1及以后的实现:

  protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry //SimpleFunctionRegistry实现,支持简单的UDF

  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, functionRegistry, caseSensitive = true)

一、引子:

对于SQL语句中的函数,会经过SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最后会被Analyzer解析。

SqlParser:

除了非官方定义的函数外,还可以定义自定义函数,sql parser会进行解析。

  ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
      case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)

将SqlParser传入的udfName和exprs封装成一个class class UnresolvedFunction继承自Expression。

只是这个Expression的dataType等一系列属性和eval计算方法均无法访问,强制访问会抛出异常,因为它没有被Resolved,只是一个载体。

case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {
  override def dataType = throw new UnresolvedException(this, "dataType")
  override def foldable = throw new UnresolvedException(this, "foldable")
  override def nullable = throw new UnresolvedException(this, "nullable")
  override lazy val resolved = false

  // Unresolved functions are transient at compile time and don't get evaluated during execution.
  override def eval(input: Row = null): EvaluatedType =
    throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

  override def toString = s"'$name(${children.mkString(",")})"
}<strong></strong>

Analyzer:

Analyzer初始化的时候会需要Catalog,database和table的元数据关系,以及FunctionRegistry来维护UDF名称和UDF实现的元数据,这里使用SimpleFunctionRegistry。

  /**
   * Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]].
   */
  object ResolveFunctions extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case q: LogicalPlan =>
        q transformExpressions { //对当前LogicalPlan进行transformExpressions操作
          case u @ UnresolvedFunction(name, children) if u.childrenResolved => //如果遍历到了UnresolvedFunction
            registry.lookupFunction(name, children) //从UDF元数据表里查找udf函数
        }
    }
  }

二、UDF注册

2.1 UDFRegistration

registerFunction("len", (x:String)=>x.length)

registerFunction是UDFRegistration下的方法,SQLContext现在实现了UDFRegistration这个trait,只要导入SQLContext,即可以使用udf功能。

UDFRegistration核心方法registerFunction:

registerFunction方法签名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit

接受一个udfName 和 一个FunctionN,可以是Function1 到Function22。即这个udf的参数只支持1-22个。(scala的痛啊)

内部builder通过ScalaUdf来构造一个Expression,这里ScalaUdf继承自Expression(可以简单的理解目前的SimpleUDF即是一个Catalyst的一个Expression),传入scala的function作为UDF的实现,并且用反射检查字段类型是否是Catalyst允许的,见ScalaReflection.

    def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {
    def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)//构造Expression
    functionRegistry.registerFunction(name, builder)//向SQLContext的functionRegistry(维护了一个hashMap来管理udf映射)注册
  }

2.2 注册Function:

注意:这里FunctionBuilder是一个type FunctionBuilder = Seq[Expression] => Expression

class SimpleFunctionRegistry extends FunctionRegistry {
  val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() //udf映射关系维护[udfName,Expression]

  def registerFunction(name: String, builder: FunctionBuilder) = { //put expression进Map
    functionBuilders.put(name, builder)
  }

  override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
    functionBuilders(name)(children) //查找udf,返回Expression
  }
}

至此,我们将一个scala function注册为一个catalyst的一个Expression,这就是spark的simple udf。

三、UDF计算:

UDF既然已经被封装为catalyst树里的一个Expression节点,那么计算的时候也就是计算ScalaUdf的eval方法。

先通过Row和表达式计算function所需要的参数,最后通过反射调用function,来达到计算udf的目的。

ScalaUdf继承自Expression:

scalaUdf接受一个function, dataType,和一系列表达式。

比较简单,看注释即可:

case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])
  extends Expression {

  type EvaluatedType = Any

  def nullable = true

  override def toString = s"scalaUDF(${children.mkString(",")})"
 override def eval(input: Row): Any = {
    val result = children.size match {
      case 0 => function.asInstanceOf[() => Any]()
      case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) //反射调用function
      case 2 =>
        function.asInstanceOf[(Any, Any) => Any](
          children(0).eval(input), //表达式参数计算
          children(1).eval(input))
      case 3 =>
        function.asInstanceOf[(Any, Any, Any) => Any](
          children(0).eval(input),
          children(1).eval(input),
          children(2).eval(input))
      case 4 =>
     ......
       case 22 => //scala function只支持22个参数,这里枚举了。
        function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](
          children(0).eval(input),
          children(1).eval(input),
          children(2).eval(input),
          children(3).eval(input),
          children(4).eval(input),
          children(5).eval(input),
          children(6).eval(input),
          children(7).eval(input),
          children(8).eval(input),
          children(9).eval(input),
          children(10).eval(input),
          children(11).eval(input),
          children(12).eval(input),
          children(13).eval(input),
          children(14).eval(input),
          children(15).eval(input),
          children(16).eval(input),
          children(17).eval(input),
          children(18).eval(input),
          children(19).eval(input),
          children(20).eval(input),
          children(21).eval(input))

四、总结

Spark目前的UDF其实就是scala function。将scala function封装到一个Catalyst Expression当中,在进行sql计算时,使用同样的Eval方法对当前输入Row进行计算。

编写一个spark udf非常简单,只需给UDF起个函数名,并且传递一个scala function即可。依靠scala函数编程的表现能力,使得编写scala udf比较简单,且相较hive的udf更容易使人理解。

——EOF——

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/39395641

时间: 2024-11-06 14:13:43

Spark SQL Catalyst源码分析之UDF的相关文章

第八篇:Spark SQL Catalyst源码分析之UDF

/** Spark SQL源码分析系列文章*/ 在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准. 在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能.但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF: sp

Spark SQL Catalyst源码分析之Physical Plan 到 RDD的具体实现

接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作类型,即BasicOperator基本类型,还有就是Join.Aggregate和Sort这种

Spark SQL Catalyst源码分析之TreeNode Library

前几篇文章介绍了Spark SQL的Catalyst的SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Library是Catalyst的核心类库,语法树的构建都是由一个个

Spark SQL Catalyst源码分析之Optimizer

前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer 以及核心类库TreeNode,本文将详细讲解Spark SQL的Optimizer的优化思想以及Optimizer在Catalyst里的表现方式,并加上自己的实践,对Optimizer有一个直观的认识. Optimizer的主要职责是将Analyzer给Resolved的Logical Plan根据不同的优化策略Batch,来对语法树进行优化,优化逻辑计划节点(Logical Plan)以

Spark SQL Catalyst源码分析之Physical Plan

前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有SparkPlanner来对Optimized L

第三篇:Spark SQL Catalyst源码分析之Analyzer

/** Spark SQL源码分析系列文章*/ 前面几篇文章讲解了Spark SQL的核心执行流程和Spark SQL的Catalyst框架的Sql Parser是怎样接受用户输入sql,经过解析生成Unresolved Logical Plan的.我们记得Spark SQL的执行流程中另一个核心的组件式Analyzer,本文将会介绍Analyzer在Spark SQL里起到了什么作用. Analyzer位于Catalyst的analysis package下,主要职责是将Sql Parser

第二篇:Spark SQL Catalyst源码分析之SqlParser

/** Spark SQL源码分析系列文章*/ Spark SQL的核心执行流程我们已经分析完毕,可以参见Spark SQL核心执行流程,下面我们来分析执行流程中各个核心组件的工作职责. 本文先从入口开始分析,即如何解析SQL文本生成逻辑计划的,主要设计的核心组件式SqlParser是一个SQL语言的解析器,用scala实现的Parser将解析的结果封装为Catalyst TreeNode ,关于Catalyst这个框架后续文章会介绍. 一.SQL Parser入口 Sql Parser 其实是

第六篇:Spark SQL Catalyst源码分析之Physical Plan

/** Spark SQL源码分析系列文章*/ 前面几篇文章主要介绍的是spark sql包里的的spark sql执行流程,以及Catalyst包内的SqlParser,Analyzer和Optimizer,最后要介绍一下Catalyst里最后的一个Plan了,即Physical Plan.物理计划是Spark SQL执行Spark job的前置,也是最后一道计划. 如图: 一.SparkPlanner 话接上回,Optimizer接受输入的Analyzed Logical Plan后,会有S

第四篇:Spark SQL Catalyst源码分析之TreeNode Library

/** Spark SQL源码分析系列文章*/ 前几篇文章介绍了Spark SQL的Catalyst的核心运行流程.SqlParser,和Analyzer,本来打算直接写Optimizer的,但是发现忘记介绍TreeNode这个Catalyst的核心概念,介绍这个可以更好的理解Optimizer是如何对Analyzed Logical Plan进行优化的生成Optimized Logical Plan,本文就将TreeNode基本架构进行解释. 一.TreeNode类型 TreeNode Lib