Spark SQL之External DataSource外部数据源(二)源码分析

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的。

/** Spark SQL源码分析系列文章*/

(Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077

一、Sources包核心

Spark SQL在Spark1.2中提供了External DataSource API,开发者可以根据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。

在Spark SQL源代码的org/spark/sql/sources目录下,我们会看到关于External DataSource的相关代码。这里特别介绍几个:

1、DDLParser 

专门负责解析外部数据源SQL的SqlParser,解析create temporary table xxx using options (key ‘value‘, key ‘value‘) 创建加载外部数据源表的语句。

 protected lazy val createTable: Parser[LogicalPlan] =
    CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
      case tableName ~ provider ~ opts =>
        CreateTableUsing(tableName, provider, opts)
    }

2、CreateTableUsing

一个RunnableCommand,通过反射从外部数据源lib中实例化Relation,然后注册到为temp table。

private[sql] case class CreateTableUsing(
    tableName: String,
    provider: String,  // org.apache.spark.sql.json
    options: Map[String, String]) extends RunnableCommand {

  def run(sqlContext: SQLContext) = {
    val loader = Utils.getContextOrSparkClassLoader
    val clazz: Class[_] = try loader.loadClass(provider) catch { //do reflection
      case cnf: java.lang.ClassNotFoundException =>
        try loader.loadClass(provider + ".DefaultSource") catch {
          case cnf: java.lang.ClassNotFoundException =>
            sys.error(s"Failed to load class for data source: $provider")
        }
    }
    val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] //json包DefaultDataSource
    val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))//创建JsonRelation

    sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注册
    Seq.empty
  }
}

    2、DataSourcesStrategy

在 Strategy 一文中,我已讲过Streategy的作用,用来Plan生成物理计划的。这里提供了一种专门为了解析外部数据源的策略。

最后会根据不同的BaseRelation生产不同的PhysicalRDD。不同的BaseRelation的scan策略下文会介绍。

private[sql] object DataSourceStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>
      pruneFilterProjectRaw(
        l,
        projectList,
        filters,
        (a, f) => t.buildScan(a, f)) :: Nil
    ......
    case l @ LogicalRelation(t: TableScan) =>
      execution.PhysicalRDD(l.output, t.buildScan()) :: Nil

    case _ => Nil
  }

3、interfaces.scala

该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们只需实现该接口即可。里面比较重要的trait RelationProvider 和 BaseRelation,下文会详细介绍。

4、filters.scala

该Filter定义了如何在加载外部数据源的时候,就进行过滤。注意哦,是加载外部数据源到Table里的时候,而不是Spark里进行filter。这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。

5、LogicalRelation

封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。

二、External DataSource注册流程

用spark sql下sql/json来做示例, 画了一张流程图,如下:

注册外部数据源的表的流程:

1、提供一个外部数据源文件,比如json文件。

2、提供一个实现了外部数据源所需要的interfaces的类库,比如sql下得json包,在1.2版本后改为了External Datasource实现。

3、引入SQLContext,使用DDL创建表,如create temporary table xxx using options (key ‘value‘, key ‘value‘)

4、External Datasource的DDLParser将对该SQL进行Parse

5、Parse后封装成为一个CreateTableUsing类的对象。该类是一个RunnableCommand,其run方法会直接执行创建表语句。

6、该类会通过反射来创建一个org.apache.spark.sql.sources.RelationProvider,该trait定义要createRelation,如json,则创建JSONRelation,若avro,则创建AvroRelation。

7、得到external releation后,直接调用SQLContext的baseRelationToSchemaRDD转换为SchemaRDD

8、最后registerTempTable(tableName) 来注册为Table,可以用SQL来查询了。

三、External DataSource解析流程

先看图,图如下:

Spark SQL解析SQL流程如下:

1、Analyzer通过Rule解析,将UnresolvedRelation解析为JsonRelation。

2、通过Parse,Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0)

3、通过sources下得DataSourceStrategy将LogicalPlan映射到物理计划PhysicalRDD。

4、PhysicalRDD里包含了如何查询外部数据的规则,可以调用execute()方法来执行Spark查询。

四、External Datasource Interfaces

在第一节我已经介绍过,主要的interfaces,主要看一下BaseRelation和RelationProvider。

如果我们要实现一个外部数据源,比如avro数据源,支持spark sql操作avro file。那么久必须定义AvroRelation来继承BaseRelation。同时也要实现一个RelationProvider。

BaseRelation:

是外部数据源的抽象,里面存放了schema的映射,和如何scan数据的规则。

abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
abstract class PrunedFilteredScan extends BaseRelation {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}

1、schema我们如果自定义Relation,必须重写schema,就是我们必须描述对于外部数据源的Schema。

2、buildScan我们定义如何查询外部数据源,提供了4种Scan的策略,对应4种BaseRelation。

我们支持4种BaseRelation,分为TableScan, PrunedScan,PrunedFilterScan,CatalystScan。

1、TableScan

默认的Scan策略。

2、PrunedScan

