Mycat sql预处理功能分析

前言

Mycat从1.6版本开始支持预处理。目前1.6分支还是开发测试阶段。Mycat发展自Cobar,在Cobar源码里面可以看到预处理功能的影子(未完全实现,当然你如果往Cobar上面调用预处理命令,那么Cobar会返回结果,告诉你,我是不支持预处理sql的!)。我在以前的公司接到需求,需要在Mycat中实现预处理,所以花了时间研究了这部分代码并以自己的思路去实现。后面我将代码贡献给社区,在社区的帮助下,修复了一些bug,最终整合到1.6分支上。

Mycat预处理的使用场景不多,针对java开发来说(大量持久层框架),jdbc一般不会开启服务端预处理(后面章节会讲到mysql jdbc客户端预处理和服务端预处理)。因此,如果应用以java语言连接Mycat,很少能够使用到Mycat的预处理功能。但是如果是C、C++、或者PHP,那么使用到mysql预处理命令来进行操作的可能性就会大很多,如果应用是以上述语言实现,那么很可能会用到Mycat的预处理功能。

1. mysql预处理

在介绍Mycat预处理的实现机制之前,需要先解释mysql的预处理功能:

1.1 预处理说明

mysql从5.1版本开始支持sql预处理功能。sql预处理首先要求客户端提交需要执行的sql,这时提交的sql不传递真实的参数值,参数以问号的形式传递过去

eg:

insert into user(id, name) values(?, ?);
select * from user where id = ?;

在mysql服务端完成预编译解析,这里的预编译解析包括解析参数的个数、类型等,然后响应给客户端。接下来客户端第二次交互只需要传递参数过去就可以完成一个完整sql的执行。相比传统的sql执行,预处理需要两次交互,才能够完成一次sql执行,那么预处理sql的优势在哪里呢?

(1) 预处理sql能防止一定程度的sql注入

(2) sql预编译效率更高

多次执行相同的sql语句(参数不变或者有变化),预处理sql的执行效率优于普通的sql语句。这是因为多次执行只需要传递一次sql到服务端完成预编译和解析,而后只需要多次传递参数执行即可。

(3) 二进制包协议让sql预处理更加高效

mysql预处理命令参数的封装以及结果集的返回,均采用二进制格式封装数据,体积更小,面向底层,能直接被mysql服务端利用。相比普通sql文本协议传输的数据,二进制协议传输数据更加高效。

1.2 预处理命令说明

1.2.1 COM_STMT_PREPARE

COM_STMT_PREPARE命令用于客户端往服务端提交一个预处理的sql,如上面提到的:

insert into user(id, name) values(?,?);

该命令的协议格式参考COM_STMT_PREPARE协议格式

1.2.2 COM_STMT_EXECUTE

COM_STMT_EXECUTE用于执行预处理sql。正如我们前面说到的,如果预处理sql需要传递参数,这个命令会发送预处理语句所需要的参数到服务端。如上面的例子,我们需要传递两个参数iduser的具体值到服务端。

该命令的协议格式参考COM_STMT_EXECUTE协议格式

1.2.3 COM_STMT_CLOSE

COM_STMT_CLOSE用于关闭服务端预处理sql。每一个预处理的sql提交后都保存在mysql服务端的内存当中,每个预处理sql都有一个唯一的id标识,这个命令将发送需要关闭的sql的id,通知服务端可以将所有该预处理sql的资源释放掉(过多的预处理sql保留在mysql服务端会占用较多的内存, 因此有必要执行该命令清理无用的预处理sql)。

该命令的协议格式参考COM_STMT_CLOSE协议格式

1.2.4 COM_STMT_RESET

COM_STMT_RESET命令用于重置COM_STMT_SEND_LONG_DATA命令发送的blob数据。

该命令的协议格式参考COM_STMT_SEND_LONG_DATA协议格式

1.2.5 COM_STMT_SEND_LONG_DATA

COM_STMT_SEND_LONG_DATA用于往服务端发送字节流数据,通常来说只有在发送blob字段数据才需要用到该命令。可以多次调用该命令续传同一个字段的字节数据。注意这个命令必须COM_STMT_EXECUTE命令发送之前执行。

该命令的协议格式参考COM_STMT_SEND_LONG_DATA

1.3 预处理协议结果包说明

mysql预处理结果集采用了二进制协议包进行封装,与普通的查询结果集格式不同(普通的结果集包采用文本协议包进行封装)。下面我们分别对普通查询结果集协议包预处理结果集协议包进行介绍:

1.3.1 普通查询结果集协议包

