MyCat - 源代码篇(14)

数据库路由中间件MyCat - 源代码篇(14)

5. 路由模块

真正取得RouteResultset的步骤:AbstractRouteStrategy的route方法:

Created with Rapha?l 2.1.0Start处理一些路由之前的逻辑,返回真假?return nullsql拦截器拦截(就是用户自定义拦截一些语句并改写)是否需要checkSQLschema去掉schema name设置autocommit?是否是DDL语句rrs = DDL语句路由return rrsschema有默认node并且不是show语句rrs = 查询信息路由是不是查询信息语句(如show等)rrs = AST语义解析路由yesnoyesnoyesnoyesnoyesno

对应源代码:

public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
            String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {

    /**
     * 处理一些路由之前的逻辑
     * 全局序列号,父子表插入
     */
    if ( beforeRouteProcess(schema, sqlType, origSQL, sc) )
        return null;

    /**
     * SQL 语句拦截
     */
    String stmt = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType);
    if (origSQL != stmt && LOGGER.isDebugEnabled()) {
        LOGGER.debug("sql intercepted to " + stmt + " from " + origSQL);
    }

    //对应schema标签checkSQLschema属性,把表示schema的字符去掉
    if (schema.isCheckSQLSchema()) {
        stmt = RouterUtil.removeSchema(stmt, schema.getName());
    }

    RouteResultset rrs = new RouteResultset(stmt, sqlType);

    /**
     * 优化debug loaddata输出cache的日志会极大降低性能
     */
    if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) {
        rrs.setCacheAble(false);
    }

       /**
        * rrs携带ServerConnection的autocommit状态用于在sql解析的时候遇到
        * select ... for update的时候动态设定RouteResultsetNode的canRunInReadDB属性
        */
    if (sc != null ) {
        rrs.setAutocommit(sc.isAutocommit());
    }

    /**
     * DDL 语句的路由
     */
    if (ServerParse.DDL == sqlType) {
        return RouterUtil.routeToDDLNode(rrs, sqlType, stmt, schema);
    }

    /**
     * 检查是否有分片
     */
    if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
        rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
    } else {
        RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
        if (returnedSet == null) {
            rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool);
        }
    }

    return rrs;
}

5.3 路由之前的逻辑 - 判断子表插入以及全局序列号的生成:

AbstractRouteStrategy.java

/**
 * 路由之前必要的处理
 * 主要是全局序列号插入,还有子表插入
 */
private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
        throws SQLNonTransientException {

    return RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
            || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
            || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
}

这里利用了Java的一个特性,||表达式,前半部分如果为真,则后半部分不会被执行。首先执行RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc),这个方法是判断是否是显示使用全局序列号的sql语句,比如像:insert into table1(id,name) values(next value for MYCATSEQ_GLOBAL,‘test’);

如果不是,则执行(sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc)),这个方法判断是否是子表插入:

部分代码:

