第十篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 query

/** Spark SQL源码分析系列文章*/

前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的。

那么基于以上存储结构,我们查询cache在jvm内的数据又是如何查询的,本文将揭示查询In-Memory Data的方式。

一、引子

本例使用hive console里查询cache后的src表。

select value from src

当我们将src表cache到了内存后,再次查询src,可以通过analyzed执行计划来观察内部调用。

即parse后,会形成InMemoryRelation结点,最后执行物理计划时,会调用InMemoryColumnarTableScan这个结点的方法。

如下:

[java] view plain copy

  1. scala> val exe = executePlan(sql("select value from src").queryExecution.analyzed)
  2. 14/09/26 10:30:26 INFO parse.ParseDriver: Parsing command: select value from src
  3. 14/09/26 10:30:26 INFO parse.ParseDriver: Parse Completed
  4. exe: org.apache.spark.sql.hive.test.TestHive.QueryExecution =
  5. == Parsed Logical Plan ==
  6. Project [value#5]
  7. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  8. == Analyzed Logical Plan ==
  9. Project [value#5]
  10. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  11. == Optimized Logical Plan ==
  12. Project [value#5]
  13. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  14. == Physical Plan ==
  15. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)) //查询内存中表的入口
  16. Code Generation: false
  17. == RDD ==

二、InMemoryColumnarTableScan

InMemoryColumnarTableScan是Catalyst里的一个叶子结点,包含了要查询的attributes,和InMemoryRelation(封装了我们缓存的In-Columnar Storage数据结构)。

执行叶子节点,出发execute方法对内存数据进行查询。

1、查询时,调用InMemoryRelation,对其封装的内存数据结构的每个分区进行操作。

2、获取要请求的attributes,如上,查询请求的是src表的value属性。

3、根据目的查询表达式,来获取在对应存储结构中,请求列的index索引。

4、通过ColumnAccessor来对每个buffer进行访问,获取对应查询数据,并封装为Row对象返回。

[java] view plain copy

  1. private[sql] case class InMemoryColumnarTableScan(
  2. attributes: Seq[Attribute],
  3. relation: InMemoryRelation)
  4. extends LeafNode {
  5. override def output: Seq[Attribute] = attributes
  6. override def execute() = {
  7. relation.cachedColumnBuffers.mapPartitions { iterator =>
  8. // Find the ordinals of the requested columns.  If none are requested, use the first.
  9. val requestedColumns = if (attributes.isEmpty) {
  10. Seq(0)
  11. } else {
  12. attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) //根据表达式exprId找出对应列的ByteBuffer的索引
  13. }
  14. iterator
  15. .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))//根据索引取得对应请求列的ByteBuffer,并封装为ColumnAccessor。
  16. .flatMap { columnAccessors =>
  17. val nextRow = new GenericMutableRow(columnAccessors.length) //Row的长度
  18. new Iterator[Row] {
  19. override def next() = {
  20. var i = 0
  21. while (i < nextRow.length) {
  22. columnAccessors(i).extractTo(nextRow, i) //根据对应index和长度,从byterbuffer里取得值,封装到row里
  23. i += 1
  24. }
  25. nextRow
  26. }
  27. override def hasNext = columnAccessors.head.hasNext
  28. }
  29. }
  30. }
  31. }
  32. }

查询请求的列,如下:

[java] view plain copy

  1. scala> exe.optimizedPlan
  2. res93: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
  3. Project [value#5]
  4. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  5. scala> val relation =  exe.optimizedPlan(1)
  6. relation: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
  7. InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None)
  8. scala> val request_relation = exe.executedPlan
  9. request_relation: org.apache.spark.sql.execution.SparkPlan =
  10. InMemoryColumnarTableScan [value#5], (InMemoryRelation [key#4,value#5], false, 1000, (HiveTableScan [key#4,value#5], (MetastoreRelation default, src, None), None))
  11. scala> request_relation.output //请求的列,我们请求的只有value列
  12. res95: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)
  13. scala> relation.output //默认保存在relation中的所有列
  14. res96: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(key#4, value#5)
  15. scala> val attributes = request_relation.output
  16. attributes: Seq[org.apache.spark.sql.catalyst.expressions.Attribute] = ArrayBuffer(value#5)

整个流程很简洁,关键步骤是第三步。根据ExprId来查找到,请求列的索引

attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))