普通sql查询(相比预处理sql查询)返回的结果集包用文本协议(官方称为Text Protocol)封装。文本协议的结果集包格式根据官网的一个图来说明:

一个结果集包主要包括以下部分(顺序传输):

1.3.2 预处理结果集协议包

预处理结果集包的组成和普通协议包类似,区别只在于row packet(数据以二进制协议格式存放)。

【说明】

Binary Row Packet的第一个字节恒为0,表示packet header,接下来,由NULL-Bitmap标示那些值为NULL的列,NULL-Bitmap的长度计算方式为(column-count + 7 + 2) / 8,其中column-count表示列数,而非空的列值以二进制协议格式(协议格式参考Binary Protocol Value)顺序存储在NULL-Bitmap的后面。

Binary Row Packet payload :

1              packet header [00]
string[$len]   NULL-bitmap, length: (column-count + 7 + 2) / 8
string[$len]   values

【温馨提示】

返回相同结果行,预处理包协议包所占字节会比普通协议包小,在列数越多,列越长的情况下,相差的大小越明显。

2. mysql jdbc预处理

我们知道,使用java.sql.PrepareStatement 可以执行预处理sql。mysql jdbc实现了该接口,并且将预处理分为客户端预处理服务端预处理

2.1 jdbc客户端预处理

mysql jdbc默认情况下采用的就是客户端预处理。客户端预处理的意思是,所有预处理参数都将被缓存在mysql jdbc层,而不是缓存在mysql server。在PrepareStatement执行的时候,在jdbc端完成sql语句的拼接(主要是使用缓存的参数对sql中问号?进行替换,最终发送到mysql的就是完整的sql语句)。客户端预处理走的是普通的查询协议,而不是真正的mysql预处理协议。

2.2 jdbc服务端预处理

在jdbcUrl中,如果显式地指定连接参数useServerPrepStmts=true,表示开启服务端预处理,eg:

jdbc:mysql://localhost:3306:test?useServerPrepStmts=true

这种情况下jdbc会使用真正的mysql预处理协议与server进行通讯,在PrepareStatement生成的时候,就会把预处理的sql语句(eg: insert into user(id, name) values(?, ?);)发送到mysql server端,而后使用PrepareStatement设置的查询参数,将会在PrepareStatement执行的时候发送到mysql服务端。

jdbc API 与 MySQL Prepare Command的关系为

Connection prepareStatement -> COM_STMT_PREPARE

PrepareStatement setXxx(colIndex, value)

...

PrepareStatement executeQuery(execute or executeUpdate) -> COM_STMT_EXECUTE

PrepareStatement close -> COM_STMT_CLOSE

【温馨提示】

mysql jdbc的com.mysql.jdbc.ServerPreparedStatement就是用来实现服务端预处理。有兴趣的同学可以搜一下这个类的源码,可以在里面搜索到上面说到的预处理命令的发送。

很少人在用到连接池框架(针对java)的时候,会在jdbcUrl中指定useServerPrepStmts=true,所以几乎也没人会用到真正的服务端预处理。

3. Mycat预处理实现机制

Mycat中也实现了mysql的预处理协议,可以接收预处理命令的处理。当使用预处理查询,也可以返回正确的二进制结果集包。Mycat预处理的实现是一种取巧的设计,查询走到后端mysql实际上不是发送了预处理命令,而是普通的COM_QUERY命令,后端mysql返回给Mycat的结果集包也是文本协议包,只是在Mycat将结果集包发送往客户端(前端)的中间过程,将普通的文本协议结果集包包装成为二进制协议结果集包,然后再返回给客户端。

3.1 预处理流程图

以一个流程图来说明Mycat预处理的处理流程:

如上图所示,Mycat接收到客户端发送的COM_STMT_PREPARE命令后,解析协议包的内容得到预处理sql语句,eg: insert into user(id, name) values(?, ?),将这些预处理语句缓存在Mycat里面,当Mycat再次接收到客户端发送的COM_STMT_EXECUTE命令,就把对应sql的问号替换为实际传递过来的参数值,这时候已经得到了完整的sql语句。接下来,直接把这个语句丢给Mycat sql查询处理器去执行,中间会经过sql解析模块,路由解析模块以及最后的执行。最后,当收到后端mysql传递给Mycat的数据准备发往客户端的时候,做一个协议包转换,将普通文本结果集协议包转换成二进制结果集协议包并发往客户端。就是这样,在Mycat里面处理预处理命令的接收与处理,并将结果集封装好发送给客户端,让客户端看起来Mycat是真正地实现了预处理sql

3.2 预处理命令接收处理

Mycat对前端发来的命令处理体现在FrontendCommandHandler的handle方法上,同样对于预处理命令的接收判断,是在这个方法上:

@Override
    public void handle(byte[] data)
    {
        if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
        {
            MySQLMessage mm = new MySQLMessage(data);
            int  packetLength = mm.readUB3();
            if(packetLength+4==data.length)
            {
                source.loadDataInfileData(data);
            }
            return;
        }
        switch (data[4])
        {

            ...

            case MySQLPacket.COM_STMT_PREPARE:
                commands.doStmtPrepare();
                source.stmtPrepare(data);
                break;
            case MySQLPacket.COM_STMT_SEND_LONG_DATA:
                commands.doStmtSendLongData();
                source.stmtSendLongData(data);
                break;
            case MySQLPacket.COM_STMT_RESET:
                commands.doStmtReset();
                source.stmtReset(data);
                break;
            case MySQLPacket.COM_STMT_EXECUTE:
                commands.doStmtExecute();
                source.stmtExecute(data);
                break;
            case MySQLPacket.COM_STMT_CLOSE:
                commands.doStmtClose();
                source.stmtClose(data);
                break;

            ...

            default:
                     commands.doOther();
                     source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                             "Unknown command");

        }
    }

最终对预处理命令的处理交给ServerPrepareHandler类去处理,ServerPrepareHandler实现了FrontendPrepareHandler接口,在这个接口里面定义了预处理命令处理的相关接口:

package io.mycat.net.handler;

/**
 * SQL预处理处理器
 *
 * @author mycat, CrazyPig
 */
public interface FrontendPrepareHandler {

    void prepare(String sql); // COM_STMT_PREPARE的处理

    void sendLongData(byte[] data); // COM_STMT_SEND_LONG_DATA的处理

    void reset(byte[] data); // COM_STMT_RESET的处理

    void execute(byte[] data); // COM_STMT_EXECUTE的处理

    void close(byte[] data); // COM_STMT_CLOSE的处理

    void clear(); // 在前端连接关闭时清理资源

}

【温馨提示】

clear()的调用会清理该连接所缓存的所有预处理sql数据。

下面重点介绍三个主要方法的逻辑代码:

3.2.1 ServerPrepareHandler.prepare方法

在这个方法里面,解析sql里面的查询列和对应的查询参数,对客户端进行应答。

@Override
    public void prepare(String sql) {

        LOGGER.debug("use server prepare, sql: " + sql);
        PreparedStatement pstmt = null;
        if ((pstmt = pstmtForSql.get(sql)) == null) {
            // 解析获取字段个数和参数个数
            int columnCount = getColumnCount(sql);
            int paramCount = getParamCount(sql);
            pstmt = new PreparedStatement(++pstmtId, sql, columnCount, paramCount);
            pstmtForSql.put(pstmt.getStatement(), pstmt);
            pstmtForId.put(pstmt.getId(), pstmt);
        }
        PreparedStmtResponse.response(pstmt, source); // 发送响应包给客户端(响应包的封装 -> `PreparedOkPacket.java`)
    }

3.2.1 ServerPrepareHandler.execute方法

入参是COM_STMT_EXECUTE整个报文的字节,这个方法首先需要解析出这个包所带的信息,如要执行的预处理sql的id标识以及对应的执行参数(如果有的话), 然后完成预处理sql的拼接,最后交给Mycat SQL查询模块去执行:

@Override
    public void execute(byte[] data) {
        long pstmtId = ByteUtil.readUB4(data, 5);
        PreparedStatement pstmt = null;
        if ((pstmt = pstmtForId.get(pstmtId)) == null) {
            source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pstmtId when executing.");
        } else {
            ExecutePacket packet = new ExecutePacket(pstmt); // 解析COM_STMT_EXECUTE报文携带的信息
            try {
                packet.read(data, source.getCharset());
            } catch (UnsupportedEncodingException e) {
                source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, e.getMessage());
                return;
            }
            BindValue[] bindValues = packet.values;
            // 还原sql中的动态参数为实际参数值
            String sql = prepareStmtBindValue(pstmt, bindValues);
            // 执行sql
            source.getSession2().setPrepared(true);
            if(LOGGER.isDebugEnabled()) {
                LOGGER.debug("execute prepare sql: " + sql);
            }
            source.query( sql ); // 交给普通的查询模块进行处理
        }
    }

3.2.3 ServerPrepareHandler.close方法

close方法入参是COM_STMT_CLOSE命令的报文内容, 这个方法逻辑比较简单,就是解析出报文,需要关闭哪个预处理语句,对于Mycat来说,需要做的动作就是将缓存的预处理sql相关内容从Mycat内存中移除即可,代码如下所示:

@Override
    public void close(byte[] data) {
        long pstmtId = ByteUtil.readUB4(data, 5); // 获取prepare stmt id
        if(LOGGER.isDebugEnabled()) {
            LOGGER.debug("close prepare stmt, stmtId = " + pstmtId);
        }
        PreparedStatement pstmt = pstmtForId.remove(pstmtId);
        if(pstmt != null) {
            pstmtForSql.remove(pstmt.getStatement());
        }
    }

3.5 后端返回数据的处理

以查询来说,如果是路由到单节点,那么Mycat是使用SingleNodeHandler来处理结果集的返回,对于row packet的处理,体现在该类的rowResponse方法里面,预处理最后一步处理结果集包转换正是在这个方法里面完成的。

/**
     * select
     *
     * 行数据返回时触发,将行数据写入缓冲区中
     */
    @Override
    public void rowResponse(byte[] row, BackendConnection conn) {

        this.netOutBytes += row.length;
        this.selectRows++;

        if (isDefaultNodeShowTable || isDefaultNodeShowFullTable) {
            RowDataPacket rowDataPacket = new RowDataPacket(1);
            rowDataPacket.read(row);
            String table = StringUtil.decode(rowDataPacket.fieldValues.get(0), conn.getCharset());
            if (shardingTablesSet.contains(table.toUpperCase())) {
                return;
            }
        }
        row[3] = ++packetId;

        if ( prepared ) {
            RowDataPacket rowDataPk = new RowDataPacket(fieldCount);
            rowDataPk.read(row);
            BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
            binRowDataPk.read(fieldPackets, rowDataPk);
            binRowDataPk.packetId = rowDataPk.packetId;
//          binRowDataPk.write(session.getSource());
            /*
             * [fix bug] : 这里不能直接将包写到前端连接,
             * 因为在fieldEofResponse()方法结束后buffer还没写出,
             * 所以这里应该将包数据顺序写入buffer(如果buffer满了就写出),然后再将buffer写出
             */
            buffer = binRowDataPk.write(buffer, session.getSource(), true);
        } else {
            buffer = session.getSource().writeToBuffer(row, allocBuffer());
            //session.getSource().write(row);
        }

    }

由变量prepared来指示该查询是否为预处理查询,如果是,就进入协议包转换流程。

同样的,如果查询是路由到多节点,那么Mycat是使用MultiNodeQueryHandler来处理结果集的返回。同样我们在rowResponse方法里面可以看到预处理的相应判断和处理。

3.4 预处理结果集包封装

预处理结果集包封装到BinaryRowDataPacket类里面,重要的字段:

public int fieldCount; // 列数量定义
public List<byte[]> fieldValues; // 列值
public byte packetHeader = (byte) 0; // 协议包第一个字节,恒为0
public byte[] nullBitMap; // NULL-Bitmap

public List<FieldPacket> fieldPackets; // 存放普通协议包的列值

3.5 预处理结果集包转换为普通结果集协议包

逻辑代码同样在BinaryRowDataPacket类里面,体现在:

public void read(List<FieldPacket> fieldPackets, UnsafeRow unsafeRow);
public void read(List<FieldPacket> fieldPackets, RowDataPacket rowDataPk);
  • 当server.xml开启isOffHeapuseOffHeapForMerge参数时,会使用UnsafeRow封装数据,因此需要从这个对象里面将数据封装成BinaryRowDataPacket,这个时候Mycat内部会调用第一个read方法进行包转换。
  • 相反,没有开启isOffHeapuseOffHeapForMerge的情况下,会使用RowDataPacket来表示数据,那么这个时候Mycat会调用第二个read方法,从RowDataPacket转换成BinaryRowDataPacket

【温馨提示】

  • BinaryRowDataPacket代码就不贴出来了,有兴趣的同学请自己搜一下代码去看。
  • 最后写完前端的代码封装在write方法里面。

4. 写在最后

看到这里,你会发现,其实Mycat的预处理也不是真正的预处理,最后发往后端的时候,还是利用了普通的协议,而不是真正走预处理协议。如果后端要走真正的预处理协议,需要花更大的功夫才行。目前这个实现性能上可能比不上普通的sql查询,但是基本上能满足应用调用预处理命令,以及结果的正确返回。

如果您在测试中或者使用中发现这部分功能存在什么疑问、缺陷,或者有更好的改进思路,可以与我联系。

时间: 2024-10-14 22:34:48

Mycat sql预处理功能分析的相关文章

Python爬取CSDN博客文章

之前解析出问题,刚刚看到,这次仔细审查了 0 url :http://blog.csdn.net/youyou1543724847/article/details/52818339Redis一点基础的东西目录 1.基础底层数据结构 2.windows下环境搭建 3.java里连接redis数据库 4.关于认证 5.redis高级功能总结1.基础底层数据结构1.1.简单动态字符串SDS定义: ...47分钟前1 url :http://blog.csdn.net/youyou1543724847/

笔记:MyBatis 动态SQL

有时候,静态的SQL语句并不能满足应用程序的需求.我们可以根据一些条件,来动态地构建SQL语句.例如,在Web应用程序中,有可能有一些搜索界面,需要输入一个或多个选项,然后根据这些已选择的条件去执行检索操作.在实现这种类型的搜索功能,我们可能需要根据这些条件来构建动态的SQL语句.如果用户提供了任何输入条件,我们需要将那个条件 添加到SQL语句的WHERE子句中. MyBatis通过使用<if>,<choose>,<where>,<foreach>,<

SQL语法详解

ALTER DATABASE修改数据库全局特性 ALTER DATABASE实际上是修改数据库目录中的dp.opt文件 ALTER TABLE修改表的结构 ALTER TABLE对表进行增删列,创建取消索引,重命名列或者表 CREATE DATABASE创建数据库 CREATE INDEX创建索引 CREATE TABLE建表 DROP DATABASE删除数据库 DROP TABLE删表 RANAME TABLE重命名表 数据库的增删改查 INSERT DELETE UPDATE SELECT

Hibernate 关于执行sql查询语句(转)

原文  http://www.yshjava.cn/post/543.html 主题 SQLHibernate Hibernate对原生SQL查询的支持和控制是通过SQLQuery接口实现的.通过Session接口,我们能够很方便的创建一个SQLQuery(SQLQuery是一个接口,在Hibernate4.2.2之前,默认返回的是SQLQuery的实现类--SQLQueryImpl对象,在下文中出现的SQLQuery如非注明,都是指该子类)对象来进行原生SQL查询: session.creat

MyCat 学习笔记 第十三篇.数据分片 之 通过HINT执行存储过程

1 环境说明 VM 模拟3台MYSQL 5.6 服务器 VM1 192.168.31.187:3307 VM2 192.168.31.212:3307 VM3 192.168.31.150:  3307 MYCAT 1.5 服务部署在宿主机上 MYCAT 192.168.31.207 :8806[SQL执行端口] / 9066[管理端口] 2 应用场景 2.0 MYCAT配置 schema.xml <schema name="TESTDB" checkSQLschema=&quo

mycat 分页慢原理解析、mycat跨事务解惑、mycat注解调用存储过程分析

1结合Mycat日志,分析select * from travelrecord order by id limit100000,100 的运行过程,解释下当limit M,N中的M非常大的情况下.为什么查询结果会变慢非常多 1.1mycat控制命令台显示.explain出来走了全部的节点 mysql>explain select * from travelrecord order by id limit 100000,100 ; +-----------+--------------------

Java Persistence with MyBatis 3(中文版) 第三章 使用XML配置SQL映射器

关系型数据库和SQL是经受时间考验和验证的数据存储机制.和其他的ORM 框架如Hibernate不同,MyBatis鼓励开发者可以直接使用数据库,而不是将其对开发者隐藏,因为这样可以充分发挥数据库服务器所提供的SQL语句的巨大威力.与此同时,MyBaits消除了书写大量冗余代码的痛苦,它使使用SQL更容易. 在代码里直接嵌套SQL语句是很差的编码实践,并且维护起来困难.MyBaits使用了映射器配置文件或注解来配置SQL语句.在本章中,我们会看到具体怎样使用映射器配置文件来配置映射SQL语句.

mycat 指定mycat节点

mycat 指定节点: /*!mycat:dataNode=order1*/select seq_nextval('APPOINTMENT_NO'); 指定节点创建存储过程或建表: /*!mycat: sql=select 1 from 表 */ CREATE DEFINER=`root`@`%` PROCEDURE `proc_test`() BEGIN END ; 表: /*!mycat: sql=select 1 from 表 */create table ttt(id int); sel

PDO预处理语句PDOStatement对象使用总结

PDO预处理语句PDOStatement对象使用总结 PDO对预处理语句的支持需要使用PDOStatement类对象,但该类对象并不是通过NEW关键字实例化出来的,而是通过PDO对象中的prepare()方法,在数据库服务器中准备好一个预处理的SQL语句后直接返回的.如果通过之前执行PDO对象中的query()方法返回的PDOStatement类对象,只代表的是一个结果集对象.而如果通过执行PDO对象中的prepare()方法产生的PDOStatement类对象,则为一个查询对象,能定义和执行参