String tableName = StringUtil.getTableName(origSQL).toUpperCase();
final TableConfig tc = schema.getTables().get(tableName);
//判断是否为子表,如果不是,只会返回false
if (null != tc && tc.isChildTable()) {
final RouteResultset rrs = new RouteResultset(origSQL, ServerParse.INSERT);
String joinKey = tc.getJoinKey();
//因为是Insert语句,用MySqlInsertStatement进行parse
MySqlInsertStatement insertStmt = (MySqlInsertStatement) (new MySqlStatementParser(origSQL)).parseInsert();
......

这里注意,所有类型的SQL语句都有druid对应的SQLparser,比如说这里的插入语句就用MySqlInsertStatement解析。druidparser在这节先不讲,会在 AST语义解析路由中详细讲述。

Created with Rapha?l 2.1.0Start是否为子表?利用MySqlInsertStatement进行parse语句是否包含joinKey字段是否是批量插入是否是二级子表(父表不再有父表),并且分片字段正好是joinkey字段调用routeByERParentKey(就是直接用父表规则计算出datanode)得到RouteResultSet,判断是否需要全局序列号,执行语句,返回true启动异步线程去后台分片查询出datanode,只要查询出上一级表的parentkey字段的对应值在哪个分片即可,将查询结果封装成RouteResultSet返回falseyesnoyesnoyesno

接上面代码:

//判断条件完整性,取得解析后语句列中的joinkey列的index
    int joinKeyIndex = getJoinKeyIndex(insertStmt.getColumns(), joinKey);
    if (joinKeyIndex == -1) {
        String inf = "joinKey not provided :" + tc.getJoinKey() + "," + insertStmt;
        LOGGER.warn(inf);
        throw new SQLNonTransientException(inf);
    }
    //子表不支持批量插入
    if (isMultiInsert(insertStmt)) {
        String msg = "ChildTable multi insert not provided";
        LOGGER.warn(msg);
        throw new SQLNonTransientException(msg);
    }
    //取得joinkey的值
    String joinKeyVal = insertStmt.getValues().getValues().get(joinKeyIndex).toString();

    String sql = insertStmt.toString();

    // try to route by ER parent partion key
    //如果是二级子表(父表不再有父表),并且分片字段正好是joinkey字段,调用routeByERParentKey
    RouteResultset theRrs = RouterUtil.routeByERParentKey(sc, schema, ServerParse.INSERT, sql, rrs, tc, joinKeyVal);
    if (theRrs != null) {
        boolean processedInsert=false;
        //判断是否需要全局序列号
              if ( sc!=null && tc.isAutoIncrement()) {
                  String primaryKey = tc.getPrimaryKey();
                  processedInsert=processInsert(sc,schema,ServerParse.INSERT,sql,tc.getName(),primaryKey);
              }
              if(processedInsert==false){
                rrs.setFinishedRoute(true);
                  sc.getSession2().execute(rrs, ServerParse.INSERT);
              }
        return true;
    }

    // route by sql query root parent‘s datanode
    //如果不是二级子表或者分片字段不是joinKey字段结果为空,则启动异步线程去后台分片查询出datanode
    //只要查询出上一级表的parentkey字段的对应值在哪个分片即可
    final String findRootTBSql = tc.getLocateRTableKeySql().toLowerCase() + joinKeyVal;
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("find root parent‘s node sql " + findRootTBSql);
    }

    ListenableFuture<String> listenableFuture = MycatServer.getInstance().
            getListeningExecutorService().submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();
            return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
        }
    });

    Futures.addCallback(listenableFuture, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            //结果为空,证明上一级表中不存在那条记录,失败
            if (Strings.isNullOrEmpty(result)) {
                StringBuilder s = new StringBuilder();
                LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +
                        " err:" + "can‘t find (root) parent sharding node for sql:" + origSQL);
                sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "can‘t find (root) parent sharding node for sql:" + origSQL);
                return;
            }

            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("found partion node for child table to insert " + result + " sql :" + origSQL);
            }
            //找到分片,进行插入(和其他的一样,需要判断是否需要全局自增ID)
            boolean processedInsert=false;
                  if ( sc!=null && tc.isAutoIncrement()) {
                      try {
                          String primaryKey = tc.getPrimaryKey();
                    processedInsert=processInsert(sc,schema,ServerParse.INSERT,origSQL,tc.getName(),primaryKey);
                } catch (SQLNonTransientException e) {
                    LOGGER.warn("sequence processInsert error,",e);
                    sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR , "sequence processInsert error," + e.getMessage());
                }
                  }
                  if(processedInsert==false){
                    RouteResultset executeRrs = RouterUtil.routeToSingleNode(rrs, result, origSQL);
                    sc.getSession2().execute(executeRrs, ServerParse.INSERT);
                  }

        }

        @Override
        public void onFailure(Throwable t) {
            StringBuilder s = new StringBuilder();
            LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +
                    " err:" + t.getMessage());
            sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, t.getMessage() + " " + s.toString());
        }
    }, MycatServer.getInstance().
            getListeningExecutorService());
    return true;
}
return false;

如果返回false,则继续执行(sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc))

这个是处理一般的SQL插入语句,将其中的自增主键字段的值改写成内置的全局ID生成器生成的id。

RouterUtil.java:

