sparkSQL1.1入门之二:sparkSQL运行架构

在介绍sparkSQL之前,我们首先来看看,传统的关系型数据库是怎么运行的。当我们提交了一个很简单的查询:

SELECT  a1,a2,a3  FROM  tableA  Where  condition 

可以看得出来,该语句是由Projection(a1,a2,a3)、Data Source(tableA)、Filter(condition)组成,分别对应sql查询过程中的Result、Data Source、Operation,也就是说SQL语句按Result-->Data Source-->Operation的次序来描述的。那么,SQL语句在实际的运行过程中是怎么处理的呢?一般的数据库系统先将读入的SQL语句(Query)先进行解析(Parse),分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是Data
Source等等。这一步就可以判断SQL语句是否规范,不规范就报错,规范就继续下一步过程绑定(Bind),这个过程将SQL语句和数据库的数据字典(列、表、视图等等)进行绑定,如果相关的Projection、Data Source等等都是存在的话,就表示这个SQL语句是可以执行的;而在执行前,一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划(Optimize),最终执行该计划(Execute),并返回结果。当然在实际的执行过程中,是按Operation-->Data
Source-->Result的次序来进行的,和SQL语句的次序刚好相反;在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。

以上过程看上去非常简单,但实际上会包含很多复杂的操作细节在里面。而这些操作细节都和Tree有关,在数据库解析(Parse)SQL语句的时候,会将SQL语句转换成一个树型结构来进行处理,如下面一个查询,会形成一个含有多个节点(TreeNode)的Tree,然后在后续的处理过程中对该Tree进行一系列的操作。

下图给出了对Tree的一些可能的操作细节,对于Tree的处理过程中所涉及更多的细节,可以查看相关的数据库论文。

OK,上面简单介绍了关系型数据库的运行过程,那么,sparkSQL是不是也采用类似的方式处理呢?答案是肯定的。下面我们先来看看sparkSQL中的两个重要概念Tree和Rule、然后再介绍一下sparkSQL的两个分支sqlContext和hiveContext、最后再综合看看sparkSQL的优化器Catalyst。

1:Tree和Rule

sparkSQL对SQL语句的处理和关系型数据库对SQL语句的处理采用了类似的方法,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。

A:Tree

  • Tree的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees
  • Logical Plans、Expressions、Physical Operators都可以使用Tree表示
  • Tree的具体操作是通过TreeNode来实现的
    • sparkSQL定义了catalyst.trees的日志,通过这个日志可以形象的表示出树的结构
    • TreeNode可以使用scala的集合操作方法(如foreach, map, flatMap, collect等)进行操作
    • 有了TreeNode,通过Tree中各个TreeNode之间的关系,可以对Tree进行遍历操作,如使用transformDown、transformUp将Rule应用到给定的树段,然后用结果替代旧的树段;也可以使用transformChildrenDown、transformChildrenUp对一个给定的节点进行操作,通过迭代将Rule应用到该节点以及子节点。

  • TreeNode可以细分成三种类型的Node:

    • UnaryNode 一元节点,即只有一个子节点。如Limit、Filter操作
    • BinaryNode 二元节点,即有左右子节点的二叉节点。如Jion、Union操作
    • LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如SetCommand

B:Rule

  • Rule的相关代码定义在sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules
  • Rule在sparkSQL的analyzer、optimizer、SparkPlan等各个组件中都有应用到
  • Rule是一个抽象类,具体的Rule实现是通过RuleExecutor完成
  • Rule通过定义batch和batchs,可以简便的、模块化地对Tree进行transform操作
  • Rule通过定义Once和FixedPoint,可以对Tree进行一次操作或多次操作(如对某些Tree进行多次迭代操作的时候,达到FixedPoint次数迭代或达到前后两次的树结构没变化才停止操作,具体参看RuleExecutor.apply)

拿个简单的例子,在处理由解析器(SqlParse)生成的LogicPlan Tree的时候,在Analyzer中就定义了多种Rules应用到LogicPlan Tree上。

应用示意图:

Analyzer中使用的Rules,定义了batches,由多个batch构成,如MultiInstanceRelations、Resolution、Check Analysis、AnalysisOperators等构成;每个batch又有不同的rule构成,如Resolution由ResolveReferences 、ResolveRelations、ResolveSortReferences 、NewRelationInstances等构成;每个rule又有自己相对应的处理函数,可以具体参看Analyzer中的ResolveReferences
、ResolveRelations、ResolveSortReferences 、NewRelationInstances函数;同时要注意的是,不同的rule应用次数是不同的:如CaseInsensitiveAttributeReferences这个batch中rule只应用了一次(Once),而Resolution这个batch中的rule应用了多次(fixedPoint = FixedPoint(100),也就是说最多应用100次,除非前后迭代结果一致退出)。

在整个sql语句的处理过程中,Tree和Rule相互配合,完成了解析、绑定(在sparkSQL中称为Analysis)、优化、物理计划等过程,最终生成可以执行的物理计划。

知道了sparkSQL的各个过程的基本处理方式,下面来看看sparkSQL的运行过程。sparkSQL有两个分支,sqlContext和hivecontext,sqlContext现在只支持sql语法解析器(SQL-92语法);hiveContext现在支持sql语法解析器和hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置切换成sql语法解析器,来运行hiveql不支持的语法,如select 1。关于sqlContext和hiveContext的具体应用请参看第六部分。

2:sqlContext的运行过程

sqlContext是使用sqlContext.sql(sqlText)来提交用户sql语句:

/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
  def sql(sqlText: String): SchemaRDD = {
    if (dialect == "sql") {
      new SchemaRDD(this, parseSql(sqlText))   //parseSql(sqlText)对sql语句进行语法解析
    } else {
      sys.error(s"Unsupported SQL dialect: $dialect")
    }
  }

sqlContext.sql的返回结果是SchemaRDD,调用了new SchemaRDD(this, parseSql(sqlText)) 来对sql语句进行处理,处理之前先使用catalyst.SqlParser对sql语句进行语法解析,使之生成UnresolvedLogicalPlan。

/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
  protected[sql] val parser = new catalyst.SqlParser
  protected[sql] def parseSql(sql: String): LogicalPlan = parser(sql)

类SchemaRDD继承自SchemaRDDLike

/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  */
class SchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient val baseLogicalPlan: LogicalPlan)
  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike

SchemaRDDLike中调用sqlContext.executePlan(baseLogicalPlan)来执行catalyst.SqlParser解析后生成UnresolvedLogicalPlan,这里的baseLogicalPlan就是指UnresolvedLogicalPlan。

/**源自sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala  */
private[sql] trait SchemaRDDLike {
  @transient val sqlContext: SQLContext
  @transient val baseLogicalPlan: LogicalPlan
  private[sql] def baseSchemaRDD: SchemaRDD

  lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)

sqlContext.executePlan做了什么呢?它调用了QueryExecution类

/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }

QueryExecution类的定义:

/**源自sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  */
protected abstract class QueryExecution {
    def logical: LogicalPlan

    //对Unresolved LogicalPlan进行analyzer,生成resolved LogicalPlan
    lazy val analyzed = ExtractPythonUdfs(analyzer(logical))
    //对resolved LogicalPlan进行optimizer,生成optimized LogicalPlan
    lazy val optimizedPlan = optimizer(analyzed)
    // 将optimized LogicalPlan转换成PhysicalPlan
    lazy val sparkPlan = {
      SparkPlan.currentContext.set(self)
      planner(optimizedPlan).next()
    }
    // PhysicalPlan执行前的准备工作,生成可执行的物理计划
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    //执行可执行物理计划
    lazy val toRdd: RDD[Row] = executedPlan.execute()

    ......
  }

sqlContext总的一个过程如下图所示:

  1. SQL语句经过SqlParse解析成UnresolvedLogicalPlan;
  2. 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan;
  3. 使用optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;
  4. 使用SparkPlan将LogicalPlan转换成PhysicalPlan;
  5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
  6. 使用execute()执行可执行物理计划;
  7. 生成SchemaRDD。

在整个运行过程中涉及到多个sparkSQL的组件,如SqlParse、analyzer、optimizer、SparkPlan等等,其功能和实现在下一章节中详解。

3:hiveContext的运行过程

