MyCat源码分析系列之——SQL下发

更多MyCat源码分析,请戳MyCat源码分析系列



SQL下发

SQL下发指的是MyCat将解析并改造完成的SQL语句依次发送至相应的MySQL节点(datanode)的过程,该执行过程由NonBlockingSession.execute()触发:

public void execute(RouteResultset rrs, int type) {
        // clear prev execute resources
        clearHandlesResources();
        if (LOGGER.isDebugEnabled()) {
            StringBuilder s = new StringBuilder();
            LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
        }

        // 检查路由结果是否为空
        RouteResultsetNode[] nodes = rrs.getNodes();
        if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                || nodes[0].getName().equals("")) {
            source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
                    "No dataNode found ,please check tables defined in schema:"
                            + source.getSchema());
            return;
        }
        if (nodes.length == 1) {
            singleNodeHandler = new SingleNodeHandler(rrs, this);
            try {
                singleNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        } else {
            boolean autocommit = source.isAutocommit();
            SystemConfig sysConfig = MycatServer.getInstance().getConfig()
                    .getSystem();
            int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();
            multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
                    this);

            try {
                multiNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        }
}

从代码中可以看到,首先对于路由节点信息RouteResultsetNode[]进行了判断,如果不存在任何需要派发的节点则直接返回;如果是单节点操作,则创建SingleNodeHandler实例,并调用其execute();如果是多节点操作,则创建MultiNodeQueryHandler实例,并调用其execute()

下面先来看单节点操作的SQL下发过程,以下是SingleNodeHandler的execute()方法:

public void execute() throws Exception {
        startTime=System.currentTimeMillis();
        ServerConnection sc = session.getSource();
        this.isRunning = true;
        this.packetId = 0;
        final BackendConnection conn = session.getTarget(node);
        if (session.tryExistsCon(conn, node)) {
            _execute(conn);
        } else {
            // create new connection

            MycatConfig conf = MycatServer.getInstance().getConfig();
            PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
            dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this,
                    node);
        }
}

如果session已经有该datanode关联的后端连接(session.tryExistsCon(conn, node)返回true),则调用_execute()方法下发SQL指令;反之,则调用dn.getConnection()方法从连接池中获取一个可用连接或新建一个连接,并且由于第4个参数将this作为ResponseHandler对象传入,获取到连接后会在PhysicalDatasource.takeCon()中调用handler.connectionAcquired(conn)完成回调,即SingleNodeHandler.connectionAcquired()

public void connectionAcquired(final BackendConnection conn) {
    session.bindConnection(node, conn);
    _execute(conn);
}

该方法先将获取到的后端连接关联到本session中,随后同样调用_execute()方法下发SQL指令。_execute()方法的实现如下:

private void _execute(BackendConnection conn) {
        if (session.closed()) {
            endRunning();
            session.clearResources(true);
            return;
        }
        conn.setResponseHandler(this);
        try {
            conn.execute(node, session.getSource(), session.getSource()
                    .isAutocommit());
        } catch (Exception e1) {
            executeException(conn, e1);
            return;
        }
}

首先,很重要的是通过conn.setResponseHandler(this)将SingleNodeHandler与当前后端连接(MySQLConnection)以及连接中包含的MySQLConnectionHandler实例关联起来,这样做的目的是当结果返回的时候可以回调SingleNodeHandler相应的方法处理。随后调用MySQLConnection.execute()

public void execute(RouteResultsetNode rrn, ServerConnection sc,
            boolean autocommit) throws UnsupportedEncodingException {
        if (!modifiedSQLExecuted && rrn.isModifySQL()) {
            modifiedSQLExecuted = true;
        }
        String xaTXID = sc.getSession2().getXaTXID();
        synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),
                autocommit);
    }

    private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
            int clientCharSetIndex, int clientTxIsoLation,
            boolean clientAutoCommit) {
        String xaCmd = null;

        boolean conAutoComit = this.autocommit;
        String conSchema = this.schema;
        // never executed modify sql,so auto commit
        boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB()
                || clientAutoCommit;
        if (expectAutocommit == false && xaTxID != null && xaStatus == 0) {
            clientTxIsoLation = Isolations.SERIALIZABLE;
            xaCmd = "XA START " + xaTxID + ‘;‘;

        }
        int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1;
        int charsetSyn = (this.charsetIndex == clientCharSetIndex) ? 0 : 1;
        int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1;
        int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1;
        int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn;
        if (synCount == 0) {
            // not need syn connection
            sendQueryCmd(rrn.getStatement());
            return;
        }
        CommandPacket schemaCmd = null;
        StringBuilder sb = new StringBuilder();
        if (schemaSyn == 1) {
            schemaCmd = getChangeSchemaCommand(conSchema);
            // getChangeSchemaCommand(sb, conSchema);
        }

        if (charsetSyn == 1) {
            getCharsetCommand(sb, clientCharSetIndex);
        }
        if (txIsoLationSyn == 1) {
            getTxIsolationCommand(sb, clientTxIsoLation);
        }
        if (autoCommitSyn == 1) {
            getAutocommitCommand(sb, expectAutocommit);
        }
        if (xaCmd != null) {
            sb.append(xaCmd);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("con need syn ,total syn cmd " + synCount
                    + " commands " + sb.toString() + "schema change:"
                    + (schemaCmd != null) + " con:" + this);
        }
        metaDataSyned = false;
        statusSync = new StatusSync(xaCmd != null, conSchema,
                clientCharSetIndex, clientTxIsoLation, expectAutocommit,
                synCount);
        // syn schema
        if (schemaCmd != null) {
            schemaCmd.write(this);
        }
        // and our query sql to multi command at last
        sb.append(rrn.getStatement());
        // syn and execute others
        this.sendQueryCmd(sb.toString());
        // waiting syn result...
}

其中又会调用synAndDoExecute()方法,顾名思义是同步并执行,同步的目的在于之前获取到的后端连接可能在自动提交模式、数据库名、事务隔离级别和字符集上与当前要求可能不同,因此在真正执行SQL语句之前需要检查并同步相应如上设置。

如果synCount==0,则说明不需要同步,直接调用sendQuery()发送指令即可;反之,将相应的设置语句依次append到sb中(数据库切换是个例外,直接发送了COM_INIT_DB包进行设置),并创建一个StatusSync对象,最后添加待执行的SQL语句,随后调用sendQuery()发送指令。到这里,大家可能会有疑问,在此将需更改的相关设置(数据库名、字符集等)与SQL语句一起发送(并不等待其设置成功与否),万一之前的更改失败怎么办?MyCat对此就是依靠之前创建的StatusSync对象来处理的,在结果合并的流程介绍中会具体解释。

到此为止,SingleNodeHandler的SQL语句下发过程就算是结束了,当然底层真正的下发是由负责处理一个连接读写事件的NIOSocketWR对象来执行的。

接下来,看多节点操作SQL语句下发过程,与单节点极其类似,以下是MultiNodeQueryHandler的execute()方法:

public void execute() throws Exception {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            this.reset(rrs.getNodes().length);
            this.fieldsReturned = false;
            this.affectedRows = 0L;
            this.insertId = 0L;
        } finally {
            lock.unlock();
        }
        MycatConfig conf = MycatServer.getInstance().getConfig();
        startTime = System.currentTimeMillis();
        for (final RouteResultsetNode node : rrs.getNodes()) {
            BackendConnection conn = session.getTarget(node);
            if (session.tryExistsCon(conn, node)) {
                _execute(conn, node);
            } else {
                // create new connection
                PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
                dn.getConnection(dn.getDatabase(), autocommit, node, this, node);
            }
        }
}

不难发现,与单节点的执行过程基本是一致的,无非是打了一层循环,对每个datanode分别进行了同样的操作而已。

 



为尊重原创成果,如需转载烦请注明本文出处:

http://www.cnblogs.com/fernandolee24/p/5236237.html,特此感谢

时间: 2024-08-05 18:59:27

MyCat源码分析系列之——SQL下发的相关文章

Spark SQL 源码分析系列文章

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize

MyBatis 源码分析系列文章合集

1.简介 我从七月份开始阅读MyBatis源码,并在随后的40天内陆续更新了7篇文章.起初,我只是打算通过博客的形式进行分享.但在写作的过程中,发现要分析的代码太多,以至于文章篇幅特别大.在这7篇文章中,有4篇文章字数超过了1万,最长的一篇文章约有2.7万字(含代码).考虑到超长文章对读者不太友好,以及拆分文章工作量也不小等问题.遂决定将博文整理成电子书,方便大家阅读. 经过两周紧张的排版,<一本小小的MyBatis源码分析书>诞生了.本书共7章,约300页.本书以电子书的形式发布,大家可自由

SpringMVC源码分析系列

说到java的mvc框架,struts2和springmvc想必大家都知道,struts2的设计基本上完全脱离了Servlet容器,而springmvc是依托着Servlet容器元素来设计的,同时springmvc基于Spring框架,Spring框架想必搞java的同学都很熟悉. 一进Spring的官网就发现了这样一排醒目的文字, spring可以让我们构造简单的.便携的.又快又易于扩展的基于jvm的系统和应用程序. 没错,基于Spring的MVC框架SpringMVC同样也可以构造具有这些特

Cordova Android源码分析系列一(项目总览和CordovaActivity分析)

PhoneGap/Cordova是一个专业的移动应用开发框架,是一个全面的WEB APP开发的框架,提供了以WEB形式来访问终端设备的API的功能.这对于采用WEB APP进行开发者来说是个福音,这可以避免了原生开发的某些功能.Cordova 只是个原生外壳,app的内核是一个完整的webapp,需要调用的原生功能将以原生插件的形式实现,以暴露js接口的方式调用. Cordova Android项目是Cordova Android原生部分的Java代码实现,提供了Android原生代码和上层We

jQuery源码分析系列(33) : AJAX中的前置过滤器和请求分发器

jQuery1.5以后,AJAX模块提供了三个新的方法用于管理.扩展AJAX请求,分别是: 1.前置过滤器 jQuery. ajaxPrefilter 2.请求分发器 jQuery. ajaxTransport, 3.类型转换器 ajaxConvert 源码结构: jQuery.extend({ /** * 前置过滤器 * @type {[type]} */ ajaxPrefilter: addToPrefiltersOrTransports(prefilters), /** * 请求分发器 *

jQuery源码分析系列(38) : 队列操作

Queue队列,如同data数据缓存与Deferred异步模型一样,都是jQuery库的内部实现的基础设施 Queue队列是animate动画依赖的基础设施,整个jQuery中队列仅供给动画使用 Queue队列 队列是一种特殊的线性表,只允许在表的前端(队头)进行删除操作(出队),在表的后端(队尾)进行插入操作(入队).队列的特点是先进先出(FIFO-first in first out),即最先插入的元素最先被删除. 为什么要引入队列? 我们知道代码的执行流有异步与同步之分,例如 var a

jQuery源码分析系列(36) : Ajax - 类型转化器

什么是类型转化器? jQuery支持不同格式的数据返回形式,比如dataType为 xml, json,jsonp,script, or html 但是浏览器的XMLHttpRequest对象对数据的响应只有 responseText与responseXML 二种 所以现在我要定义dataType为jsonp,那么所得的最终数据是一个json的键值对,所以jQuery内部就会默认帮你完成这个转化工作 jQuery为了处理这种执行后数据的转化,就引入了类型转化器,如果没有指定类型就依据响应头Con

jQuery源码分析系列(34) : Ajax - 预处理jsonp

上一章大概讲了前置过滤器和请求分发器的作用,这一章主要是具体分析每种对应的处理方式 $.ajax()调用不同类型的响应,被传递到成功处理函数之前,会经过不同种类的预处理(prefilters). 预处理的类型取决于由更加接近默认的Content-Type响应,但可以明确使用dataType选项进行设置.如果提供了dataType选项, 响应的Content-Type头信息将被忽略. 有效的数据类型是text, html, xml, json,jsonp,和 script. dataType:预期

jquery2源码分析系列目录

学习jquery的源码对于提高前端的能力很有帮助,下面的系列是我在网上看到的对jquery2的源码的分析.等有时间了好好研究下.我们知道jquery2开始就不支持IE6-8了,从jquery2的源码中可以学到很多w3c新的标准( 如html5,css3,ECMAScript).原文地址是:http://www.cnblogs.com/aaronjs/p/3279314.html 关于1.x.x版的jquery源码分析系列,本博客也转载了一个地址http://www.cnblogs.com/jav