MyCAT 源码解析 —— 分片结果合并(使用unsaferow)



??????关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言都将得到认真回复。甚至不知道如何读源码也可以请教噢。
  4. 新的源码解析文章实时收到通知。每周更新一篇左右。
  5. 认真的源码交流微信群。

1. 概述

相信很多同学看过 MySQL 各种优化的文章,里面 99% 会提到:单表数据量大了,需要进行分片(水平拆分 or 垂直拆分)。分片之后,业务上必然面临的场景:跨分片的数据合并。今天我们就一起来瞅瞅 MyCAT 是如何实现分片结果合并。

跨分片查询大体流程如下:

和 《【单库单表】查询》 不同的两个过程:

  • 【2】多分片执行 SQL
  • 【4】合并多分片结果

下面,我们来逐条讲解这两个过程。

2. 多分片执行 SQL

经过 SQL 解析后,计算出需要执行 SQL 的分片节点,遍历分片节点发送 SQL 进行执行。

核心代码:

SQL 解析 详细过程,我们另开文章,避免内容过多,影响大家对 分片结果合并 流程和逻辑的理解。

3. 合并多分片结果

和 《【单库单表】查询》 不同,多个分片节点都会分别响应 记录头(header) 和 记录行(row) 。在开始分析 MyCAT 是怎么合并多分片结果之前,我们先来回想下 SQL 的执行顺序。

FROM       // [1] 选择表WHERE      // [2] 过滤表GROUP BY   // [3] 分组SELECT     // [4] 普通字段,max / min / avg / sum / count 等函数,distinctHAVING     // [5] 再过滤表ORDER BY   // [6] 排序LIMIT      // [7] 分页

3.1 记录头(header)

多个分片节点响应时,会响应多次 记录头(header) 。MyCAT 在实际处理时,只处理第一个返回的 记录头(header) 。因此,在使用时要保证表的 Schema 相同。

分片节点响应的 记录头(header) 可以直接返回 MySQL Client 吗?答案是不可以。AVG函数 是特殊情况,MyCAT 需要将 AVG 拆成 SUM + COUNT 进行计算。举个例子:

// [1] MySQL Client => MyCAT :SELECT AVG(age) FROM student;

// [2] MyCAT => MySQL Server :SELECT SUM(age) AS AVG0SUM, COUNT(age) AS AVG0COUNT FROM student;

// [3] 最终:AVG(age) = SUM(age) AS AVG0SUM / COUNT(age)

核心代码:

3.2 记录行(row)

3.1 AbstractDataNodeMerge

MyCAT 对分片结果合并通过 AbstractDataNodeMerge 子类来完成。

AbstractDataNodeMerge :

  • -packs :待合并记录行(row)队列。队列尾部插入 END_FLAG_PACK 表示队列已结束。
  • -running :合并逻辑是否正在执行中的标记。
  • ~onRowMetaData(…) :根据记录列信息(ColMeta)构建对应的排序组件和聚合组件。需要子类进行实现。
  • ~onNewRecord(…) :插入记录行(row) 到 packs
  • ~outputMergeResult(…) :插入 END_FLAG_PACK 到 packs
  • ~run(…) :执行合并分片结果逻辑,并将合并结果返回给 MySQL Client。需要子类进行实现。

通过 running 标记保证同一条 SQL 同时只有一个线程正在执行,并且不需要等到每个分片结果都返回就可以执行聚合逻辑。当然,排序逻辑需要等到所有分片结果都返回才可以执行。

核心代码:

3.2 DataNodeMergeManager

AbstractDataNodeMerge 有两种子类实现:

  • DataMergeService :基于堆内内存合并分片结果。
  • DataNodeMergeManager :基于堆外内存合并分片结果。

目前官方默认配置使用 DataNodeMergeManager。主要有如下优点:

  1. 可以使用更大的内存空间。当并发量大或者数据量大时,更大的内存空间意味着更好的性能。
  2. 减少 GC 暂停时间。记录行(row)对象小且重用性很低,需要能够进行类似 C / C++ 的自主内存释放。
  3. 更快的内存复制和读取速度,对排序和聚合带来很好的提速。

如果对堆外内存不太了解,推荐阅读如下文章:

  1. 《从0到1起步-跟我进入堆外内存的奇妙世界》
  2. 《堆内内存还是堆外内存?》
  3. 《JAVA堆外内存》
  4. 《JVM源码分析之堆外内存完全解读》

本文主要分析 DataNodeMergeManager 实现,DataMergeService 可以自己阅读或者等待后续文章(??欢迎订阅我的公众号噢)。

DataNodeMergeManager 有三个组件:

  • globalSorter :UnsafeExternalRowSorter => 实现记录行(row)合并并排序逻辑。
  • globalMergeResult :UnsafeExternalRowSorter => 实现记录行(row)合并不排序逻辑。
  • unsafeRowGrouper : UnsafeRowGrouper => 实现记录行(row)聚合逻辑。

