mycat对sql的解析分为两种情况,一种是普通sql,另一种是PreparedStatement(预编译语句)。
下面以普通sql的解析为例进行分析(另一种方式大同小异)。server接收到客户端发来的sql后,会调用FrontendCommandHandler的handle方法,这个方法会调用FrontendConnection的query方法,接着query方法会调用ServerQueryHandler的query方法,最后调用ServerConnection的execute方法。execute方法的代码如下所示:
public void execute(String sql, int type) { //连接状态检查 if (this.isClosed()) { LOGGER.warn("ignore execute ,server connection is closed " + this); return; } // 事务状态检查 if (txInterrupted) { writeErrMessage(ErrorCode.ER_YES, "Transaction error, need to rollback." + txInterrputMsg); return; } // 检查当前使用的DB String db = this.schema; boolean isDefault = true; if (db == null) { db = SchemaUtil.detectDefaultDb(sql, type); if (db == null) { writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, "No MyCAT Database selected"); return; } isDefault = false; } // 兼容PhpAdmin's, 支持对MySQL元数据的模拟返回 //// TODO: 2016/5/20 支持更多information_schema特性 if (ServerParse.SELECT == type && db.equalsIgnoreCase("information_schema") ) { MysqlInformationSchemaHandler.handle(sql, this); return; } if (ServerParse.SELECT == type && sql.contains("mysql") && sql.contains("proc")) { SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.parseSchema(sql); if (schemaInfo != null && "mysql".equalsIgnoreCase(schemaInfo.schema) && "proc".equalsIgnoreCase(schemaInfo.table)) { // 兼容MySQLWorkbench MysqlProcHandler.handle(sql, this); return; } } SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db); if (schema == null) { writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, "Unknown MyCAT Database '" + db + "'"); return; } //fix navicat SELECT STATE AS `State`, ROUND(SUM(DURATION),7) AS `Duration`, CONCAT(ROUND(SUM(DURATION)/*100,3), '%') AS `Percentage` FROM INFORMATION_SCHEMA.PROFILING WHERE QUERY_ID= GROUP BY STATE ORDER BY SEQ if(ServerParse.SELECT == type &&sql.contains(" INFORMATION_SCHEMA.PROFILING ")&&sql.contains("CONCAT(ROUND(SUM(DURATION)/*100,3)")) { InformationSchemaProfiling.response(this); return; } /* 当已经设置默认schema时,可以通过在sql中指定其它schema的方式执行 * 相关sql,已经在mysql客户端中验证。 * 所以在此处增加关于sql中指定Schema方式的支持。 */ if (isDefault && schema.isCheckSQLSchema() && isNormalSql(type)) { SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.parseSchema(sql); if (schemaInfo != null && schemaInfo.schema != null && 
!schemaInfo.schema.equals(db)) { SchemaConfig schemaConfig = MycatServer.getInstance().getConfig().getSchemas().get(schemaInfo.schema); if (schemaConfig != null) schema = schemaConfig; } } routeEndExecuteSQL(sql, type, schema); }
execute方法最后会调用routeEndExecuteSQL方法。它会首先调用RouteService的route方法进行路由,然后调用HintSQLHandler的route方法,这个方法里又会调用RouteStrategy的route方法。这里使用了策略模式:sql被分为下面几种类型,不同类型使用不同的路由策略。
public final class ServerParse { public static final int OTHER = -1; public static final int BEGIN = 1; public static final int COMMIT = 2; public static final int DELETE = 3; public static final int INSERT = 4; public static final int REPLACE = 5; public static final int ROLLBACK = 6; public static final int SELECT = 7; public static final int SET = 8; public static final int SHOW = 9; public static final int START = 10; public static final int UPDATE = 11; public static final int KILL = 12; public static final int SAVEPOINT = 13; public static final int USE = 14; public static final int EXPLAIN = 15; public static final int EXPLAIN2 = 151; public static final int KILL_QUERY = 16; public static final int HELP = 17; public static final int MYSQL_CMD_COMMENT = 18; public static final int MYSQL_COMMENT = 19; public static final int CALL = 20; public static final int DESCRIBE = 21; public static final int LOAD_DATA_INFILE_SQL = 99; public static final int DDL = 100;
使用不同的路由方法是在routeNormalSqlWithAST中决定的,
public RouteResultset routeNormalSqlWithAST(SchemaConfig schema, String stmt, RouteResultset rrs, String charset, LayerCachePool cachePool) throws SQLNonTransientException { /** * 只有mysql时只支持mysql语法 */ SQLStatementParser parser = null; if (schema.isNeedSupportMultiDBType()) { parser = new MycatStatementParser(stmt); } else { parser = new MySqlStatementParser(stmt); } MycatSchemaStatVisitor visitor = null; SQLStatement statement; /** * 解析出现问题统一抛SQL语法错误 */ try { statement = parser.parseStatement(); visitor = new MycatSchemaStatVisitor(); } catch (Exception t) { LOGGER.error("DruidMycatRouteStrategyError", t); throw new SQLSyntaxErrorException(t); } /** * 检验unsupported statement */ checkUnSupportedStatement(statement); DruidParser druidParser = DruidParserFactory.create(schema, statement, visitor); druidParser.parser(schema, rrs, statement, stmt,cachePool,visitor); /** * DruidParser 解析过程中已完成了路由的直接返回 */ if ( rrs.isFinishedRoute() ) { return rrs; } /** * 没有from的select语句或其他 */ DruidShardingParseInfo ctx= druidParser.getCtx() ; if((ctx.getTables() == null || ctx.getTables().size() == 0)&&(ctx.getTableAliasMap()==null||ctx.getTableAliasMap().isEmpty())) { return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), druidParser.getCtx().getSql()); } if(druidParser.getCtx().getRouteCalculateUnits().size() == 0) { RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit(); druidParser.getCtx().addRouteCalculateUnit(routeCalculateUnit); } SortedSet<RouteResultsetNode> nodeSet = new TreeSet<RouteResultsetNode>(); for(RouteCalculateUnit unit: druidParser.getCtx().getRouteCalculateUnits()) { RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool); if(rrsTmp != null) { for(RouteResultsetNode node :rrsTmp.getNodes()) { nodeSet.add(node); } } } RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()]; int i = 0; for (Iterator<RouteResultsetNode> iterator = nodeSet.iterator(); 
iterator.hasNext();) { nodes[i] = iterator.next(); i++; } rrs.setNodes(nodes); //分表 /** * subTables="t_order$1-2,t_order3" *目前分表 1.6 开始支持 幵丏 dataNode 在分表条件下只能配置一个,分表条件下不支持join。 */ if(rrs.isDistTable()){ return this.routeDisTable(statement,rrs); } return rrs; }
mycat使用的druid除了可以做数据库连接池之外,还提供了SQL解析器,上面的代码正是用它(MySqlStatementParser等)来解析sql的。mycat支持分库分表,下面我们以多个表的分库分表路由策略为例进行分析。
public static void findRouteWithcConditionsForTables(SchemaConfig schema, RouteResultset rrs, Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions, Map<String, Set<String>> tablesRouteMap, String sql, LayerCachePool cachePool, boolean isSelect) throws SQLNonTransientException { //为分库表找路由 for(Map.Entry<String, Map<String, Set<ColumnRoutePair>>> entry : tablesAndConditions.entrySet()) { String tableName = entry.getKey().toUpperCase(); TableConfig tableConfig = schema.getTables().get(tableName); if(tableConfig == null) { String msg = "can't find table define in schema " + tableName + " schema:" + schema.getName(); LOGGER.warn(msg); throw new SQLNonTransientException(msg); } if(tableConfig.getDistTables()!=null && tableConfig.getDistTables().size()>0){ routeToDistTableNode(tableName,schema,rrs,sql, tablesAndConditions, cachePool,isSelect); } //全局表或者不分库的表略过(全局表后面再计算) if(tableConfig.isGlobalTable() || schema.getTables().get(tableName).getDataNodes().size() == 1) { continue; } else {//非全局表:分库表、childTable、其他 Map<String, Set<ColumnRoutePair>> columnsMap = entry.getValue(); String joinKey = tableConfig.getJoinKey(); String partionCol = tableConfig.getPartitionColumn(); String primaryKey = tableConfig.getPrimaryKey(); boolean isFoundPartitionValue = partionCol != null && entry.getValue().get(partionCol) != null; boolean isLoadData=false; if (LOGGER.isDebugEnabled() && sql.startsWith(LoadData.loadDataHint)||rrs.isLoadData()) { //由于load data一次会计算很多路由数据,如果输出此日志会极大降低load data的性能 isLoadData=true; } if(entry.getValue().get(primaryKey) != null && entry.getValue().size() == 1&&!isLoadData) {//主键查找 // try by primary key if found in cache Set<ColumnRoutePair> primaryKeyPairs = entry.getValue().get(primaryKey); if (primaryKeyPairs != null) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("try to find cache by primary key "); } String tableKey = schema.getName() + '_' + tableName; boolean allFound = true; for (ColumnRoutePair pair : primaryKeyPairs) {//可能id in(1,2,3)多主键 String 
cacheKey = pair.colValue; String dataNode = (String) cachePool.get(tableKey, cacheKey); if (dataNode == null) { allFound = false; continue; } else { if(tablesRouteMap.get(tableName) == null) { tablesRouteMap.put(tableName, new HashSet<String>()); } tablesRouteMap.get(tableName).add(dataNode); continue; } } if (!allFound) { // need cache primary key ->datanode relation if (isSelect && tableConfig.getPrimaryKey() != null) { rrs.setPrimaryKey(tableKey + '.' + tableConfig.getPrimaryKey()); } } else {//主键缓存中找到了就执行循环的下一轮 continue; } } } if (isFoundPartitionValue) {//分库表 Set<ColumnRoutePair> partitionValue = columnsMap.get(partionCol); if(partitionValue == null || partitionValue.size() == 0) { if(tablesRouteMap.get(tableName) == null) { tablesRouteMap.put(tableName, new HashSet<String>()); } tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes()); } else { for(ColumnRoutePair pair : partitionValue) { if(pair.colValue != null) { Integer nodeIndex = tableConfig.getRule().getRuleAlgorithm().calculate(pair.colValue); if(nodeIndex == null) { String msg = "can't find any valid datanode :" + tableConfig.getName() + " -> " + tableConfig.getPartitionColumn() + " -> " + pair.colValue; LOGGER.warn(msg); throw new SQLNonTransientException(msg); } ArrayList<String> dataNodes = tableConfig.getDataNodes(); String node; if (nodeIndex >=0 && nodeIndex < dataNodes.size()) { node = dataNodes.get(nodeIndex); } else { node = null; String msg = "Can't find a valid data node for specified node index :" + tableConfig.getName() + " -> " + tableConfig.getPartitionColumn() + " -> " + pair.colValue + " -> " + "Index : " + nodeIndex; LOGGER.warn(msg); throw new SQLNonTransientException(msg); } if(node != null) { if(tablesRouteMap.get(tableName) == null) { tablesRouteMap.put(tableName, new HashSet<String>()); } tablesRouteMap.get(tableName).add(node); } } if(pair.rangeValue != null) { Integer[] nodeIndexs = tableConfig.getRule().getRuleAlgorithm() 
.calculateRange(pair.rangeValue.beginValue.toString(), pair.rangeValue.endValue.toString()); ArrayList<String> dataNodes = tableConfig.getDataNodes(); String node; for(Integer idx : nodeIndexs) { if (idx >= 0 && idx < dataNodes.size()) { node = dataNodes.get(idx); } else { String msg = "Can't find valid data node(s) for some of specified node indexes :" + tableConfig.getName() + " -> " + tableConfig.getPartitionColumn(); LOGGER.warn(msg); throw new SQLNonTransientException(msg); } if(node != null) { if(tablesRouteMap.get(tableName) == null) { tablesRouteMap.put(tableName, new HashSet<String>()); } tablesRouteMap.get(tableName).add(node); } } } } } } else if(joinKey != null && columnsMap.get(joinKey) != null && columnsMap.get(joinKey).size() != 0) {//childTable (如果是select 语句的父子表join)之前要找到root table,将childTable移除,只留下root table Set<ColumnRoutePair> joinKeyValue = columnsMap.get(joinKey); Set<String> dataNodeSet = ruleByJoinValueCalculate(rrs, tableConfig, joinKeyValue); if (dataNodeSet.isEmpty()) { throw new SQLNonTransientException( "parent key can't find any valid datanode "); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("found partion nodes (using parent partion rule directly) for child table to update " + Arrays.toString(dataNodeSet.toArray()) + " sql :" + sql); } if (dataNodeSet.size() > 1) { routeToMultiNode(rrs.isCacheAble(), rrs, dataNodeSet, sql); rrs.setFinishedRoute(true); return; } else { rrs.setCacheAble(true); routeToSingleNode(rrs, dataNodeSet.iterator().next(), sql); return; } } else { //没找到拆分字段,该表的所有节点都路由 if(tablesRouteMap.get(tableName) == null) { tablesRouteMap.put(tableName, new HashSet<String>()); } tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes()); } } } }
mycat会先查找主键(支持多个主键值,例如id in(1,2,3)),根据主键去找对应的node节点,之后再在这些node上分别执行sql,这样它就得到了sql的路由表。所谓路由表,就是记录表的数据存在于哪些节点中。在依据主键分库分表的情况下(mycat同时提供多种分片类型,如下所列),路由主要是通过分析sql中出现的表名和主键的键值,在schema配置中通过分片算法(RuleAlgorithm)计算得到的(如果sql中没有给出键值或范围,就路由到该表的所有节点)。找到节点之后,才会具体去执行sql。
PartitionByDate PartitionByFileMap PartitionByHashMod PartitionByHotDate PartitionByJumpConsistentHash PartitionByLong PartitionByMod PartitionByMonth PartitionByMurmurHash PartitionByPattern PartitionByPrefixPattern PartitionByRangeDateHash PartitionByRangeMod PartitionByString PartitionDirectBySubString
在上面提到的routeEndExecuteSQL方法中,找到路由节点后会调用NonBlockingSession的execute方法。它分为单节点模式和多节点模式,下面以多节点模式为例:在这种情况下,它会调用MultiNodeQueryHandler的execute方法。
public void execute() throws Exception { final ReentrantLock lock = this.lock; lock.lock(); try { this.reset(rrs.getNodes().length); this.fieldsReturned = false; this.affectedRows = 0L; this.insertId = 0L; } finally { lock.unlock(); } MycatConfig conf = MycatServer.getInstance().getConfig(); startTime = System.currentTimeMillis(); LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave()); for (final RouteResultsetNode node : rrs.getNodes()) { BackendConnection conn = session.getTarget(node); if (session.tryExistsCon(conn, node)) { LOGGER.debug("node.getRunOnSlave()-" + node.getRunOnSlave()); node.setRunOnSlave(rrs.getRunOnSlave()); // 实现 master/slave注解 LOGGER.debug("node.getRunOnSlave()-" + node.getRunOnSlave()); _execute(conn, node); } else { // create new connection LOGGER.debug("node.getRunOnSlave()1-" + node.getRunOnSlave()); node.setRunOnSlave(rrs.getRunOnSlave()); // 实现 master/slave注解 LOGGER.debug("node.getRunOnSlave()2-" + node.getRunOnSlave()); PhysicalDBNode dn = conf.getDataNodes().get(node.getName()); dn.getConnection(dn.getDatabase(), autocommit, node, this, node); // 注意该方法不仅仅是获取连接,获取新连接成功之后,会通过层层回调,最后回调到本类 的connectionAcquired // 这是通过 上面方法的 this 参数的层层传递完成的。 // connectionAcquired 进行执行操作: // session.bindConnection(node, conn); // _execute(conn, node); } } }
到此优化后的sql被发给路由结果中的各个节点执行。