sharding-jdbc-core 源码分析

目录

  • Sharding-Jdbc 源码分析

    • 1. Sharding-Jdbc 包结构
    • 2. JDBC 四大对象
      • 2.1 DataSource
      • 2.2 Connection
      • 2.3 Statement
      • 2.4 ResultSet
    • 3. Sharding-Jdbc 执行流程分析
    • 4. sharding-jdbc-core 任务执行分析
      • 4.1 ShardingStatement
      • 4.2 StatementExecutor

Sharding-Jdbc 源码分析

Apache Sharding-Sphere 系列目录(https://www.cnblogs.com/binarylei/p/12217637.html)

在看 Sharding-Jdbc 源码之前,强烈建议先阅读一直官网的文章:

  1. Apache Sharding-Jdbc 数据分片

JDBC 调用过程如下:APP -> ORM -> JDBC -> PROXY -> MySQL。如果要完成数据的分库分表,可以在这五层任意地方进行,Sharding-Jdbc 是在 JDBC 层进行分库分表,Sharding-Proxy 是在 PROXY 进行分库分表。

Sharding-Jdbc 是一个轻量级的分库分表框架,使用时最关键的是配制分库分表策略,其余的和使用普通的 MySQL 驱动一样,几乎不用改代码。具体使用方法参考:Apache Sharding-Jdbc 使用示例

try(DataSource dataSource =  ShardingDataSourceFactory.createDataSource(
    createDataSourceMap(), shardingRuleConfig, new Properties()) {
    connection Connection = dataSource.getConnection();
    ...
}

1. Sharding-Jdbc 包结构

sharding-jdbc
    ├── sharding-jdbc-core      重写DataSource/Connection/Statement/ResultSet四大对象
    └── sharding-jdbc-orchestration        配置中心
sharding-core
    ├── sharding-core-api       接口和配置类
    ├── sharding-core-common    通用分片策略实现...
    ├── sharding-core-entry     SQL解析、路由、改写,核心类BaseShardingEngine
    ├── sharding-core-route     SQL路由,核心类StatementRoutingEngine
    ├── sharding-core-rewrite   SQL改写,核心类ShardingSQLRewriteEngine
    ├── sharding-core-execute   SQL执行,核心类ShardingExecuteEngine
    └── sharding-core-merge     结果合并,核心类MergeEngine
shardingsphere-sql-parser
    ├── shardingsphere-sql-parser-spi       SQLParserEntry,用于初始化SQLParser
    ├── shardingsphere-sql-parser-engine    SQL解析,核心类SQLParseEngine
    ├── shardingsphere-sql-parser-relation
    └── shardingsphere-sql-parser-mysql     MySQL解析器,核心类MySQLParserEntry和MySQLParser
shardingsphere-underlying           基础接口和api
    ├── shardingsphere-rewrite      SQLRewriteEngine接口
    ├── shardingsphere-execute      QueryResult查询结果
    └── shardingsphere-merge        MergeEngine接口
shardingsphere-spi                  SPI加载工具类
sharding-transaction
    ├── sharding-transaction-core   接口ShardingTransactionManager,SPI加载
    ├── sharding-transaction-2pc    实现类XAShardingTransactionManager
    └── sharding-transaction-base   实现类SeataATShardingTransactionManager

2. JDBC 四大对象

所有的一切都从 ShardingDataSourceFactory 开始的,创建了一个 ShardingDataSource 的分片数据源。除了 ShardingDataSource(分片数据源),在 Sharding-Sphere 中还有 MasterSlaveDataSourceFactory(主从数据源)、EncryptDataSourceFactory(脱敏数据源)。

public static DataSource createDataSource(
        final Map<String, DataSource> dataSourceMap,
        final ShardingRuleConfiguration shardingRuleConfig,
        final Properties props) throws SQLException {
    return new ShardingDataSource(dataSourceMap,
               new ShardingRule(shardingRuleConfig, dataSourceMap.keySet()), props);
}

说明: 本文主要以 ShardingDataSource 为切入点分析 Sharding-Sphere 是如何对 JDBC 四大对象 DataSource、Connection、Statement、ResultSet 进行封装的。

2.1 DataSource

DataSource、Connection


DataSource 和 Connection 都比较简单,没有处理过多的逻辑,只是 dataSourceMap, shardingRule 进行简单的封装。

ShardingDataSource 持有对数据源和分片规则,可以通过 getConnection 方法获取 ShardingConnection 连接。

private final ShardingRuntimeContext runtimeContext = new ShardingRuntimeContext(
                dataSourceMap, shardingRule, props, getDatabaseType());
@Override
public final ShardingConnection getConnection() {
    return new ShardingConnection(getDataSourceMap(), runtimeContext,
            TransactionTypeHolder.get());
}

ShardingDataSource 的功能非常简单,就不多说了。

2.2 Connection

ShardingConnection 可以创建 Statement 和 PrepareStatement 两种运行方式:

@Override
public Statement createStatement(final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    return new ShardingStatement(this, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

@Override
public PreparedStatement prepareStatement(final String sql, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability)
        throws SQLException {
    return new ShardingPreparedStatement(this, sql, resultSetType,
            resultSetConcurrency, resultSetHoldability);
}

说明: ShardingConnection 主要是将创建 ShardingStatement 和 ShardingPreparedStatement 两个对象,主要的执行逻辑都在 Statement 对象中。当然 ShardingConnection 还有两个重要的功能,一个是获取真正的数据库连接,一个是事务提交功能,事务不在本文讨论范围中。

protected Connection createConnection(final String dataSourceName,
        final DataSource dataSource) throws SQLException {
    return isInShardingTransaction()
            ? shardingTransactionManager.getConnection(dataSourceName)
            : dataSource.getConnection();
}

说明: 如果有事务,则需要通过事务管理器获取连接,关于事务不在本文讨论范围中。

2.3 Statement

Statement 相对来说比较复杂,因为它都是 JDBC 的真正执行器,所有逻辑都封装在 Statement 中。

Statement

说明: Statement 分为 ShardingStatement 和 ShardingPrepareStatement 两种情况。本文以 ShardingStatement 为例分析 Sharding-Jdbc 执行过程。下一节会重点为分析 ShardingStatement 的执行流程。

2.4 ResultSet

ResultSet

说明: ShardingResultSet 只是对 MergedResult 的简单封装。

private final MergedResult mergeResultSet;
@Override
public boolean next() throws SQLException {
    return mergeResultSet.next();
}

3. Sharding-Jdbc 执行流程分析

ShardingStatement 执行时序图

总结: ShardingStatement 执行过程如下:

  1. SimpleQueryShardingEngine(或 PreparedQueryShardingEngine):完成 SQL 解析、路由、改写,位于 sharding-jdbc-core 工程中。SimpleQueryShardingEngine 直接将路由的功能委托给 StatementRoutingEngine(或 PreparedQueryShardingEngine),本质是对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine 的封装。
  2. StatementExecutor(或 PreparedStatementExecutor): 提供 SQL 执行的操作,位于 sharding-jdbc-core 工程中。本质是对 ShardingExecuteEngine 的封装。
  3. StatementRoutingEngine:SQL 路由引擎,位于 sharding-core-route 工程中。路由引擎包装了 SQL 解析、路由、改写三点。SQL 路由分两步,先进行数据分片路由(ShardingRouter),再进行主从路由(ShardingMasterSlaveRouter)。
  4. SQLParseEngine:SQL 解析引擎,位于 shardingsphere-sql-parser 工程中。目前有 MySQL和 PostgreSQL 两种。
  5. ShardingSQLRewriteEngine:SQL 改写引擎,位于 sharding-core-rewrite 工程中。
  6. ShardingExecuteEngine:执行引擎,位于 sharding-core-execute 工程中。StatementExecutor 对
  7. MergeEngine:结果合并引擎,位于 sharding-core-merge 工程中。

接下来一下会对 ShardingStatement 深入分析,之后会对 StatementRoutingEngine、SQLParseEngine、ShardingSQLRewriteEngine、ShardingExecuteEngine、MergeEngine 一个引擎进行分析。

4. sharding-jdbc-core 任务执行分析

ShardingStatement 内部有三个核心的类,一是 SimpleQueryShardingEngine 完成 SQL 解析、路由、改写;一是 StatementExecutor 进行 SQL 执行;最后调用 MergeEngine 对结果进行合并处理。

4.1 ShardingStatement

4.1.2 初始化

private final ShardingConnection connection;
private final StatementExecutor statementExecutor;

public ShardingStatement(final ShardingConnection connection) {
    this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT);
}

public ShardingStatement(final ShardingConnection connection, final int resultSetType,
        final int resultSetConcurrency, final int resultSetHoldability) {
    super(Statement.class);
    this.connection = connection;
    statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency,
            resultSetHoldability, connection);
}

说明: ShardingStatement 内部执行 SQL 委托给了 statementExecutor。关于 ResultSet.CONCUR_READ_ONLY 等参考这里

4.1.2 执行

(1)executeQuery 执行过程

@Override
public ResultSet executeQuery(final String sql) throws SQLException {
    ResultSet result;
    try {
        clearPrevious();
        // 1. SQL 解析、路由、改写,最终生成 SQLRouteResult
        shard(sql);
        // 2. 生成执行计划 SQLRouteResult -> StatementExecuteUnit
        initStatementExecutor();
        // 3. statementExecutor.executeQuery() 执行任务
        MergeEngine mergeEngine = MergeEngineFactory.newInstance(
                connection.getRuntimeContext().getDatabaseType(),
                connection.getRuntimeContext().getRule(), sqlRouteResult,
                connection.getRuntimeContext().getMetaData().getRelationMetas(),
                statementExecutor.executeQuery());
        // 4. 结果合并
        result = getResultSet(mergeEngine);
    } finally {
        currentResultSet = null;
    }
    currentResultSet = result;
    return result;
}

(2)SQL 路由(包括 SQL 解析、路由、改写)

private SQLRouteResult sqlRouteResult;
private void shard(final String sql) {
    ShardingRuntimeContext runtimeContext = connection.getRuntimeContext();
    SimpleQueryShardingEngine shardingEngine = new SimpleQueryShardingEngine(
            runtimeContext.getRule(), runtimeContext.getProps(),
            runtimeContext.getMetaData(), runtimeContext.getParseEngine());
    sqlRouteResult = shardingEngine.shard(sql, Collections.emptyList());
}

说明: SimpleQueryShardingEngine 进行 SQL 路由(包括 SQL 解析、路由、改写),生成 SQLRouteResult。之后会有一节专门分析 SQL 路由过程。

当 ShardingStatement 完成 SQL 的路由,生成 SQLRouteResult 后,剩下的执行任务就全部交给 StatementExecutor 完成。

4.2 StatementExecutor

StatementExecutor 内部封装了 SQL 任务的执行过程,包括:SqlExecutePrepareTemplate 类生成执行计划 StatementExecuteUnit,以及 SQLExecuteTemplate 用于执行 StatementExecuteUnit。

4.2.1 类结构

StatementExecutor 类图

4.2.2 重要属性

AbstractStatementExecutor 类中重要的属性:

// SQLExecutePrepareTemplate用于生成执行计划StatementExecuteUnit
private final SQLExecutePrepareTemplate sqlExecutePrepareTemplate;
// 保存生成的执行计划StatementExecuteUnit
private final Collection<ShardingExecuteGroup<StatementExecuteUnit>> executeGroups =
            new LinkedList<>();

// SQLExecuteTemplate用于执行StatementExecuteUnit
private final SQLExecuteTemplate sqlExecuteTemplate;
// 保存查询结果
private final List<ResultSet> resultSets = new CopyOnWriteArrayList<>();

4.2.3 生成执行计划

// 执行前清理状态
private void clearPrevious() throws SQLException {
    statementExecutor.clear();
}
// 执行时初始化
private void initStatementExecutor() throws SQLException {
    statementExecutor.init(sqlRouteResult);
    replayMethodForStatements();
}

说明: StatementExecutor 是有状态的,每次执行前都要调用 statementExecutor.clear() 清理上一次执行的状态,并调用 statementExecutor.init() 重新初始化。下面我们看一下 init 主要做了些什么事。

statementExecutor.init() 初始化主要是生成执行计划 StatementExecuteUnit。

public void init(final SQLRouteResult routeResult) throws SQLException {
    setSqlStatementContext(routeResult.getSqlStatementContext());
    getExecuteGroups().addAll(obtainExecuteGroups(routeResult.getRouteUnits()));
    cacheStatements();
}

private Collection<ShardingExecuteGroup<StatementExecuteUnit>> obtainExecuteGroups(
        final Collection<RouteUnit> routeUnits) throws SQLException {
    return getSqlExecutePrepareTemplate().getExecuteUnitGroups(
            routeUnits, new SQLExecutePrepareCallback() {
                // 获取连接
                @Override
                public List<Connection> getConnections(
                        final ConnectionMode connectionMode,
                        final String dataSourceName, final int connectionSize)
                        throws SQLException {
                    return StatementExecutor.super.getConnection().getConnections(
                            connectionMode, dataSourceName, connectionSize);
                }

                // 生成执行计划RouteUnit -> StatementExecuteUnit
                @Override
                public StatementExecuteUnit createStatementExecuteUnit(
                        final Connection connection, final RouteUnit routeUnit,
                        final ConnectionMode connectionMode) throws SQLException {
                    return new StatementExecuteUnit(
                            routeUnit, connection.createStatement(
                            getResultSetType(), getResultSetConcurrency(),
                            getResultSetHoldability()), connectionMode);
                }
            });
}

说明: SqlExecutePrepareTemplate 是 sharding-core-execute 工程中提供的一个工具类,专门用于生成执行计划,将 RouteUnit 转化为 StatementExecuteUnit。同时还提供了另一个工具类 SQLExecuteTemplate 用于执行 StatementExecuteUnit,在任务执行时我们会看到这个类。

4.2.4 任务执行

public List<QueryResult> executeQuery() throws SQLException {
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    SQLExecuteCallback<QueryResult> executeCallback =
        new SQLExecuteCallback<QueryResult>(getDatabaseType(), isExceptionThrown) {
        @Override
        protected QueryResult executeSQL(final String sql, final Statement statement,
                final ConnectionMode connectionMode) throws SQLException {
            return getQueryResult(sql, statement, connectionMode);
        }
    };
    // 执行StatementExecuteUnit
    return executeCallback(executeCallback);
}

// sqlExecuteTemplate 执行 executeGroups(即StatementExecuteUnit)
protected final <T> List<T> executeCallback(
        final SQLExecuteCallback<T> executeCallback) throws SQLException {
    // 执行所有的任务 StatementExecuteUnit
    List<T> result = sqlExecuteTemplate.executeGroup(
            (Collection) executeGroups, executeCallback);
    refreshMetaDataIfNeeded(connection.getRuntimeContext(), sqlStatementContext);
    return result;
}

说明: SqlExecuteTemplate 执行 StatementExecuteUnit 会回调 SQLExecuteCallback#executeSQL 方法,最终调用 getQueryResult 方法。

private QueryResult getQueryResult(final String sql, final Statement statement,
        final ConnectionMode connectionMode) throws SQLException {
    ResultSet resultSet = statement.executeQuery(sql);
    getResultSets().add(resultSet);
    return ConnectionMode.MEMORY_STRICTLY == connectionMode
            ? new StreamQueryResult(resultSet)
            : new MemoryQueryResult(resultSet);
}

说明: ConnectionMode 有两种模式:内存限制(MEMORY_STRICTLY)和连接限制(CONNECTION_STRICTLY),本质是一种资源隔离,保护服务器资源不被消耗殆尽。

如果一个连接执行多个 StatementExecuteUnit 则为内存限制(MEMORY_STRICTLY),采用流式处理,即 StreamQueryResult ,反之则为连接限制(CONNECTION_STRICTLY),此时会将所有从 MySQL 服务器返回的数据都加载到内存中。特别是在 Sharding-Proxy 中特别有用,避免将代理服务器撑爆,见 Apache Sharding-Proxy 核心原理



每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/12234545.html

时间: 2024-10-13 11:01:10

sharding-jdbc-core 源码分析的相关文章

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

DOTNET CORE源码分析之IOC容器结果获取内容补充

补充一下ServiceProvider的内容 可能上一篇文章DOTNET CORE源码分析之IServiceProvider.ServiceProvider.IServiceProviderEngine.ServiceProviderEngine和ServiceProviderEngineScope 中还没有关联上ServiceProvider和ServiceCollection就直接通过GetService获取了值,这样不科学啊.其实是有关联的,请看一下上篇文章同样存在的一个代码段: inte

spark core源码分析7 Executor的运行

实际任务的运行,都是通过Executor类来执行的.这一节,我们只介绍Standalone模式. 源码位置:org.apache.spark.executor.CoarseGrainedExecutorBackend private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, workerUrl: Option[String], userClassPath

spark core源码分析8 从简单例子看transformation

前面提到过spark自带的一个最简单的例子,也介绍了SparkContext的部分,这节介绍剩余的内容中的transformation. object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(

spark core源码分析4 worker启动流程

源码位置:org.apache.spark.deploy.worker.Worker.scala 首先查看worker的main方法,与master类似,创建sparkConf,参数解析,以及构造worker对象并创建ActorRef用于对外或者本身的信息交互.这里masters参数可以设置多个 def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args =

spark core源码分析10 Task的运行

这一节介绍具体task的运行以及最终结果的处理 看线程运行的run方法,见代码注释 override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager) val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClass

spark core源码分析13 异常情况下的容错保证

博客地址: http://blog.csdn.net/yueqian_zhu/ standalone模式下的框架图如下: 异常分析1: worker异常退出 worker异常退出,比如说有意识的通过kill指令将worker杀死 worker在退出之前,会将自己所管控的所有小弟executor全干掉 worker需要定期向master改善心跳消息的,现在worker进程都已经玩完了,哪有心跳消息,所以Master会在超时处理中意识到有一个"分舵"离开了 Master非常伤心,伤心的Ma

spark core源码分析14 参数配置

博客地址: http://blog.csdn.net/yueqian_zhu/ spark 参数详解 一.Shuffle 相关 1.spark.shuffle.manager(默认 sort) HashShuffleManager,故名思义也就是在Shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中.带来的问题就是如果Reduce分区的数量比较大的话,将会产生大量的磁盘文件.如果文件数量特别巨大,对文件读写的性能会带来比较大的

spark core源码分析9 从简单例子看action操作

上一节举例讲解了transformation操作,这一节以reduce为例讲解action操作 首先看submitJob方法,它将我们reduce中写的处理函数随JobSubmitted消息传递出去,因为每个分区都需要调用它进行计算: 而resultHandler是指最后合并的方法,在每个task完成后,需要调用resultHandler将最终结果合并.所以它不需要随JobSubmitted消息传递,而是保存在JobWaiter中 /** * Submit a job to the job sc

SpringBoot - 05. 数据访问之JDBC(源码分析+代码下载)

10分钟进阶SpringBoot - 05. 数据访问之JDBC github代码下载 一.JDBC是什么? JDBC API 属于Java APIJDBC用于以下几种功能:连接到数据库.执行SQL语句 二.Spring Boot中如何使用JDBC 2.1 创建 Spring Boot Project 时引入 JDBC API 依赖和 MySQL Driver依赖,以及Spring Web依赖(测试时用到) 可以在POM中找到引入的JDBC依赖和mysql依赖: JDBC 依赖: <depend