DataNodeMergeManager#run(...) 逻辑如下:

  • [1] 写入记录行(row)到 UnsafeRow
  • [2] 根据情况将 UnsafeRow 插入对应组件。
  • [3] 当所有 UnsafeRow 插入完后,根据情况使用组件聚合、排序。
是否排序 是否聚合 依赖组件 [2] [3]
globalSorter 插入 globalSorter 使用 globalSorter 合并并排序
globalMergeResult 插入 globalMergeResult 使用 globalMergeResult 合并不排序
unsafeRowGrouper + globalSorter 插入 unsafeRowGrouper 进行聚合 使用 globalSorter 合并并排序
unsafeRowGrouper + globalMergeResult 插入 unsafeRowGrouper 进行聚合 使用 globalMergeResult 合并不排序

核心代码:

??看到这里,可能很多同学都有点懵逼,问题不大,我们继续往下瞅。

3.3 UnsafeRow

记录行(row)写到 UnsafeRow 的 baseObject 属性,结构如下:


  • 拆分成三个区域,每个区域按照格子记录信息,每个格子 64bits(8 Bytes)。
  • 记录行(row)按照字段顺序位置记录到 baseObject
  • [1] 空标记位区域 :标记字段对应的值是否为 NULL。
    • 当字段对应的值为 NULL 时,其对应的字段顺序对应的 bit 设置为 1。举个例子,第 0 个位置字段为 NULL,则第一个格子对应的 64 bits 从右边第一个 bit 设置为 1。
    • 因为每个格子是 64 bits,每 64 个字段占用一个格子,不满一个格子,按照一个格子计算。因此,该区域的长度(bitSetWidthInBytes) = 字段占用的格子数 * 64 bits。
  • [2] 位置长度区域 :记录字段对应的值在[3]区域所在的位置和长度。
    • 每个字段记录[2]区域的位置 = baseOffset + bitSetWidthInBytes + 8 Bytes * 字段顺序。
    • 占用一个格子,前 32 bits 为[3]区域的位置,后 32 bits 为字段对应的值长度。
  • [3] 值区域 :记录字段对应的值。
    • 每个字段对应的值占用格子数 = 字段对应的值长度 / 8 Byte,如果无法整除再 + 1。
    • 因为字段对应的值可能无法刚好占满每个格子,未使用的 bit 用 0 占位。

写入 UnsafeRow,MyCAT 可以顺序访问每个字段,而不需要在记录行(row)进行遍历。

??日常开发使用位操作的机会比较少,可能较为难理解,需要反复理解下,相信会获得很大启发。恩,该部分代码引用自开源运算框架 Spark,是不是更加有动力列??。

核心代码:

3.4 UnsafeExternalRowSorter

如果使用 Java 实现 SELECT * FROM student ORDER BY age desc, nickname asc,不考虑算法优化的情况下,我们可以简单如下实现:

Collections.sort(students, new Comparator<Comparable>() {       @Override       public int compare(Student o1, Student o2) {           int cmp = compare(o2.age, o1.age);           return cmp != 0 ? cmp : compare(o1.nickname, o2.nickname);       }   }});

从功能上,UnsafeExternalRowSorter 是这么实现排序逻辑。当然肯定的是,不是这么“简单”的实现。

UnsafeRow 会写入到两个地方:

  1. List<MemoryBlock> :内存块数组。当前 MemoryBlock 无法容纳写入的 UnsafeRow 时,生成新的 MemoryBlock 提供写入。每条 UnsafeRow 存储在 MemoryBlock 由 长度 + 字节内容 组成。
  2. LongArray :每条 UnsafeRow 存储在 LongArray 由两部分组成:address + prefix。
    • address :UnsafeRow 存储在 List<MemoryBlock> 的位置。前 13 bits 记录所在 MemoryBlock 的 index,后 51 bit 记录在 MemoryBlock 的 offset。
    • prefix :UnsafeRow 第一个排序字段值前 64 bits 计算的值。

UnsafeExternalRowSorter 排序实现方式 :提供 TimSort 和 RadixSort 两种排序算法,前者为默认实现。TimSort 折半查找时,使用 LongArray,先比较 prefix,若相等,则顺序对比每个排序字段直到不等,提升计算效率。插入操作在 LongArray 操作,List<MemoryBlock> 只作为原始数据。

另外,当需要排序特别大的数据量时,会使用存储数据到文件进行排序。限于笔者暂时未阅读该处源码,后续会另开文章分析。??

核心源码:

3.5 UnsafeRowGrouper

如果使用 Java 实现 SELECT nickname, COUNT(*) FROM student group by nickname,不考虑算法优化的情况下,我们可以简单如下实现:

Map<String, List<Object>> map = new HashMap<>();// 聚合for (student : students) {    if (map.contains(student.nickname)) {        map.put(student.nickname, map.get(student.nickname).get(1) + 1);    } else {        List<Object> value = new Array<>();        value.add(nickname);        value.add(1);        map.put(student.nickname, value);    }}// 输出for (value : map.values) {    System.out.println(value);}

从功能上,UnsafeRowGrouper 是这么实现排序逻辑。当然肯定的是,也不是这么“简单”的实现。

??具体怎么实现的呢?我们在《MyCAT 源码解析 —— 分片结果合并(二)》继续分析。

原文地址:https://www.cnblogs.com/qiumingcheng/p/10793680.html

时间: 2024-10-08 22:40:32

MyCAT 源码解析 —— 分片结果合并(使用unsaferow)的相关文章

数据库中间件Mycat源码解析(三):Mycat的SQL解析和路由

mycat对sql的解析分为两部分,一个是普通sql,另一个是PreparedStatment. 下面以解析普通sql为例分析(另一种方式大同小异),sql从客户端发过来后server接收后会调用FrontendCommandHandler的handle方法,这个方法会调用FrontendConnection的query方法,接着query方法会调用ServerQueryHandler的query方法,接着调用ServerConnection的execute方法.如下图所示: public vo

MyCat源码分析系列之——SQL下发

更多MyCat源码分析,请戳MyCat源码分析系列 SQL下发 SQL下发指的是MyCat将解析并改造完成的SQL语句依次发送至相应的MySQL节点(datanode)的过程,该执行过程由NonBlockingSession.execute()触发: public void execute(RouteResultset rrs, int type) { // clear prev execute resources clearHandlesResources(); if (LOGGER.isDe

数据库中间件 MyCAT 源码分析 —— 调试环境搭建

关注**微信公众号:[芋艿的后端小屋]**有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 依赖工具 2. 源码拉取 3. 数据库配置 4. MyCat 配置 5. MyCAT 启动 6. MyCAT 测试 7. 交流

ChrisRenke/DrawerArrowDrawable源码解析

转载请注明出处http://blog.csdn.net/crazy__chen/article/details/46334843 源码下载地址http://download.csdn.net/detail/kangaroo835127729/8765757 这次解析的控件DrawerArrowDrawable是一款侧拉抽屉效果的控件,在很多应用上我们都可以看到(例如知乎),控件的github地址为https://github.com/ChrisRenke/DrawerArrowDrawable

五.jQuery源码解析之jQuery.extend(),jQuery.fn.extend()

给jQuery做过扩展或者制作过jQuery插件的人这两个方法东西可能不陌生.jQuery.extend([deep],target,object1,,object2...[objectN]) jQuery.fn.extend([deep],target,object1,,object2...[objectN])这两个属性都是用于合并两个或多个对象的属性到target对象.deep是布尔值,表示是否进行深度合并,默认是false,不执行深度合并.通过这种方式可以在jQuery或jQuery.fn

消息中间件 RocketMQ源码解析:事务消息

关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址 您对于源码的疑问每条留言都将得到认真回复.甚至不知道如何读源码也可以请教噢. 新的源码解析文章实时收到通知.每周更新一篇左右. 1. 概述 2. 事务消息发送 2.1 Producer 发送事务消息 2.2 Broker 处理结束事务请求 2.3 Broker 生成

Spring IoC源码解析——Bean的创建和初始化

Spring介绍 Spring(http://spring.io/)是一个轻量级的Java 开发框架,同时也是轻量级的IoC和AOP的容器框架,主要是针对JavaBean的生命周期进行管理的轻量级容器,可以单独使用,也可以和Struts框架,MyBatis框架等组合使用. IoC介绍 IoC是什么 Ioc-Inversion of Control,即"控制反转",不是什么技术,而是一种设计思想.在Java开发中,Ioc意味着将你设计好的对象交给容器控制,而不是传统的在你的对象内部直接控

ExcelReport第二篇:ExcelReport源码解析

导航 目   录:基于NPOI的报表引擎--ExcelReport 上一篇:使用ExcelReport导出Excel 下一篇:扩展元素格式化器 概述 针对上一篇随笔收到的反馈,在展开对ExcelReport源码解析之前,我认为把编写该组件时的想法分享给大家是有必要的. 编写该组件时,思考如下: 1)要实现样式.格式与数据的彻底分离. 为什么要将样式.格式与数据分离呢?恩,你不妨想一想在生成报表时,那些是变的而那些又是不变的.我的结论是:变的是数据. 有了这个想法,很自然的想到用模板去承载不变的部

SpringMVC源码解析- HandlerAdapter - ModelFactory

ModelFactory主要是两个职责: 1. 初始化model 2. 处理器执行后将modle中相应参数设置到SessionAttributes中 我们来看看具体的处理逻辑(直接充当分析目录): 1. 初始化model 1.1 解析类上使用的sessionAttributres,将获取参数合并到mavContainer中 1.2 执行注解了@ModelAttribute的方法,并将结果同步到Model 参数名的生成规则:@ModelAttribute中定义的value > 方法的返回类型决定(