数据库路由中间件MyCat - 源代码篇(12)
4.配置模块
4.2 schema.xml
接上一篇,接下来载入每个schema的配置(也就是每个MyCat中虚拟化的数据库的配置):
XMLSchemaLoader.java
private void loadSchemas(Element root) {
NodeList list = root.getElementsByTagName("schema");
for (int i = 0, n = list.getLength(); i < n; i++) {
Element schemaElement = (Element) list.item(i);
//读取各个属性
String name = schemaElement.getAttribute("name");
String dataNode = schemaElement.getAttribute("dataNode");
String checkSQLSchemaStr = schemaElement.getAttribute("checkSQLschema");
String sqlMaxLimitStr = schemaElement.getAttribute("sqlMaxLimit");
int sqlMaxLimit = -1;
//读取sql返回结果集限制
if (sqlMaxLimitStr != null && !sqlMaxLimitStr.isEmpty()) {
sqlMaxLimit = Integer.valueOf(sqlMaxLimitStr);
}
// check dataNode already exists or not,看schema标签中是否有datanode
String defaultDbType = null;
//校验检查并添加dataNode
if (dataNode != null && !dataNode.isEmpty()) {
List<String> dataNodeLst = new ArrayList<String>(1);
dataNodeLst.add(dataNode);
checkDataNodeExists(dataNodeLst);
String dataHost = dataNodes.get(dataNode).getDataHost();
defaultDbType = dataHosts.get(dataHost).getDbType();
} else {
dataNode = null;
}
//加载schema下所有tables
Map<String, TableConfig> tables = loadTables(schemaElement);
//判断schema是否重复
if (schemas.containsKey(name)) {
throw new ConfigException("schema " + name + " duplicated!");
}
// 设置了table的不需要设置dataNode属性,没有设置table的必须设置dataNode属性
if (dataNode == null && tables.size() == 0) {
throw new ConfigException(
"schema " + name + " didn‘t config tables,so you must set dataNode property!");
}
SchemaConfig schemaConfig = new SchemaConfig(name, dataNode,
tables, sqlMaxLimit, "true".equalsIgnoreCase(checkSQLSchemaStr));
//设定DB类型,这对之后的sql语句路由解析有帮助
if (defaultDbType != null) {
schemaConfig.setDefaultDataNodeDbType(defaultDbType);
if (!"mysql".equalsIgnoreCase(defaultDbType)) {
schemaConfig.setNeedSupportMultiDBType(true);
}
}
// 判断是否有不是mysql的数据库类型,方便解析判断是否启用多数据库分页语法解析
for (String tableName : tables.keySet()) {
TableConfig tableConfig = tables.get(tableName);
if (isHasMultiDbType(tableConfig)) {
schemaConfig.setNeedSupportMultiDBType(true);
break;
}
}
//记录每种dataNode的DB类型
Map<String, String> dataNodeDbTypeMap = new HashMap<>();
for (String dataNodeName : dataNodes.keySet()) {
DataNodeConfig dataNodeConfig = dataNodes.get(dataNodeName);
String dataHost = dataNodeConfig.getDataHost();
DataHostConfig dataHostConfig = dataHosts.get(dataHost);
if (dataHostConfig != null) {
String dbType = dataHostConfig.getDbType();
dataNodeDbTypeMap.put(dataNodeName, dbType);
}
}
schemaConfig.setDataNodeDbTypeMap(dataNodeDbTypeMap);
schemas.put(name, schemaConfig);
}
}
首先读取schema每个配置属性项,并作有效性判断。比如默认的dataNode是否存在。只要验证之前读取的dataNode里面有没有就可以
private void checkDataNodeExists(Collection<String> nodes) {
if (nodes == null || nodes.size() < 1) {
return;
}
for (String node : nodes) {
if (!dataNodes.containsKey(node)) {
throw new ConfigException("dataNode ‘" + node + "‘ is not found!");
}
}
}
之后载入所有的table和childTable:
private Map<String, TableConfig> loadTables(Element node) {
// Map<String, TableConfig> tables = new HashMap<String, TableConfig>();
// 支持表名中包含引号[`] BEN GONG
Map<String, TableConfig> tables = new TableConfigMap();
NodeList nodeList = node.getElementsByTagName("table");
for (int i = 0; i < nodeList.getLength(); i++) {
Element tableElement = (Element) nodeList.item(i);
String tableNameElement = tableElement.getAttribute("name").toUpperCase();
//TODO:路由, 增加对动态日期表的支持
String tableNameSuffixElement = tableElement.getAttribute("nameSuffix").toUpperCase();
if ( !"".equals( tableNameSuffixElement ) ) {
if( tableNameElement.split(",").length > 1 ) {
throw new ConfigException("nameSuffix " + tableNameSuffixElement + ", require name parameter cannot multiple breaks!");
}
//前缀用来标明日期格式
tableNameElement = doTableNameSuffix(tableNameElement, tableNameSuffixElement);
}
//记录主键,用于之后路由分析,以及启用自增长主键
String[] tableNames = tableNameElement.split(",");
String primaryKey = tableElement.hasAttribute("primaryKey") ? tableElement.getAttribute("primaryKey").toUpperCase() : null;
//记录是否主键自增,默认不是,(启用全局sequence handler)
boolean autoIncrement = false;
if (tableElement.hasAttribute("autoIncrement")) {
autoIncrement = Boolean.parseBoolean(tableElement.getAttribute("autoIncrement"));
}
//记录是否需要加返回结果集限制,默认需要加
boolean needAddLimit = true;
if (tableElement.hasAttribute("needAddLimit")) {
needAddLimit = Boolean.parseBoolean(tableElement.getAttribute("needAddLimit"));
}
//记录type,是否为global
String tableTypeStr = tableElement.hasAttribute("type") ? tableElement.getAttribute("type") : null;
int tableType = TableConfig.TYPE_GLOBAL_DEFAULT;
if ("global".equalsIgnoreCase(tableTypeStr)) {
tableType = TableConfig.TYPE_GLOBAL_TABLE;
}
//记录dataNode,就是分布在哪些dataNode上
String dataNode = tableElement.getAttribute("dataNode");
TableRuleConfig tableRule = null;
if (tableElement.hasAttribute("rule")) {
String ruleName = tableElement.getAttribute("rule");
tableRule = tableRules.get(ruleName);
if (tableRule == null) {
throw new ConfigException("rule " + ruleName + " is not found!");
}
}
boolean ruleRequired = false;
//记录是否绑定有分片规则
if (tableElement.hasAttribute("ruleRequired")) {
ruleRequired = Boolean.parseBoolean(tableElement.getAttribute("ruleRequired"));
}
if (tableNames == null) {
throw new ConfigException("table name is not found!");
}
//distribute函数,重新编排dataNode
String distPrex = "distribute(";
boolean distTableDns = dataNode.startsWith(distPrex);
if (distTableDns) {
dataNode = dataNode.substring(distPrex.length(), dataNode.length() - 1);
}
//分表功能
String subTables = tableElement.getAttribute("subTables");
for (int j = 0; j < tableNames.length; j++) {
String tableName = tableNames[j];
TableConfig table = new TableConfig(tableName, primaryKey,
autoIncrement, needAddLimit, tableType, dataNode,
getDbType(dataNode),
(tableRule != null) ? tableRule.getRule() : null,
ruleRequired, null, false, null, null,subTables);
checkDataNodeExists(table.getDataNodes());
if (distTableDns) {
distributeDataNodes(table.getDataNodes());
}
//检查去重
if (tables.containsKey(table.getName())) {
throw new ConfigException("table " + tableName + " duplicated!");
}
//放入map
tables.put(table.getName(), table);
}
//只有tableName配置的是单个表(没有逗号)的时候才能有子表
if (tableNames.length == 1) {
TableConfig table = tables.get(tableNames[0]);
// process child tables
processChildTables(tables, table, dataNode, tableElement);
}
}
return tables;
}
对于子表,有递归读取配置:
private void processChildTables(Map<String, TableConfig> tables,
TableConfig parentTable, String dataNodes, Element tableNode) {
// parse child tables
NodeList childNodeList = tableNode.getChildNodes();
for (int j = 0; j < childNodeList.getLength(); j++) {
Node theNode = childNodeList.item(j);
if (!theNode.getNodeName().equals("childTable")) {
continue;
}
Element childTbElement = (Element) theNode;
//读取子表信息
String cdTbName = childTbElement.getAttribute("name").toUpperCase();
String primaryKey = childTbElement.hasAttribute("primaryKey") ? childTbElement.getAttribute("primaryKey").toUpperCase() : null;
boolean autoIncrement = false;
if (childTbElement.hasAttribute("autoIncrement")) {
autoIncrement = Boolean.parseBoolean(childTbElement.getAttribute("autoIncrement"));
}
boolean needAddLimit = true;
if (childTbElement.hasAttribute("needAddLimit")) {
needAddLimit = Boolean.parseBoolean(childTbElement.getAttribute("needAddLimit"));
}
String subTables = childTbElement.getAttribute("subTables");
//子表join键,和对应的parent的键,父子表通过这个关联
String joinKey = childTbElement.getAttribute("joinKey").toUpperCase();
String parentKey = childTbElement.getAttribute("parentKey").toUpperCase();
TableConfig table = new TableConfig(cdTbName, primaryKey,
autoIncrement, needAddLimit,
TableConfig.TYPE_GLOBAL_DEFAULT, dataNodes,
getDbType(dataNodes), null, false, parentTable, true,
joinKey, parentKey, subTables);
if (tables.containsKey(table.getName())) {
throw new ConfigException("table " + table.getName() + " duplicated!");
}
tables.put(table.getName(), table);
//对于子表的子表,递归处理
processChildTables(tables, table, dataNodes, childTbElement);
}
}
对于表的dataNode对应关系,有个特殊配置即类似dataNode=”distributed(dn$1-10)”,这个含义是:
/**
* distribute datanodes in multi hosts,means ,dn1 (host1),dn100
* (host2),dn300(host3),dn2(host1),dn101(host2),dn301(host3)...etc
* 将每个host上的datanode按照host重新排列。比如上面的例子host1拥有dn1,dn2,host2拥有dn100,dn101,host3拥有dn300,dn301,
* 按照host重新排列: 0->dn1 (host1),1->dn100(host2),2->dn300(host3),3->dn2(host1),4->dn101(host2),5->dn301(host3)
*
* @param theDataNodes
*/
private void distributeDataNodes(ArrayList<String> theDataNodes) {
Map<String, ArrayList<String>> newDataNodeMap = new HashMap<String, ArrayList<String>>(dataHosts.size());
for (String dn : theDataNodes) {
DataNodeConfig dnConf = dataNodes.get(dn);
String host = dnConf.getDataHost();
ArrayList<String> hostDns = newDataNodeMap.get(host);
hostDns = (hostDns == null) ? new ArrayList<String>() : hostDns;
hostDns.add(dn);
newDataNodeMap.put(host, hostDns);
}
ArrayList<String> result = new ArrayList<String>(theDataNodes.size());
boolean hasData = true;
while (hasData) {
hasData = false;
for (ArrayList<String> dns : newDataNodeMap.values()) {
if (!dns.isEmpty()) {
result.add(dns.remove(0));
hasData = true;
}
}
}
theDataNodes.clear();
theDataNodes.addAll(result);
}
读取完所有表之后,记录好DB类型,这对之后的sql语句路由解析有帮助。将所有schema的配置保存在:
private final Map<String, SchemaConfig> schemas;
4.3 server.xml
之后会读取载入server配置。
XMLConfigLoader.java:
public XMLConfigLoader(SchemaLoader schemaLoader) {
XMLServerLoader serverLoader = new XMLServerLoader();
this.system = serverLoader.getSystem();
this.users = serverLoader.getUsers();
this.quarantine = serverLoader.getQuarantine();
this.cluster = serverLoader.getCluster();
this.dataHosts = schemaLoader.getDataHosts();
this.dataNodes = schemaLoader.getDataNodes();
this.schemas = schemaLoader.getSchemas();
schemaLoader = null;
}
XMLServerLoader.java
public XMLServerLoader() {
this.system = new SystemConfig();
this.users = new HashMap<String, UserConfig>();
this.quarantine = new QuarantineConfig();
this.load();
}
private void load() {
//读取server.xml配置
InputStream dtd = null;
InputStream xml = null;
try {
dtd = XMLServerLoader.class.getResourceAsStream("/server.dtd");
xml = XMLServerLoader.class.getResourceAsStream("/server.xml");
Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();
//加载System标签
loadSystem(root);
//加载User标签
loadUsers(root);
//加载集群配置
this.cluster = new ClusterConfig(root, system.getServerPort());
//加载权限和黑白名单
loadQuarantine(root);
} catch (ConfigException e) {
throw e;
} catch (Exception e) {
throw new ConfigException(e);
} finally {
if (dtd != null) {
try {
dtd.close();
} catch (IOException e) {
}
}
if (xml != null) {
try {
xml.close();
} catch (IOException e) {
}
}
}
}
首先加载System标签
时间: 2024-10-08 02:06:10