[java] view plain copy

  1. //根据exprId找出对应ID
  2. scala> val attr_index = attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
  3. attr_index: Seq[Int] = ArrayBuffer(1) //找到请求的列value的索引是1, 我们查询就从Index为1的bytebuffer中,请求数据
  4. scala> relation.output.foreach(e=>println(e.exprId))
  5. ExprId(4)    //对应<span style="font-family: Arial, Helvetica, sans-serif;">[key#4,value#5]</span>
  6. ExprId(5)
  7. scala> request_relation.output.foreach(e=>println(e.exprId))
  8. ExprId(5)

三、ColumnAccessor

ColumnAccessor对应每一种类型,类图如下:

最后返回一个新的迭代器:

[java] view plain copy

  1. new Iterator[Row] {
  2. override def next() = {
  3. var i = 0
  4. while (i < nextRow.length) { //请求列的长度
  5. columnAccessors(i).extractTo(nextRow, i)//调用columnType.setField(row, ordinal, extractSingle(buffer))解析buffer
  6. i += 1
  7. }
  8. nextRow//返回解析后的row
  9. }
  10. override def hasNext = columnAccessors.head.hasNext
  11. }

四、总结

Spark SQL In-Memory Columnar Storage的查询相对来说还是比较简单的,其查询思想主要和存储的数据结构有关。

即存储时,按每列放到一个bytebuffer,形成一个bytebuffer数组。

查询时,根据请求列的exprId查找到上述数组的索引,然后使用ColumnAccessor对buffer中字段进行解析,最后封装为Row对象,返回。

——EOF——

创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/39577419

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

转自:http://blog.csdn.net/oopsoom/article/details/39577419

时间: 2024-08-25 00:45:44

第十篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 query的相关文章

第九篇:Spark SQL 源码分析之 In-Memory Columnar Storage源码分析之 cache table

/** Spark SQL源码分析系列文章*/ Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率. 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage.Column Based Storage. PAX Storage. Spark SQL 的内存数据是如何组织的? Spar

Spark SQL之External DataSource外部数据源(二)源码分析

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark SQL案例实战(一)

作者:周志湖 放假了,终于能抽出时间更新博客了--. 1. 获取数据 本文通过将github上的Spark项目git日志作为数据,对SparkSQL的内容进行详细介绍 数据获取命令如下: [[email protected] spark]# git log --pretty=format:'{"commit":"%H","author":"%an","author_email":"%ae"

Tachyon在Spark中的作用(Tachyon: Reliable, Memory Speed Storage for Cluster Computing Frameworks 论文阅读翻译)

摘要: Tachyon是一种分布式文件系统,能够借助集群计算框架使得数据以内存的速度进行共享.当今的缓存技术优化了read过程,可是,write过程由于须要容错机制,就须要通过网络或者是磁盘进行复制操作.Tachyon通过将"血统"技术引入到存储层进而消除了这个瓶颈.创建一个长期的以"血统机制"为基础的存储系统的关键挑战是失败情况发生的时候及时地进行数据恢复.Tachyon通过引入一种检查点的算法来解决问题,这样的方法保证了恢复过程的有限开销以及通过资源调度器下进行

【Spark SQL 源码分析系列文章】

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize

Spark SQL 源码分析系列文章

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize

Spark SQL之External DataSource外部数据源(一)示例

一.Spark SQL External DataSource简介 随着Spark1.2的发布,Spark SQL开始正式支持外部数据源.Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现. 这使得Spark SQL支持了更多的类型数据源,如json, parquet, avro, csv格式.只要我们愿意,我们可以开发出任意的外部数据源来连接到Spark SQL.之前大家说的支持HBASE,Cassandra都可以用外部数据源的方式来实现无缝集成. (Ps: 关于Exter

Spark SQL with Hive

前一篇文章是Spark SQL的入门篇Spark SQL初探,介绍了一些基础知识和API,但是离我们的日常使用还似乎差了一步之遥. 终结Shark的利用有2个: 1.和Spark程序的集成有诸多限制 2.Hive的优化器不是为Spark而设计的,计算模型的不同,使得Hive的优化器来优化Spark程序遇到了瓶颈. 这里看一下Spark SQL 的基础架构: Spark1.1发布后会支持Spark SQL CLI , Spark SQL的CLI会要求被连接到一个Hive Thrift Server

Spark1.1.0 Spark SQL Programming Guide

Spark SQL Programming Guide Overview Getting Started Data Sources RDDs Inferring the Schema Using Reflection Programmatically Specifying the Schema Parquet Files Loading Data Programmatically Configuration JSON Datasets Hive Tables Performance Tuning