在分布式系统中,由于历史原因,很多数据已经定义了hive的元数据,通过这些hive元数据,sparkSQL使用hiveContext很容易实现对这些数据的访问。值得注意的是hiveContext继承自sqlContext,所以在hiveContext的的运行过程中除了override的函数和变量,可以使用和sqlContext一样的函数和变量。

从sparkSQL1.1开始,hiveContext使用hiveContext.sql(sqlText)来提交用户sql语句进行查询:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
override def sql(sqlText: String): SchemaRDD = {
    // 使用spark.sql.dialect定义采用的语法解析器
    if (dialect == "sql") {
      super.sql(sqlText)    //如果使用sql解析器,则使用sqlContext的sql方法
    } else if (dialect == "hiveql") {     //如果使用和hiveql解析器,则使用HiveQl.parseSql
      new SchemaRDD(this, HiveQl.parseSql(sqlText))
    }  else {
      sys.error(s"Unsupported SQL dialect: $dialect.  Try 'sql' or 'hiveql'")
    }
  }

hiveContext.sql首先根据用户的语法设置(spark.sql.dialect)决定具体的执行过程,如果dialect == "sql"则采用sqlContext的sql语法执行过程;如果是dialect
== "hiveql",则采用hiveql语法执行过程。在这里我们主要看看hiveql语法执行过程。可以看出,hiveContext.sql调用了new SchemaRDD(this, HiveQl.parseSql(sqlText))对hiveql语句进行处理,处理之前先使用对语句进行语法解析。

/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  */
  /** Returns a LogicalPlan for a given HiveQL string. */
  def parseSql(sql: String): LogicalPlan = {
    try {
      if (条件)   {
      //非hive命令的处理,如set、cache table、add jar等直接转化成command类型的LogicalPlan
      .....
      } else {
        val tree = getAst(sql)
        if (nativeCommands contains tree.getText) {
          NativeCommand(sql)
        } else {
          nodeToPlan(tree) match {
            case NativePlaceholder => NativeCommand(sql)
            case other => other
          }
        }
      }
    } catch {
      //异常处理
      ......
    }
  }

因为sparkSQL所支持的hiveql除了兼容hive语句外,还兼容一些sparkSQL本身的语句,所以在HiveQl.parseSql对hiveql语句语法解析的时候:

  • 首先考虑一些非hive语句的处理,这些命令属于sparkSQL本身的命令语句,如设置sparkSQL运行参数的set命令、cache
    table、add jar等,将这些语句转换成command类型的LogicalPlan;
  • 如果是hive语句,则调用getAst(sql)使用hive的ParseUtils将该语句先解析成AST树,然后根据AST树中的关键字进行转换:类似命令型的语句、DDL类型的语句转换成command类型的LogicalPlan;其他的转换通过nodeToPlan转换成LogicalPlan。
/**源自src/main/scala/org/apache/spark/sql/hive/HiveQl.scala  */
/** * Returns the AST for the given SQL string.    */
  def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))

和sqlContext一样,类SchemaRDD继承自SchemaRDDLike
,SchemaRDDLike调用sqlContext.executePlan(baseLogicalPlan),不过hiveContext重写了executePlan()函数,

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }

并使用了一个继承自sqlContext.QueryExecution的新的QueryExecution类:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
 protected[sql] abstract class QueryExecution extends super.QueryExecution {
    // TODO: Create mixin for the analyzer instead of overriding things here.
    override lazy val optimizedPlan =
      optimizer(ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed))))

    override lazy val toRdd: RDD[Row] = executedPlan.execute().map(_.copy())
    ......
  }

所以在hiveContext的运行过程基本和sqlContext一致,除了override的catalog、functionRegistry、analyzer、planner、optimizedPlan、toRdd。

hiveContext的catalog,是指向 Hive Metastore:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
/* A catalyst metadata catalog that points to the Hive Metastore. */
  @transient
  override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
    override def lookupRelation(
      databaseName: Option[String],
      tableName: String,
      alias: Option[String] = None): LogicalPlan = {

      LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
    }
  }

hiveContext的analyzer,使用了新的catalog和functionRegistry:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
  /* An analyzer that uses the Hive metastore. */
  @transient
  override protected[sql] lazy val analyzer =
    new Analyzer(catalog, functionRegistry, caseSensitive = false)

hiveContext的planner,使用新定义的hivePlanner:

/**源自sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala  */
  @transient
  override protected[sql] val planner = hivePlanner

所以hiveContext总的一个过程如下图所示:

  1. SQL语句经过HiveQl.parseSql解析成UnresolvedLogicalPlan,在这个解析过程中对hiveql语句使用getAst()获取AST树,然后再进行解析;
  2. 使用analyzer结合数据hive源数据Metastore(新的catalog)进行绑定,生成resolvedLogicalPlan;
  3. 使用optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan,优化前使用了ExtractPythonUdfs(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))进行预处理;
  4. 使用hivePlanner将LogicalPlan转换成PhysicalPlan;
  5. 使用prepareForExecution()将PhysicalPlan转换成可执行物理计划;
  6. 使用execute()执行可执行物理计划;
  7. 执行后,使用map(_.copy)将结果导入SchemaRDD。

hiveContxt还有很多针对hive的特性,更细节的内容参看源码。

4:catalyst优化器

sparkSQL1.1总体上由四个模块组成:core、catalyst、hive、hive-Thriftserver:

  • core处理数据的输入输出,从不同的数据源获取数据(RDD、Parquet、json等),将查询结果输出成schemaRDD;
  • catalyst处理查询语句的整个处理过程,包括解析、绑定、优化、物理计划等,说其是优化器,还不如说是查询引擎;
  • hive对hive数据的处理
  • hive-ThriftServer提供CLI和JDBC/ODBC接口

在这四个模块中,catalyst处于最核心的部分,其性能优劣将影响整体的性能。由于发展时间尚短,还有很多不足的地方,但其插件式的设计,为未来的发展留下了很大的空间。下面是catalyst的一个设计图:

其中虚线部分是以后版本要实现的功能,实线部分是已经实现的功能。从上图看,catalyst主要的实现组件有:

  • sqlParse,完成sql语句的语法解析功能,目前只提供了一个简单的sql解析器;
  • Analyzer,主要完成绑定工作,将不同来源的UnresolvedLogicalPlan 和数据元数据(如hive metastore、Schema catalog)进行绑定,生成resolvedLogicalPlan;
  • optimizer对resolvedLogicalPlan进行优化,生成optimizedLogicalPlan;
  • Planner将LogicalPlan转换成PhysicalPlan;
  • CostModel,主要根据过去的性能统计数据,选择最佳的物理执行计划

这些组件的基本实现方法:

  • 先将sql语句通过解析生成Tree,然后在不同阶段使用不同的Rule应用到Tree上,通过转换完成各个组件的功能。
  • Analyzer使用Analysis Rules,配合数据元数据(如hive metastore、Schema catalog),完善UnresolvedLogicalPlan的属性而转换成resolvedLogicalPlan;
  • optimizer使用Optimization Rules,对resolvedLogicalPlan进行合并、列裁剪、过滤器下推等优化作业而转换成optimizedLogicalPlan;
  • Planner使用Planning Strategies,对optimizedLogicalPlan

关于本篇中涉及到的相关概念和组件在下篇再详细介绍。

时间: 2024-10-27 05:34:23

sparkSQL1.1入门之二:sparkSQL运行架构的相关文章

sparkSQL1.1入门之三:sparkSQL组件之解析

上篇在总体上介绍了sparkSQL的运行架构及其基本实现方法(Tree和Rule的配合),也大致介绍了sparkSQL中涉及到的各个概念和组件.本篇将详细地介绍一下关键的一些概念和组件,由于hiveContext继承自sqlContext,关键的概念和组件类似,只不过后者针对hive的特性做了一些修正和重写,所以本篇就只介绍sqlContext的关键的概念和组件. 概念: LogicalPlan 组件: SqlParser Analyzer Optimizer Planner 1:Logical

sparkSQL1.1入门之八:sparkSQL之综合应用

Spark之所以万人瞩目,除了内存计算,还有其ALL-IN-ONE的特性,实现了One stack rule them all.下面简单模拟了几个综合应用场景,不仅使用了sparkSQL,还使用了其他Spark组件: 店铺分类,根据销售额对店铺分类 货品调拨,根据货品的销售数量和店铺之间的距离进行货品调拨 前者将使用sparkSQL+MLlib的聚类算法,后者将使用sparkSQL+GraphX算法.本实验采用IntelliJ IDEA调试代码,最后生成doc.jar,然后使用spark-sub

sparkSQL1.1入门之六:sparkSQL之基础应用

sparkSQL1.1对数据的查询分成了2个分支:sqlContext 和 hiveContext. 在sqlContext中,sparkSQL可以使用SQL-92语法对定义的表进行查询,表的源数据可以来自: RDD parquet文件 json文件 在hiveContext中,sparkSQL可以使用HQL语法,对hive数据进行查询,sparkSQL1.1支持hive0.12的HQL语法:如果遇上不支持的语法,用户可以通过更改配置切换到sql语法.笔者猜测,从spark1.1开始,将打开sq

Flink入门(二)——Flink架构介绍

1.基本组件栈 了解Spark的朋友会发现Flink的架构和Spark是非常类似的,在整个软件架构体系中,同样遵循着分层的架构设计理念,在降低系统耦合度的同时,也为上层用户构建Flink应用提供了丰富且友好的接口. https://mmbiz.qpic.cn/mmbiz_png/mqibsuEhdUyIVKMN1mHneQiantTzuhJYqwSD0k9gn8RCcJZHeD19KxcLj8ydCUr9KuepDWu6fk2J47oKx6dyQlfQ/640?wx_fmt=png&wxfrom

sparkSQL1.1入门之一:为什么sparkSQL

2014年9月11日,Spark1.1.0忽然之间发布.笔者立即下载.编译.部署了Spark1.1.0.关于Spark1.1的编译和部署,请参看笔者博客Spark1.1.0 源码编译和部署包生成 . Spark1.1.0中变化较大是sparkSQL和MLlib,sparkSQL1.1.0主要的变动有: 增加了JDBC/ODBC Server(ThriftServer),用户可以在应用程序中连接到SparkSQL并使用其中的表和缓存表. 增加了对JSON文件的支持 增加了对parquet文件的本地

sparkSQL1.1入门之十:总结

回顾一下,在前面几章中,就sparkSQL1.1.0基本概念.运行架构.基本操作和实用工具做了基本介绍. 基本概念: SchemaRDD Rule Tree LogicPlan Parser Analyzer Optimizer SparkPlan 运行架构: sqlContext运行架构 hiveContext运行架构 基本操作 原生RDD的操作 parquet文件的操作 json文件的操作 hive数据的操作 和其他spark组件混合使用 实用工具 hive/console的操作 CLI的配

Spark入门实战系列--4.Spark运行架构

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1. Spark运行架构 1.1 术语定义 lApplication:Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver 功能的代码和分布在集群中多个节点上运行的Executor代码: lDriver:Spark中的Driver即运行上述Application的main()函数并且创建SparkContext

区块链快速入门(二)——分布式系统核心技术

区块链快速入门(二)--分布式系统核心技术 一.分布式系统的一致性问题 1.分布式系统的一致性问题 随着摩尔定律碰到瓶颈,越来越多情况下要依靠可扩展的分布式架构来实现海量处理能力.单点结构演变到分布式结构,首要解决的问题就是数据的一致性.如果分布式集群中多个节点不能保证处理结果的一致性,建立在其上的业务系统将无法正常工作.区块链系统是一个典型的分布式系统,在设计上必然也要考虑一致性问题.在面向大规模复杂任务场景时,单点的服务往往难以解决可扩展(Scalability)和容错(Fault-tole

C语言快速入门教程(二)

C语言快速入门教程(二) C语言的基本语法 本节学习路线图: 引言: C语言,顾名思义就是一门语言,可以类比一下英语; 你要说出一个英语的句子需要:  单词  +  语法!  将单词按照一定的语法拼凑起来就成了一个英语句子了; C语言同样是这样,只不过单词可以理解为一些固定的知识点,而语法可以理解为算法(可以理解为解决问题的方法) 在这一节中我们就对固定知识点中的语言描述与数据存储进行解析! 1.C语言的基本元素 1.1  标识符 什么是标识符? 答:在C语言中,符号常量,变量,数组,函数等都需