这里可以传入指定的列,requiredColumns,列裁剪,不需要的列不会从外部数据源加载。

3、PrunedFilterScan

在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。

4、CatalystScan

Catalyst的支持传入expressions来进行Scan。支持列裁剪和Filter。

RelationProvider:

我们要实现这个,接受Parse后传入的参数,来生成对应的External Relation,就是一个反射生产外部数据源Relation的接口。

trait RelationProvider {
  /**
   * Returns a new base relation with the given parameters.
   * Note: the parameters‘ keywords are case insensitive and this insensitivity is enforced
   * by the Map that is passed to the function.
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

五、External Datasource定义示例

在Spark1.2之后,json和parquet也改为通过实现External API来进行外部数据源查询的。

下面以json的外部数据源定义为示例,说明是如何实现的:

1、JsonRelation

定义处理对于json文件的,schema和Scan策略,均基于JsonRDD,细节可以自行阅读JsonRDD。

private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(
    @transient val sqlContext: SQLContext)
  extends TableScan {

  private def baseRDD = sqlContext.sparkContext.textFile(fileName) //读取json file

  override val schema =
    JsonRDD.inferSchema(  // jsonRDD的inferSchema方法,能自动识别json的schema,和类型type。
      baseRDD,
      samplingRatio,
      sqlContext.columnNameOfCorruptRecord)

  override def buildScan() =
    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) //这里还是JsonRDD,调用jsonStringToRow查询返回Row
}

2、DefaultSource

parameters中可以获取到options中传入的path等自定义参数。

这里接受传入的参数,来狗仔JsonRelation。

private[sql] class DefaultSource extends RelationProvider {
  /** Returns a new base relation with the given parameters. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val fileName = parameters.getOrElse("path", sys.error("Option ‘path‘ not specified"))
    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

    JSONRelation(fileName, samplingRatio)(sqlContext)
  }
}

六、总结

External DataSource源码分析下来,可以总结为3部分。

1、外部数据源的注册流程

2、外部数据源Table查询的计划解析流程

3、如何自定义一个外部数据源,重写BaseRelation定义外部数据源的schema和scan的规则。定义RelationProvider,如何生成外部数据源Relation。

External Datasource此部分API还有可能在后续的build中改动,目前只是涉及到了查询,关于其它的操作还未涉及。

——EOF——

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/42064075

时间: 2024-10-03 00:23:43

Spark SQL之External DataSource外部数据源(二)源码分析的相关文章

Spark SQL之External DataSource外部数据源(二)源代码分析

上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了Exte

Spark SQL之External DataSource外部数据源(一)示例

一.Spark SQL External DataSource简介 随着Spark1.2的发布,Spark SQL开始正式支持外部数据源.Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现. 这使得Spark SQL支持了更多的类型数据源,如json, parquet, avro, csv格式.只要我们愿意,我们可以开发出任意的外部数据源来连接到Spark SQL.之前大家说的支持HBASE,Cassandra都可以用外部数据源的方式来实现无缝集成. (Ps: 关于Exter

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

Spark大师之路:广播变量(Broadcast)源码分析

概述 最近工作上忙死了--广播变量这一块其实早就看过了,一直没有贴出来. 本文基于Spark 1.0源码分析,主要探讨广播变量的初始化.创建.读取以及清除. 类关系 BroadcastManager类中包含一个BroadcastFactory对象的引用.大部分操作通过调用BroadcastFactory中的方法来实现. BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory.HttpBroadcastFactory.这两个子类实现了对Htt

Spark GraphX图计算【代码实现,源码分析】

一.简介 参考:https://www.cnblogs.com/yszd/p/10186556.html 二.代码实现 1 package big.data.analyse.graphx 2 3 import org.apache.log4j.{Level, Logger} 4 import org.apache.spark.graphx._ 5 import org.apache.spark.rdd.RDD 6 import org.apache.spark.sql.SparkSession

Spark技术内幕:Stage划分及提交源码分析

当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJob org.apache.spark.scheduler.DAGScheduler#runJob org.apache.spark.scheduler.DAGScheduler#submitJob org.apache.spark.scheduler.DAGSchedulerEventProcess

spark DAGScheduler、TaskSchedule、Executor执行task源码分析

摘要 spark的调度一直是我想搞清楚的东西,以及有向无环图的生成过程.task的调度.rdd的延迟执行是怎么发生的和如何完成的,还要就是RDD的compute都是在executor的哪个阶段调用和执行我们定义的函数的.这些都非常的基础和困难.花一段时间终于弄白了其中的奥秘.总结起来,以便以后继续完善.spark的调度分为两级调度:DAGSchedule和TaskSchedule.DAGSchedule是根据job来生成相互依赖的stages,然后把stages以TaskSet形式传递给Task

Spring整合MyBatis(二)源码分析

在Spring配置Mybatis的文件中我们可以看到如下代码: <!-- 扫描dao --> <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"> <property name="basePackage" value="org.tarena.note.dao"> </property> MapperScannerConfig

【Spark SQL 源码分析系列文章】

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize