3.1 数据本地化
SQL On Hadoop 设计的一个基本原则是:将计算任务移动到数据所在的节点而不是反过来。这主要出于网络优化的目的,因为数据分布在不同的节点,如果移动数据那么将会产生大量的低效的网络数据传输。数据本地化一般分为三种:节点局部性 (Node Locality), 机架局部性 (Rack Locality) 和全局局部性 (Global Locality)。节点局部性是指将计算任务分配到数据所在的节点上,此时无需任何数据传输,效率最佳。机架局部性是指将计算任务移动到数据所在的机架,虽然计算任务和数据分属不同的计算节点,但是因为机架内部网络传输速度明显高于机架间网络传输,所以机架局部性也是一种不错的方式。其他的情况属于全局局部性,此时需要跨机架进行网络传输,会产生非常大的网络传输开销。
调度系统在进行任务调度时,应该尽可能的保证节点局部性,然后是机架局部性,如果以上两者都不能满足,调度系统也会通过网络传输将数据移动到计算任务所在的节点,虽然性能相对低效,但也比资源空置比较好。
为了实现数据本地化调度,调度系统会结合延迟调度算法来进行任务调度。核心思想是优先将计算任务调度到数据所在的节点 i,如果节点 i 没有足够的计算资源,那么等待几秒钟后如果节点 i 依然没有计算资源可用,那么就放弃数据本地化将该计算任务调度到其他计算节点。
3.2 减少中间结果的物化
在一个追求低延迟的 SQL On Hadoop 系统中,尽可能的减少中间结果的磁盘物化可以极大的提高查询性能。 如下图,Hive 执行引擎采用 pull 获取数据,其优点是可以进行细粒度的容错,缺点是下游的 MapReduce 必须等待上游 MapReduce 完全将数据写入到磁盘后才能开始 pull 数据。Presto 采用 push 方式获取数据,数据完全以流的方式在不同 stage 之间进行传输,中间结果不需要物化到磁盘,从而使得 presto 具有非常高效的执行速度,缺点是不能支持细粒度的容错。
(点击放大图像)
图 3push 和 pull
3.3 列存储
传统的关系存储模型将一个元组的列连续存储,即使只查询一个列,也需要将整个元组读取出来,可以发现,当查询只有少量列时,性能非常低。
列存储的思想是将元组垂直划分为列族集合,每一个列族独立存储,列族可以退化为只仅包含一个列的平凡列族。当查询少量列时,列存储模型可以极大的减少磁盘 IO 操作,提高查询性能。当查询的列跨越多个列族时,需要将存储在不同列族中列数据拼接成原始数据,由于不同列族存储在不同的 HDFS 节点上,导致大量的数据跨越网络传输,从而降低查询性能。因此在实际使用列族时,通常根据业务查询特点,将频繁访问的列放在一个列族中。
在传统的数据库领域中,人们已经对列存储进行了非常深刻的研究,并且很多研究成果已经被应用到工业领域,其中包括轻量级压缩算法,直接操作压缩数据,延迟物化,向量化执行引擎。可是纵观目前 SQL On Hadoop 系统,这些技术的应用仍然远远的落后于传统数据库,在最近的一些 SQL On Hadoop 中已经添加了向量化执行引擎,轻量级压缩算法,但是诸如直接操作压缩数据,延迟解压等技术还没有被应用到 SQL on Hadop 系统。关于列存储的更多内容可以参见 [20]。
列存储压缩
列存储压缩算法具有如下特点:
压缩比 列存储模型具有非常高的压缩比,通常可以达到 10:1,而行存储压缩比通常只有 4:1。如图 4:
(点击放大图像)
图 4 重量级压缩算法
轻量级压缩算法 (Leight-Weight Compression) 轻量级压缩算法是 CPU 友好的。行存储模型只能使用 zip,lzo,snappy 等重量级压缩算法,这些算法最大的缺点是压缩和解压缩速度比较慢,通常每秒只能解压至多几百兆数据。相反,列存储模型不仅可以使用重量级压缩算法,还可以使用一些非常轻量级的压缩算法,比如 Run-length encode,Bit Vector。轻量级压缩算法不仅具有较好的压缩比,而且还具有非常高的压缩和解压速度。目前在 ORC File 和 Parquet 存储中,已经支持 Bit packing,Run-length enode,Dictionary encode 等轻量级压缩算法。
直接操作压缩数据 (Operating Directly on Compressed Data) 当使用轻量级压缩算法时,可能无需解压即可直接获取计算结果。例如:Run Length Encode 算法将连续重复的字符压缩为字符个数和字符,比如 aaaaaabbccccaaaa 将被压缩为 6a2b4c4a,其中 6a 表示有连续 6 个字符 a。现在假设一个某列包含上述压缩的字符串,当执行 select count(*) from table where columnA=’a’时,不需要解压 6a2b4c4a,就能够知道 a 的个数是 10。
需要注意的是,由于行存储只能使用重量级压缩算法,所以直接操作压缩数据不能被应用到行存储。
延迟解压 parquet 中的数据按块存储,每个块存储了最小值,最大值等轻量级索引,比如某个块的最小值最大值分别是 100 和 120,这表明该块中的任意一条数据都介于 100 到 120 之间,因此当我们执行 select column a from table where v>120 时,执行引擎可以跳过这个数据块,而不必将其解压再进行数据过滤。相反,在行存储中,必须将数据块完整的读取到内存中,解压,然后再进行数据过滤,导致不必要的磁盘读取操作。
3.6 压缩
一般情况下,压缩 HDFS 中的文件可以极大的提高查询性能。压缩能够减少数据所占用的存储空间,减少磁盘 IO 的读写,提高数据处理速度,此外,压缩还能够减少网络传输量,提高网络传输速度。在 SQL On Hadoop 中,压缩主要应用在 HDFS 中的数据源,shuffle 数据,最终计算结果。
如果应用程序是 io-bound 的,那么压缩数据可以提高数据处理速度,因为压缩后的数据变小了,所以可以增加数据读写速度。需要主要的是,压缩算法并不是压缩比越高越好,压缩率越高的算法压缩和解压缩速度就越慢,用户需要在 cpu 和 io 之间取得一个良好的平衡。例如 gzip2 拥有非常高的压缩比,但是其压缩和解压缩速度却非常慢,甚至可能超过数据未压缩时的读写时间,因此没有 SQL On Hadooop 系统使用 gzip2 算法,目前在 SQL On Hadoop 系统中比较流行的压缩算法主要有:Snappy,Lzo,Glib。
如果应用程序是 cpu-bound 的,那么选择一个可以 splittable 的压缩算法是很重要的,如果一个文件是 splittabe 的,那么这个文件可以被切分为多个可以并行读取的数据块,这样 MR 或者 Spark 在读取文件时,会为每一个数据块分配一个 task 来读取数据,从而提高数据查询速度。
3.7 向量化执行引擎
查询执行引擎 (query execution engine) 是数据库中的一个核心组件,用于将查询计划转换为物理计划,并对其求值返回结果。查询执行引擎对数据库系统性能影响很大,目前主要的执行引擎有如下四类:Volcano-style,Block-oriented processing,Column-at-a-time,Vectored iterator model。下面分别介绍这四种执行引擎。
Volcano-style, 最早的查询执行引擎是 Volcano-style execution engine(火山执行引擎,火山模型),也叫做迭代模型 (iterator model),或者 one-tuple-at-a-time。在这种模型中,查询计划是一个由 operator 组成的 tree 或者 DAG,其中每一个 operator 包含三个函数:open,next,close。Open 用于申请资源,比如分配内存,打开文件,close 用于释放资源,next 方法递归的调用子 operator 的 next 方法生成一个元组。图 1 描述了 select id,name,age from people where age >30 的火山模型的查询计划,该查询计划包含 User,Project,Select,Scan 四个 operator,每个 operator 的 next 方法递归调用子节点的 next,一直递归调用到叶子节点 Scan operato,Scan Operator 的 next 从文件中返回一个元组。
(点击放大图像)
图 3-4 火山模型 摘自文献 [2,page 39]
火山模型的主要缺点是昂贵的解释开销 (interpretation overhead) 和低下的 CPU Cache 命中率。首先,火山模型的 next 方法通常实现为一个虚函数,在编译器中,虚函数调用需要查找虚函数表, 并且虚函数调用是一个非直接跳转 (indirect jump), 会导致一次错误的 CPU 分支预测 (brance misprediction), 一次错误的分支预测需要十几个周期的开销。火山模型为了返回一个元组,需要调用多次 next 方法,导致昂贵的函数调用开销。[] 研究表明,在采用火山执行模型的 MySQL 中执行 TPC-H Q1 查询,仅有 10% 的时间用于真正的查询计算,其余的 90% 时间都浪费在解释开销 (interpretation overhead)。其次,next 方法一次只返回一个元组,元组通常采用行存储,如图 3-5 Row Format,如果顺序访问第一列 1,2,3,那么每次访问都将导致 CPU Cache 命中失败 (假设该行不能完全放入 CPU Cache 中)。如果采用 Column Format,那么只有在访问第一个值时才出现缓存命中失败,后续访问 2 和 3 时都将缓存命中成功, 从而极大的提高查询性能。
(点击放大图像)
图 3-6 行存储和列存储
Block-oriented processing,Block-oriented processing 模型是对火山模型的一个改进,该模型一次 next 调用返回一批元组, 元组个数在 100-1000 不等,next 内部使用一个循环来处理这批元组。在图 1 的火山模型中,Select operator next 方法可以如下实现:
def next():Array[Tuple]={ // 调用子节点的 next 方法,返回一个元组向量,该向量包含 1024 个元组 val tuples=child.next() val result=new ArrayBuffer[Tuple] for(i=0;i<tuples.length;i++){ 30="" age="" val="">30) result.append(tuples(i)) } result// 返回结果 }
Block-oriented processing 模型的优点是一次 next 返回多个元组,减少了解释开销,同时也被证明增加了 CPU Cache 的命中率,当 CPU 访问元组中的某个列时会将该元组加载到 CPU Cache(如果该元组大小小于 CPU Cache 缓存行的大小), 访问后继的列将直接从 CPU Cache 中获取,从而具有较高的 CPU Cache 命中率,然而如果之访问一个列或者少数几个列时 CPU 命中率仍然不理想。该模型最大的一个缺点是不能充分利用现代编译器技术,比如在上面的循环中,很难使用 SIMD 指令处理数据。
Column-at-a-time 模型,向量化执行的最早历史可以追朔到 MonetDB[], 在 MonetDB 提出了一个叫做 Column-at-a-time 的查询执行模型,该模型中每一次 next 调用返回一个或者多个列,每个列以数组形式返回。该模型优点是具有非常高的查询效率,缺点是一个列数据需要被物化到内存甚至磁盘,导致很高的内存占用和 io 开销,同时数据不能放到 CPU Cache 中,导致较低的 CPU Cache 命中率。
Vectored iterator model,VectorWise 提出了 Vectored iterator model 模型,该模型是对 Column-at-a-time 的改进,next 调用不是返回完整的一个列,而是返回一个可以放到 CPU Cache 的向量。该模型避免了 Column-at-a-tim CPU Cache 命中率低的缺点。Vectored iterator model 最大的优点是可以使用运行时编译器 (JIT) 动态的生成更适合现代处理器的指令,比如 JIT 可以生成 SIMD 指令来处理向量。考虑 TPC-H Q1 查询:SELECT l_extprice*(1-l_discount)*(1+l_tax) FROM lineitem。该 SQL 查询的执行计划如下:
(点击放大图像)
其中 Project operator 的 next 方法可以如下实现 (scala 伪代码):
def next():Array[Tuple]={ val tuples=child.next() var result=new ArrayBuffer[Int] for(i=0;i<tuples.length;i++){ r="tuples.l_extprice*(1-tuple.l_discount)*(1+tuple.l_tax)" retult="" tuple="tuples(i)" val="">
近几年,一些 SQL On Hadoop 系统引入了向量化执行引擎,比如 Hive,Impala,Presto,Spark 等,尽管其实现细节不同,但核心思想是一致的:尽可能的在一次 next 方法调用返回多条数据,然后使用动态代码生成技术来优化循环,表达式计算从而减少解释开销,提高 CPU Cache 命中率,减少分支预测。
Impala 中的向量化执行引擎本质上属于 Block-oriented processing,imapla 的每次 next 调用返回一批元组,这种模型仍然具有较低的 CPU Cache 命中率,同时也很难使用 SIMD 等指令进行优化,为了缓解这个问题,Impala 使用动态代码生成技术,对于大循环,表达式计算等进行使用动态代码生成来进行优化。
在 Spark2.0 中,实现了基于 Parquet 的向量化执行引擎 [12],该执行引擎属于 Vectored iterator model,引擎在调用 next 方法时以列存储格式返回一批元组,可以使用循环来处理该批元组。此外为了更充分的利用现代 CPU 特性,Spark 还支持整阶段代码生成技术,核心思想是将多个 operator 编译到一个方法中,从而减少解释开销。
3.8 动态代码生成
动态代码生成一般和向量化执行引擎结合使用,因为向量执行引擎的 next 方法内部可以使用 for 循环来处理元组向量或者列向量,使用动态代码生成技术可以在运行时对 next 方法生成更高效的执行代码。研究证明向量化执行引擎和动态代码生成可以减少解释开销 (interpretation overhead), 见文献 [18],主要影响以下三个方面:
- Select, 当 select 语句中包含复杂的表达式计算时,比如 avg,sum,count,select 的计算性能主要受 CPU Cache 和 SIMD 指令影响。当数据不能放到 CPU Cache 时,CPU 大部分时间都在等待数据从内存加载到 CPU Cache,因此当 CPU 执行计算所需的数据在 CPU Cache 中时可以极大的提高计算性能。一条 SIMD 指令可以同时计算多个数据,因此使用 SIMD 指令执行表达式计算可以提高计算性能。
- where,与 Select 语句不同的是 Where 语句一般不需要复杂的计算,影响 where 性能更多的是分支预测。如果 CPU 分支预测错误,那么之前的 CPU 流水线将全被清洗,一次 CPU 分支预测错误可能至少浪费十几个指令周期的开销。通过使用动态代码生成技术,JIT 编译器能够自动的生成分支预测友好的指令。
- Hash,hash 算法影响 equal-join,group 的查询性能,hash 算法的 CPU Cache 命中率很低。[18] 描述了一种缓存友好的 hash 算法,可以显著的提高 hash 计算性能。
动态代码生成有两种:C++ 系和 java 系。其中 C++ 系可以直接生成本机可执行二进制代码,并且能够生成高效的 SIMD 指令,例如 Impala 使用 C++ 实现查询执行引擎,同时使用 LLVM 编译器动态的生成本机可执行二进制代码,LLVM 可以生成 SIMD 指令对表达式执行计算。Java 系利用反射机制动态的生成 java 字节码,一般而言,不能充分利用 SIMD 指令进行优化,Spark 使用反射机制动态的生成 java 字节码,通常很难直接利用 SIMD 进行表达式优化。此外在 Spark2.0 中所提供的整阶段代码生成 (Whole-Stage Code Generation) 技术也是动态代码生成技术将多个 Operator 编译成一个方法进行优化。
需要注意的是,动态代码生成技术并不总是万能药,在下图中,impala 的动态代码生成技术并没有提高 TPC-DS Q42,Q52,Q55 的查询速度,主要原因这些 SQL 语句的 SELECT 语句中并没有什么复杂的计算。