Spark SQL 源码分析之 In-Memory Columnar Storage 之 in-memory query

/** Spark
SQL源码分析系列文章
*/

前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的。

那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式。

一、引子

本例使用hive console里查询cache后的src表。

select value from src

当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用。

即parse后,会形成InMemoryRelation结点,最后执行物理计划时,会调用InMemoryColumnarTableScan这个结点的方法。

如下:

scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)
14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src
14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed
exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution =
== Parsed Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Analyzed Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Optimized Logical Plan ==
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

== Physical Plan ==
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查询内存中表的入口

Code Generation: false
== RDD ==

二、InMemoryColumnarTableScan

InMemoryColumnarTableScan是Catalyst里的一个叶子结点,包含了要查询的attributes,和InMemoryRelation(封装了我们缓存的In-Columnar Storage数据结构)。

执行叶子节点,出发execute方法对内存数据进行查询。

1、查询时,调用InMemoryRelation,对其封装的内存数据结构的每个分区进行操作。

2、获取要请求的attributes,如上,查询请求的是src表的value属性。

3、根据目的查询表达式,来获取在对应存储结构中,请求列的index索引。

4、通过ColumnAccessor来对每个buffer进行访问,获取对应查询数据,并封装为Row对象返回。

private[sql] case class InMemoryColumnarTableScan(
    attributes: Seq[Attribute],
    relation: InMemoryRelation)
  extends LeafNode {

  override def output: Seq[Attribute] = attributes

  override def execute() = {
    relation.cachedColumnBuffers.mapPartitions { iterator =>
      // Find the ordinals of the requested columns.  If none are requested, use the first.
      val requestedColumns = if (attributes.isEmpty) {
        Seq(0)
      } else {
        attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //根据表达式exprId找出对应列的ByteBuffer的索引
      }

      iterator
        .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根据索引取得对应请求列的ByteBuffer,并封装为ColumnAccessor。
        .flatMap { columnAccessors =>
          val nextRow = new GenericMutableRow(columnAccessors.length) //Row的长度
          new Iterator[Row] {
            override def next() = {
              var i = 0
              while (i < nextRow.length) {
                columnAccessors(i).extractTo(nextRow, i) //根据对应index和长度,从byterbuffer里取得值,封装到row里
                i += 1
              }
              nextRow
            }

            override def hasNext = columnAccessors.head.hasNext
          }
        }
    }
  }
}

查询请求的列,如下:

scala> exe.optimizedPlan
res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [value#5]
 InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

scala> val relation =  exe.optimizedPlan(1)
relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)

scala> val request_relation = exe.executedPlan
request_relation: org.apache.spark.sql.execution.SparkPlan =
InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))

scala> request_relation.output //请求的列,我们请求的只有value列
res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)

scala> relation.output //默认保存在relation中的所有列
res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)

scala> val attributes = request_relation.output
attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)

整个流程很简洁,关键步骤是第三步。根据ExprId来查找到,请求列的索引

attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

//根据exprId找出对应ID
scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
attr_index: Seq[Int] = ArrayBuffer(1) //找到请求的列value的索引是1, 我们查询就从Index为1的bytebuffer中,请求数据

scala> relation.output.foreach(e=>println(e.exprId))
ExprId(4)    //对应<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>
ExprId(5)

scala> request_relation.output.foreach(e=>println(e.exprId))
ExprId(5)

三、ColumnAccessor

ColumnAccessor对应每一种类型,类图如下:

最后返回一个新的迭代器:

          new Iterator[Row] {
            override def next() = {
              var i = 0
              while (i < nextRow.length) { //请求列的长度
                columnAccessors(i).extractTo(nextRow, i)//调用columnType.setField(row, ordinal, extractSingle(buffer))解析buffer
                i += 1
              }
              nextRow//返回解析后的row
            }

            override def hasNext = columnAccessors.head.hasNext
          }

四、总结

Spark SQL In-Memory Columnar Storage的查询相对来说还是比较简单的,其查询思想主要和存储的数据结构有关。

即存储时,按每列放到一个bytebuffer,形成一个bytebuffer数组。

查询时,根据请求列的exprId查找到上述数组的索引,然后使用ColumnAccessor对buffer中字段进行解析,最后封装为Row对象,返回。

——EOF——

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/39577419

时间: 2025-01-12 14:32:23

Spark SQL 源码分析之 In-Memory Columnar Storage 之 in-memory query的相关文章

Spark SQL源码分析之核心流程

自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql. 2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里. 前一段时间测试过Shark,并且对Spark

Spark SQL 源码分析之 In-Memory Columnar Storage 之 cache table

/** Spark SQL源码分析系列文章*/ Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率. 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage.Column Based Storage. PAX Storage. Spark SQL 的内存数据是如何组织的? Spar

第一篇:Spark SQL源码分析之核心流程

/** Spark SQL源码分析系列文章*/ 自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql.    2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark

【Spark SQL 源码分析系列文章】

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

第十篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 query

/** Spark SQL源码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的. 那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式. 一.引子 本例使用hive console里查询cache后的src表. select value from src 当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用

第九篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 cache table

/** Spark SQL源码分析系列文章*/ Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率. 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage.Column Based Storage. PAX Storage. Spark SQL 的内存数据是如何组织的? Spar

第七篇:Spark SQL 源码分析之Physical Plan 到 RDD的具体实现

/** Spark SQL源码分析系列文章*/ 接上一篇文章Spark SQL Catalyst源码分析之Physical Plan,本文将介绍Physical Plan的toRDD的具体实现细节: 我们都知道一段sql,真正的执行是当你调用它的collect()方法才会执行Spark Job,最后计算得到RDD. [java] view plain copy lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包含4种操作

Spark SQL 源码分析系列文章

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize