MyCat - 源代码篇(12)

数据库路由中间件MyCat - 源代码篇(12)

4.配置模块

4.2 schema.xml

接上一篇,接下来载入每个schema的配置(也就是每个MyCat中虚拟化的数据库的配置):

XMLSchemaLoader.java

private void loadSchemas(Element root) {
        NodeList list = root.getElementsByTagName("schema");
    for (int i = 0, n = list.getLength(); i < n; i++) {
        Element schemaElement = (Element) list.item(i);
        //读取各个属性
        String name = schemaElement.getAttribute("name");
        String dataNode = schemaElement.getAttribute("dataNode");
        String checkSQLSchemaStr = schemaElement.getAttribute("checkSQLschema");
        String sqlMaxLimitStr = schemaElement.getAttribute("sqlMaxLimit");
        int sqlMaxLimit = -1;
        //读取sql返回结果集限制
        if (sqlMaxLimitStr != null && !sqlMaxLimitStr.isEmpty()) {
            sqlMaxLimit = Integer.valueOf(sqlMaxLimitStr);
        }

        // check dataNode already exists or not,看schema标签中是否有datanode
        String defaultDbType = null;
        //校验检查并添加dataNode
        if (dataNode != null && !dataNode.isEmpty()) {
            List<String> dataNodeLst = new ArrayList<String>(1);
            dataNodeLst.add(dataNode);
            checkDataNodeExists(dataNodeLst);
            String dataHost = dataNodes.get(dataNode).getDataHost();
            defaultDbType = dataHosts.get(dataHost).getDbType();
        } else {
            dataNode = null;
        }
        //加载schema下所有tables
        Map<String, TableConfig> tables = loadTables(schemaElement);
        //判断schema是否重复
        if (schemas.containsKey(name)) {
            throw new ConfigException("schema " + name + " duplicated!");
        }

        // 设置了table的不需要设置dataNode属性,没有设置table的必须设置dataNode属性
        if (dataNode == null && tables.size() == 0) {
            throw new ConfigException(
                    "schema " + name + " didn‘t config tables,so you must set dataNode property!");
        }

        SchemaConfig schemaConfig = new SchemaConfig(name, dataNode,
                tables, sqlMaxLimit, "true".equalsIgnoreCase(checkSQLSchemaStr));

        //设定DB类型,这对之后的sql语句路由解析有帮助
        if (defaultDbType != null) {
            schemaConfig.setDefaultDataNodeDbType(defaultDbType);
            if (!"mysql".equalsIgnoreCase(defaultDbType)) {
                schemaConfig.setNeedSupportMultiDBType(true);
            }
        }

        // 判断是否有不是mysql的数据库类型,方便解析判断是否启用多数据库分页语法解析
        for (String tableName : tables.keySet()) {
            TableConfig tableConfig = tables.get(tableName);
            if (isHasMultiDbType(tableConfig)) {
                schemaConfig.setNeedSupportMultiDBType(true);
                break;
            }
        }
        //记录每种dataNode的DB类型
        Map<String, String> dataNodeDbTypeMap = new HashMap<>();
        for (String dataNodeName : dataNodes.keySet()) {
            DataNodeConfig dataNodeConfig = dataNodes.get(dataNodeName);
            String dataHost = dataNodeConfig.getDataHost();
            DataHostConfig dataHostConfig = dataHosts.get(dataHost);
            if (dataHostConfig != null) {
                String dbType = dataHostConfig.getDbType();
                dataNodeDbTypeMap.put(dataNodeName, dbType);
            }
        }
        schemaConfig.setDataNodeDbTypeMap(dataNodeDbTypeMap);
        schemas.put(name, schemaConfig);
    }
}

首先读取schema每个配置属性项,并作有效性判断。比如默认的dataNode是否存在。只要验证之前读取的dataNode里面有没有就可以

private void checkDataNodeExists(Collection<String> nodes) {
    if (nodes == null || nodes.size() < 1) {
        return;
    }
    for (String node : nodes) {
        if (!dataNodes.containsKey(node)) {
            throw new ConfigException("dataNode ‘" + node + "‘ is not found!");
        }
    }
}

之后载入所有的table和childTable:

private Map<String, TableConfig> loadTables(Element node) {

    // Map<String, TableConfig> tables = new HashMap<String, TableConfig>();

    // 支持表名中包含引号[`] BEN GONG
    Map<String, TableConfig> tables = new TableConfigMap();
    NodeList nodeList = node.getElementsByTagName("table");
    for (int i = 0; i < nodeList.getLength(); i++) {
        Element tableElement = (Element) nodeList.item(i);
        String tableNameElement = tableElement.getAttribute("name").toUpperCase();

        //TODO:路由, 增加对动态日期表的支持
        String tableNameSuffixElement = tableElement.getAttribute("nameSuffix").toUpperCase();
        if ( !"".equals( tableNameSuffixElement ) ) {               

            if( tableNameElement.split(",").length > 1 ) {
                throw new ConfigException("nameSuffix " + tableNameSuffixElement + ", require name parameter cannot multiple breaks!");
            }
            //前缀用来标明日期格式
            tableNameElement = doTableNameSuffix(tableNameElement, tableNameSuffixElement);
        }
        //记录主键,用于之后路由分析,以及启用自增长主键
        String[] tableNames = tableNameElement.split(",");
        String primaryKey = tableElement.hasAttribute("primaryKey") ? tableElement.getAttribute("primaryKey").toUpperCase() : null;
        //记录是否主键自增,默认不是,(启用全局sequence handler)
        boolean autoIncrement = false;
        if (tableElement.hasAttribute("autoIncrement")) {
            autoIncrement = Boolean.parseBoolean(tableElement.getAttribute("autoIncrement"));
        }
        //记录是否需要加返回结果集限制,默认需要加
        boolean needAddLimit = true;
        if (tableElement.hasAttribute("needAddLimit")) {
            needAddLimit = Boolean.parseBoolean(tableElement.getAttribute("needAddLimit"));
        }
        //记录type,是否为global
        String tableTypeStr = tableElement.hasAttribute("type") ? tableElement.getAttribute("type") : null;
        int tableType = TableConfig.TYPE_GLOBAL_DEFAULT;
        if ("global".equalsIgnoreCase(tableTypeStr)) {
            tableType = TableConfig.TYPE_GLOBAL_TABLE;
        }
        //记录dataNode,就是分布在哪些dataNode上
        String dataNode = tableElement.getAttribute("dataNode");
        TableRuleConfig tableRule = null;
        if (tableElement.hasAttribute("rule")) {
            String ruleName = tableElement.getAttribute("rule");
            tableRule = tableRules.get(ruleName);
            if (tableRule == null) {
                throw new ConfigException("rule " + ruleName + " is not found!");
            }
        }

        boolean ruleRequired = false;
        //记录是否绑定有分片规则
        if (tableElement.hasAttribute("ruleRequired")) {
            ruleRequired = Boolean.parseBoolean(tableElement.getAttribute("ruleRequired"));
        }

        if (tableNames == null) {
            throw new ConfigException("table name is not found!");
        }
        //distribute函数,重新编排dataNode
        String distPrex = "distribute(";
        boolean distTableDns = dataNode.startsWith(distPrex);
        if (distTableDns) {
            dataNode = dataNode.substring(distPrex.length(), dataNode.length() - 1);
        }
        //分表功能
        String subTables = tableElement.getAttribute("subTables");

        for (int j = 0; j < tableNames.length; j++) {
            String tableName = tableNames[j];
            TableConfig table = new TableConfig(tableName, primaryKey,
                    autoIncrement, needAddLimit, tableType, dataNode,
                    getDbType(dataNode),
                    (tableRule != null) ? tableRule.getRule() : null,
                    ruleRequired, null, false, null, null,subTables);

            checkDataNodeExists(table.getDataNodes());

            if (distTableDns) {
                distributeDataNodes(table.getDataNodes());
            }
            //检查去重
            if (tables.containsKey(table.getName())) {
                throw new ConfigException("table " + tableName + " duplicated!");
            }
            //放入map
            tables.put(table.getName(), table);
        }
        //只有tableName配置的是单个表(没有逗号)的时候才能有子表
        if (tableNames.length == 1) {
            TableConfig table = tables.get(tableNames[0]);
            // process child tables
            processChildTables(tables, table, dataNode, tableElement);
        }
    }
    return tables;
}

对于子表,有递归读取配置:

private void processChildTables(Map<String, TableConfig> tables,
            TableConfig parentTable, String dataNodes, Element tableNode) {

        // parse child tables
        NodeList childNodeList = tableNode.getChildNodes();
        for (int j = 0; j < childNodeList.getLength(); j++) {
            Node theNode = childNodeList.item(j);
            if (!theNode.getNodeName().equals("childTable")) {
                continue;
            }
            Element childTbElement = (Element) theNode;
            //读取子表信息
            String cdTbName = childTbElement.getAttribute("name").toUpperCase();
            String primaryKey = childTbElement.hasAttribute("primaryKey") ? childTbElement.getAttribute("primaryKey").toUpperCase() : null;

            boolean autoIncrement = false;
            if (childTbElement.hasAttribute("autoIncrement")) {
                autoIncrement = Boolean.parseBoolean(childTbElement.getAttribute("autoIncrement"));
            }
            boolean needAddLimit = true;
            if (childTbElement.hasAttribute("needAddLimit")) {
                needAddLimit = Boolean.parseBoolean(childTbElement.getAttribute("needAddLimit"));
            }
            String subTables = childTbElement.getAttribute("subTables");
            //子表join键,和对应的parent的键,父子表通过这个关联
            String joinKey = childTbElement.getAttribute("joinKey").toUpperCase();
            String parentKey = childTbElement.getAttribute("parentKey").toUpperCase();
            TableConfig table = new TableConfig(cdTbName, primaryKey,
                    autoIncrement, needAddLimit,
                    TableConfig.TYPE_GLOBAL_DEFAULT, dataNodes,
                    getDbType(dataNodes), null, false, parentTable, true,
                    joinKey, parentKey, subTables);

            if (tables.containsKey(table.getName())) {
                throw new ConfigException("table " + table.getName() + " duplicated!");
            }
            tables.put(table.getName(), table);
            //对于子表的子表,递归处理
            processChildTables(tables, table, dataNodes, childTbElement);
        }
    }

对于表的dataNode对应关系,有个特殊配置即类似dataNode=”distributed(dn$1-10)”,这个含义是:

/**
     * distribute datanodes in multi hosts,means ,dn1 (host1),dn100
     * (host2),dn300(host3),dn2(host1),dn101(host2),dn301(host3)...etc
     *  将每个host上的datanode按照host重新排列。比如上面的例子host1拥有dn1,dn2,host2拥有dn100,dn101,host3拥有dn300,dn301,
     * 按照host重新排列: 0->dn1 (host1),1->dn100(host2),2->dn300(host3),3->dn2(host1),4->dn101(host2),5->dn301(host3)
     *
     * @param theDataNodes
     */
    private void distributeDataNodes(ArrayList<String> theDataNodes) {
        Map<String, ArrayList<String>> newDataNodeMap = new HashMap<String, ArrayList<String>>(dataHosts.size());
        for (String dn : theDataNodes) {
            DataNodeConfig dnConf = dataNodes.get(dn);
            String host = dnConf.getDataHost();
            ArrayList<String> hostDns = newDataNodeMap.get(host);
            hostDns = (hostDns == null) ? new ArrayList<String>() : hostDns;
            hostDns.add(dn);
            newDataNodeMap.put(host, hostDns);
        }

        ArrayList<String> result = new ArrayList<String>(theDataNodes.size());
        boolean hasData = true;
        while (hasData) {
            hasData = false;
            for (ArrayList<String> dns : newDataNodeMap.values()) {
                if (!dns.isEmpty()) {
                    result.add(dns.remove(0));
                    hasData = true;
                }
            }
        }
        theDataNodes.clear();
        theDataNodes.addAll(result);
    }

读取完所有表之后,记录好DB类型,这对之后的sql语句路由解析有帮助。将所有schema的配置保存在:

private final Map<String, SchemaConfig> schemas;

4.3 server.xml

之后会读取载入server配置。

XMLConfigLoader.java:

public XMLConfigLoader(SchemaLoader schemaLoader) {
    XMLServerLoader serverLoader = new XMLServerLoader();
    this.system = serverLoader.getSystem();
    this.users = serverLoader.getUsers();
    this.quarantine = serverLoader.getQuarantine();
    this.cluster = serverLoader.getCluster();
    this.dataHosts = schemaLoader.getDataHosts();
    this.dataNodes = schemaLoader.getDataNodes();
    this.schemas = schemaLoader.getSchemas();
    schemaLoader = null;
}

XMLServerLoader.java

public XMLServerLoader() {
    this.system = new SystemConfig();
    this.users = new HashMap<String, UserConfig>();
    this.quarantine = new QuarantineConfig();
    this.load();
}
private void load() {
    //读取server.xml配置
    InputStream dtd = null;
    InputStream xml = null;
    try {
        dtd = XMLServerLoader.class.getResourceAsStream("/server.dtd");
        xml = XMLServerLoader.class.getResourceAsStream("/server.xml");
        Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();
        //加载System标签
        loadSystem(root);
        //加载User标签
        loadUsers(root);
        //加载集群配置
        this.cluster = new ClusterConfig(root, system.getServerPort());
        //加载权限和黑白名单
        loadQuarantine(root);
    } catch (ConfigException e) {
        throw e;
    } catch (Exception e) {
        throw new ConfigException(e);
    } finally {
        if (dtd != null) {
            try {
                dtd.close();
            } catch (IOException e) {
            }
        }
        if (xml != null) {
            try {
                xml.close();
            } catch (IOException e) {
            }
        }
    }
}

首先加载System标签

时间: 2024-10-08 02:06:10

MyCat - 源代码篇(12)的相关文章

MyCat - 源代码篇(14)

数据库路由中间件MyCat - 源代码篇(14) 5. 路由模块 真正取得RouteResultset的步骤:AbstractRouteStrategy的route方法: Created with Rapha?l 2.1.0Start处理一些路由之前的逻辑,返回真假?return nullsql拦截器拦截(就是用户自定义拦截一些语句并改写)是否需要checkSQLschema去掉schema name设置autocommit?是否是DDL语句rrs = DDL语句路由return rrssche

MyCat - 源代码篇(11)

数据库路由中间件MyCat - 源代码篇(11) 4.配置模块 每个MyCatServer初始化时,会初始化: MyCatServer.java: public static final String NAME = "MyCat"; private static final long LOG_WATCH_DELAY = 60000L; private static final long TIME_UPDATE_PERIOD = 20L; private static final Myc

数据库路由中间件MyCat - 源代码篇(1)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 进入了源代码篇,我们先从整体入手,之后拿一个简单流程前端连接建立与认证作为例子,理清代码思路和设计模式.然后,针对每一个重点模块进行分析. 1. 整体通信与业务框架: 前端与后端通信框架都为NIO/AIO,因为目前生产上用的linux发行版内核都没有真正实现网络上的AIO,如果应用用AIO的话可能比NIO还要慢一些,所以,我们这里只分析NIO相关的通信模块. NIOAcceptor:作为服务器接受客户端连

数据库路由中间件MyCat - 源代码篇(8)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 3. 连接模块 3.5 后端连接 对于后端连接,我们只关心MySQL的. 从后端连接工厂开始MySQLConnectionFactory.java: public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,             String schema) throws IOException {       

数据库路由中间件MyCat - 源代码篇(17)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 调用processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey): public static boolean processInsert(ServerConnection sc,SchemaConfig schema,            int sqlType,String origSQL,String tableName,String 

数据库路由中间件MyCat - 源代码篇(7)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 3. 连接模块 3.4 FrontendConnection前端连接 构造方法: public FrontendConnection(NetworkChannel channel) throws IOException {     super(channel);      InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalA

数据库路由中间件MyCat - 源代码篇(9)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 3. 连接模块 3.5 后端连接 3.5.1 后端连接获取与负载均衡 上一节我们讲了后端连接的基本建立和响应处理,那么这些后端连接是什么时候建立的呢? 首先,MyCat配置文件中,DataHost标签中有minIdle这个属性.代表在MyCat初始化时,会在这个DataHost上初始化维护多少个连接(这些连接可以理解为连接池).每个前端Client连接会创建Session,而Session会根据命令的不同

数据库路由中间件MyCat - 源代码篇(16)

此文已由作者张镐薪授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. 5. 路由模块 真正取得RouteResultset的步骤:AbstractRouteStrategy的route方法:对应源代码: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,             String charset, ServerC

MyCat - 使用篇(4)

数据库路由中间件MyCat - 使用篇(4) 配置MyCat 3. 配置conf/rule.xml 1.4.1中的规则配置比较笨,1.5中优化了一些,将tableRule标签和function标签合并了,并且支持Velocity模板语言,更加灵活.这里先介绍1.4.1的: <?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mycat:rule SYSTEM "rule.dtd">