public static boolean processInsert(SchemaConfig schema, int sqlType,
                                        String origSQL, ServerConnection sc) throws SQLNonTransientException {
    String tableName = StringUtil.getTableName(origSQL).toUpperCase();
    TableConfig tableConfig = schema.getTables().get(tableName);
    boolean processedInsert=false;
    //判断是有自增字段
    if (null != tableConfig && tableConfig.isAutoIncrement()) {
        String primaryKey = tableConfig.getPrimaryKey();
        processedInsert=processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey);
    }
    return processedInsert;
}

调用processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey):

Created with Rapha?l 2.1.0Start是否为类似于insert into table1 select * from table2的语句抛对应异常是否为批量插入插入语句是否提供列结构?如果主键不在插入语句的fields中,则需要进一步处理yesnoyesnoyesno

public static boolean processInsert(ServerConnection sc,SchemaConfig schema,
            int sqlType,String origSQL,String tableName,String primaryKey) throws SQLNonTransientException {

    int firstLeftBracketIndex = origSQL.indexOf("(");
    int firstRightBracketIndex = origSQL.indexOf(")");
    String upperSql = origSQL.toUpperCase();
    int valuesIndex = upperSql.indexOf("VALUES");
    int selectIndex = upperSql.indexOf("SELECT");
    int fromIndex = upperSql.indexOf("FROM");
    //屏蔽insert into table1 select * from table2语句
    if(firstLeftBracketIndex < 0) {
        String msg = "invalid sql:" + origSQL;
        LOGGER.warn(msg);
        throw new SQLNonTransientException(msg);
    }
    //屏蔽批量插入
    if(selectIndex > 0 &&fromIndex>0&&selectIndex>firstRightBracketIndex&&valuesIndex<0) {
        String msg = "multi insert not provided" ;
        LOGGER.warn(msg);
        throw new SQLNonTransientException(msg);
    }
    //插入语句必须提供列结构,因为MyCat默认对于表结构无感知
    if(valuesIndex + "VALUES".length() <= firstLeftBracketIndex) {
        throw new SQLSyntaxErrorException("insert must provide ColumnList");
    }
    //如果主键不在插入语句的fields中,则需要进一步处理
    boolean processedInsert=!isPKInFields(origSQL,primaryKey,firstLeftBracketIndex,firstRightBracketIndex);
    if(processedInsert){
        processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey,firstLeftBracketIndex+1,origSQL.indexOf(‘(‘,firstRightBracketIndex)+1);
    }
    return processedInsert;
}

对于主键不在插入语句的fields中的SQL,需要改写。比如hotnews主键为id,插入语句为:

insert into hotnews(title) values(‘aaa‘);

需要改写成:

insert into hotnews(id, title) values(next value for MYCATSEQ_hotnews,‘aaa‘);

这个在下面这个函数实现:

private static void processInsert(ServerConnection sc, SchemaConfig schema, int sqlType, String origSQL,
            String tableName, String primaryKey, int afterFirstLeftBracketIndex, int afterLastLeftBracketIndex) {

    int primaryKeyLength = primaryKey.length();
    int insertSegOffset = afterFirstLeftBracketIndex;
    String mycatSeqPrefix = "next value for MYCATSEQ_";
    int mycatSeqPrefixLength = mycatSeqPrefix.length();
    int tableNameLength = tableName.length();

    char[] newSQLBuf = new char[origSQL.length() + primaryKeyLength + mycatSeqPrefixLength + tableNameLength + 2];
    origSQL.getChars(0, afterFirstLeftBracketIndex, newSQLBuf, 0);
    primaryKey.getChars(0, primaryKeyLength, newSQLBuf, insertSegOffset);
    insertSegOffset += primaryKeyLength;
    newSQLBuf[insertSegOffset] = ‘,‘;
    insertSegOffset++;
    origSQL.getChars(afterFirstLeftBracketIndex, afterLastLeftBracketIndex, newSQLBuf, insertSegOffset);
    insertSegOffset += afterLastLeftBracketIndex - afterFirstLeftBracketIndex;
    mycatSeqPrefix.getChars(0, mycatSeqPrefixLength, newSQLBuf, insertSegOffset);
    insertSegOffset += mycatSeqPrefixLength;
    tableName.getChars(0, tableNameLength, newSQLBuf, insertSegOffset);
    insertSegOffset += tableNameLength;
    newSQLBuf[insertSegOffset] = ‘,‘;
    insertSegOffset++;
    origSQL.getChars(afterLastLeftBracketIndex, origSQL.length(), newSQLBuf, insertSegOffset);
    processSQL(sc, schema, new String(newSQLBuf), sqlType);
}

最后的processSQL(sc, schema, new String(newSQLBuf), sqlType);是将语句放入执行队列:

这里MyCat考虑NIO线程吞吐量以及全局ID生成线程安全的问题,使用如下流程执行需要全局ID的SQL insert语句。

processSQL(sc, schema, new String(newSQLBuf), sqlType):

SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);

Created with Rapha?l 2.1.0将改写的语句写入队列,当前线程返回队列线程ExecuteThread,单线程,读取队列队列中有数据?读取数据,改写sql,传回ServerConnection的routeEndExecuteSQL方法这时,不会再进入 路由之前的逻辑,而是进入AST语义解析路由yes

时间: 2024-10-10 08:43:16

MyCat - 源代码篇(14)的相关文章

MyCat - 源代码篇(11)

数据库路由中间件MyCat - 源代码篇(11) 4.配置模块 每个MyCatServer初始化时,会初始化: MyCatServer.java: public static final String NAME = "MyCat"; private static final long LOG_WATCH_DELAY = 60000L; private static final long TIME_UPDATE_PERIOD = 20L; private static final Myc

MyCat - 源代码篇(12)

数据库路由中间件MyCat - 源代码篇(12) 4.配置模块 4.2 schema.xml 接上一篇,接下来载入每个schema的配置(也就是每个MyCat中虚拟化的数据库的配置): XMLSchemaLoader.java private void loadSchemas(Element root) { NodeList list = root.getElementsByTagName("schema"); for (int i = 0, n = list.getLength();

数据库路由中间件MyCat - 源代码篇(1)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 进入了源代码篇,我们先从整体入手,之后拿一个简单流程前端连接建立与认证作为例子,理清代码思路和设计模式.然后,针对每一个重点模块进行分析. 1. 整体通信与业务框架: 前端与后端通信框架都为NIO/AIO,因为目前生产上用的linux发行版内核都没有真正实现网络上的AIO,如果应用用AIO的话可能比NIO还要慢一些,所以,我们这里只分析NIO相关的通信模块. NIOAcceptor:作为服务器接受客户端连

数据库路由中间件MyCat - 源代码篇(8)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 3. 连接模块 3.5 后端连接 对于后端连接,我们只关心MySQL的. 从后端连接工厂开始MySQLConnectionFactory.java: public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,             String schema) throws IOException {       

数据库路由中间件MyCat - 源代码篇(17)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 调用processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey): public static boolean processInsert(ServerConnection sc,SchemaConfig schema,            int sqlType,String origSQL,String tableName,String 

数据库路由中间件MyCat - 源代码篇(7)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 3. 连接模块 3.4 FrontendConnection前端连接 构造方法: public FrontendConnection(NetworkChannel channel) throws IOException {     super(channel);      InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalA

数据库路由中间件MyCat - 源代码篇(9)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 3. 连接模块 3.5 后端连接 3.5.1 后端连接获取与负载均衡 上一节我们讲了后端连接的基本建立和响应处理,那么这些后端连接是什么时候建立的呢? 首先,MyCat配置文件中,DataHost标签中有minIdle这个属性.代表在MyCat初始化时,会在这个DataHost上初始化维护多少个连接(这些连接可以理解为连接池).每个前端Client连接会创建Session,而Session会根据命令的不同

数据库路由中间件MyCat - 源代码篇(16)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 5. 路由模块 真正取得RouteResultset的步骤:AbstractRouteStrategy的route方法:对应源代码: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,             String charset, ServerC

MyCat - 使用篇(4)

数据库路由中间件MyCat - 使用篇(4) 配置MyCat 3. 配置conf/rule.xml 1.4.1中的规则配置比较笨,1.5中优化了一些,将tableRule标签和function标签合并了,并且支持Velocity模板语言,更加灵活.这里先介绍1.4.1的: <?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mycat:rule SYSTEM "rule.dtd">