java mysql大数据量批量插入与流式读取分析

总结下这周帮助客户解决报表生成操作的mysql 驱动的使用上的一些问题,与解决方案。由于生成报表逻辑要从数据库读取大量数据并在内存中加工处理后在

生成大量的汇总数据然后写入到数据库。基本流程是 读取->处理->写入。

1 读取操作开始遇到的问题是当sql查询数据量比较大时候基本读不出来。开始以为是server端处理太慢。但是在控制台是可以立即返回数据的。于是在应用

这边抓包,发现也是发送sql后立即有数据返回。但是执行ResultSet的next方法确实阻塞的。查文档翻代码原来mysql驱动默认的行为是需要把整个结果全部读取到

内存中才开始允许应用读取结果。显然与期望的行为不一致,期望的行为是流的方式读取,当结果从myql服务端返回后立即还是读取处理。这样应用就不需要大量内存

来存储这个结果集。正确的流式读取方式代码示例:

PreparedStatement ps = connection.prepareStatement("select .. from ..",
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
//forward only read only也是mysql 驱动的默认值,所以不指定也是可以的 比如: PreparedStatement ps = connection.prepareStatement("select .. from ..");
ps.setFetchSize(Integer.MIN_VALUE); //也可以修改jdbc url通过defaultFetchSize参数来设置,这样默认所以的返回结果都是通过流方式读取.ResultSet rs = ps.executeQuery(); 

while (rs.next()) {   System.out.println(rs.getString("fieldName")); }

代码分析:下面是mysql判断是否开启流式读取结果的方法,有三个条件forward-only,read-only,fatch size是Integer.MIN_VALUE

/**
 * We only stream result sets when they are forward-only, read-only, and the
 * fetch size has been set to Integer.MIN_VALUE
 *
 * @return true if this result set should be streamed row at-a-time, rather
 * than read all at once.
 */
protected boolean createStreamingResultSet() {
    try {
        synchronized(checkClosed().getConnectionMutex()) {
            return ((this.resultSetType == java.sql.ResultSet.TYPE_FORWARD_ONLY)
                 && (this.resultSetConcurrency == java.sql.ResultSet.CONCUR_READ_ONLY) && (this.fetchSize == Integer.MIN_VALUE));
        }
    } catch (SQLException e) {
        // we can‘t break the interface, having this be no-op in case of error is ok

        return false;
    }
}

2 批量写入问题。开始时应用程序是一条一条的执行insert来写入报表结果。写入也是比较慢的。主要原因是单条写入时候需要应用于db之间大量的
请求响应交互。每个请求都是一个独立的事务提交。这样网络延迟大的情况下多次请求会有大量的时间消耗的网络延迟上。第二个是由于每个事务db都会
有刷新磁盘操作写事务日志,保证事务的持久性。由于每个事务只是写入一条数据 所以磁盘io利用率不高,因为对于磁盘io是按块来的,所以连续写入大量数据效率
更好。所以必须改成批量插入的方式,减少请求数与事务数。下面是批量插入的例子:还有jdbc连接串必须加下rewriteBatchedStatements=true

int batchSize = 1000;
PreparedStatement ps = connection.prepareStatement("insert into tb1 (c1,c2,c3...) values (?,?,?...)");

for (int i = 0; i < list.size(); i++) {

    ps.setXXX(list.get(i).getC1());
    ps.setYYY(list.get(i).getC2());
    ps.setZZZ(list.get(i).getC3());

    ps.addBatch();

    if ((i + 1) % batchSize == 0) {
        ps.executeBatch();
    }
}

if (list.size() % batchSize != 0) {
    ps.executeBatch();
}

上面代码示例是每1000条数据发送一次请求。mysql驱动内部在应用端会把多次addBatch()的参数合并成一条multi value的insert语句发送给db去执行
比如insert into tb1(c1,c2,c3) values (v1,v2,v3),(v4,v5,v6),(v7,v8,v9)...
这样可以比每条一个insert 明显少很多请求。减少了网络延迟消耗时间与磁盘io时间,从而提高了tps。

代码分析: 从代码可以看出,
1 rewriteBatchedStatements=true,insert是参数化语句且不是insert ... select 或者 insert... on duplicate key update with an id=last_insert_id(...)的话会执行 
executeBatchedInserts,也就是muti value的方式

2 rewriteBatchedStatements=true 语句是都是参数化(没有addbatch(sql)方式加入的)的而且mysql server版本在4.1以上 语句超过三条,则执行executePreparedBatchAsMultiStatement
就是将多个语句通过;分隔一次提交多条sql。比如 "insert into tb1(c1,c2,c3) values (v1,v2,v3);insert into tb1(c1,c2,c3) values (v1,v2,v3)..."

3 其余的执行executeBatchSerially,也就是还是一条条处理

public void addBatch(String sql)throws SQLException {
    synchronized(checkClosed().getConnectionMutex()) {
        this.batchHasPlainStatements = true;

        super.addBatch(sql);
    }
}

public int[] executeBatch()throws SQLException {
    //...
    if (!this.batchHasPlainStatements
         && this.connection.getRewriteBatchedStatements()) {

        if (canRewriteAsMultiValueInsertAtSqlLevel()) {
            return executeBatchedInserts(batchTimeout);
        }

        if (this.connection.versionMeetsMinimum(4, 1, 0)
             && !this.batchHasPlainStatements
             && this.batchedArgs != null
             && this.batchedArgs.size() > 3 /* cost of option setting rt-wise */
        )
        {
            return executePreparedBatchAsMultiStatement(batchTimeout);
        }
    }

    return executeBatchSerially(batchTimeout);
    //.....
}

executeBatchedInserts相比executePreparedBatchAsMultiStatement的方式传输效率更好,因为一次请求只重复一次前面的insert table (c1,c2,c3)

mysql server 对请求报文的最大长度有限制,如果batch size 太大造成请求报文超过最大限制,mysql 驱动会内部按最大报文限制查分成多个报文。所以要真正减少提交次数

还要检查下mysql server的max_allowed_packet 否则batch size 再大也没用.

mysql> show VARIABLES like ‘%max_allowed_packet%‘;
+--------------------+-----------+
| Variable_name | Value |
+--------------------+-----------+
| max_allowed_packet | 167772160 |
+--------------------+-----------+
1 row in set (0.00 sec)

要想验证mysql 发送了正确的sql 有两种方式

1 抓包,下图是wireshark在 应用端抓包mysql的报文

2 另一个办法是在mysql server端开启general log 可以查看mysql收到的所有sql

3 在jdbc url上加上参数traceProtocol=true

性能测试对比

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import com.alibaba.druid.pool.DruidDataSource;

public class BatchInsert {

    public static void main(String[] args) throws SQLException {

        int batchSize = 1000;
        int insertCount = 1000;

        testDefault(batchSize, insertCount);

        testRewriteBatchedStatements(batchSize,insertCount);

    }

    private static void testDefault(int batchSize, int insertCount) throws SQLException {  

        long start = System.currentTimeMillis();

        doBatchedInsert(batchSize, insertCount,"");

        long end = System.currentTimeMillis();

        System.out.println("default:" + (end -start) + "ms");
    }

    private static void testRewriteBatchedStatements(int batchSize, int insertCount) throws SQLException {

        long start = System.currentTimeMillis();

        doBatchedInsert(batchSize, insertCount, "rewriteBatchedStatements=true");

        long end = System.currentTimeMillis();

        System.out.println("rewriteBatchedStatements:" + (end -start) + "ms");
    }

    private static void doBatchedInsert(int batchSize, int insertCount, String mysqlProperties) throws SQLException {
        DruidDataSource dataSource = new DruidDataSource();
        dataSource.setUrl("jdbc:mysql://ip:3306/test?" + mysqlProperties);
        dataSource.setUsername("name");
        dataSource.setPassword("password");

        dataSource.init();

        Connection connection = dataSource.getConnection();

        PreparedStatement preparedStatement = connection.prepareStatement("insert into Test (name,gmt_created,gmt_modified) values (?,now(),now())");

        for (int i = 0; i < insertCount; i++) {
            preparedStatement.setString(1, i+" ");
            preparedStatement.addBatch();
            if((i+1) % batchSize == 0) {
                preparedStatement.executeBatch();
            }
        }
        preparedStatement.executeBatch();

        connection.close();   

        dataSource.close();
    }

}

网络环境ping测试延迟是35ms ,测试结果:

default:75525ms
rewriteBatchedStatements:914ms

时间: 2024-08-02 06:55:14

java mysql大数据量批量插入与流式读取分析的相关文章

MySQL 大数据量快速插入方法和语句优化

MySQL大数据量快速插入方法和语句优化是本文我们主要要介绍的内容,接下来我们就来一一介绍,希望能够让您有所收获! INSERT语句的速度 插入一个记录需要的时间由下列因素组成,其中的数字表示大约比例: 连接:(3) 发送查询给服务器:(2) 分析查询:(2) 插入记录:(1x记录大小) 插入索引:(1x索引) 关闭:(1) 这不考虑打开表的初始开销,每个并发运行的查询打开. 表的大小以logN (B树)的速度减慢索引的插入. 加快插入的一些方法 如果同时从同一个客户端插入很多行,使用含多个VA

SQL Server 大数据量批量插入

private void AddShuJu_Click(object sender, RoutedEventArgs e) { Stopwatch wath = new Stopwatch(); wath.Start(); for (int i = 0; i < 10; i++) { //创建datatable实例 DataTable data = new DataTable(); //填充字段 data = GetFiled(data); for (int count = i * 100000

【MySql 大数据量快速插入和语句优化】

INSERT语句的速度 插入一个记录需要的时间由下列因素组成,其中的数字表示大约比例:连接:(3)发送查询给服务器:(2)分析查询:(2)插入记录:(1x记录大小)插入索引:(1x索引)关闭:(1)这不考虑打开表的初始开销,每个并发运行的查询打开. 表的大小以logN (B树)的速度减慢索引的插入. 加快插入的一些方法: · 如果同时从同一个客户端插入很多行,使用含多个VALUE的INSERT语句同时插入几行.这比使用单行INSERT语句快(在某些情况下快几倍).如果你正向一个非空表添加数据,可

MySQL大数据量分页查询方法及其优化

方法1: 直接使用数据库提供的SQL语句 语句样式: MySQL中,可用如下方法: SELECT * FROM 表名称 LIMIT M,N 适应场景: 适用于数据量较少的情况(元组百/千级) 原因/缺点: 全表扫描,速度会很慢 且 有的数据库结果集返回不稳定(如某次返回1,2,3,另外的一次返回2,1,3). Limit限制的是从结果集的M位置处取出N条输出,其余抛弃. 方法2: 建立主键或唯一索引, 利用索引(假设每页10条) 语句样式: MySQL中,可用如下方法: SELECT * FRO

java+Mysql大数据的一些优化技巧

众所周知,java在处理数据量比较大的时候,加载到内存必然会导致内存溢出,而在一些数据处理中我们不得不去处理海量数据,在做数据处理中,我们常见的手段是分解,压缩,并行,临时文件等方法; 例如,我们要将数据库(不论是什么数据库)的数据导出到一个文件,一般是Excel或文本格式的CSV;对于Excel来讲,对于POI和JXL的接口,你很多时候没有办法去控制内存什么时候向磁盘写入,很恶心,而且这些API在内存构造的对象大小将比数据原有的大小要大很多倍数,所以你不得不去拆分Excel,还好,POI开始意

提高MYSQL大数据量查询的速度

1.对查询进行优化,应尽量避免全表扫描,首先应考虑在 where 及 order by 涉及的列上建立索引. 2.应尽量避免在 where 子句中对字段进行 null 值判断,否则将导致引擎放弃使用索引而进行全表扫描,如:select id from t where num is null可以在num上设置默认值0,确保表中num列没有null值,然后这样查询:select id from t where num=0 3.应尽量避免在 where 子句中使用!=或<>操作符,否则引擎将放弃使用

MySQL大数据量快速分页实现(转载)

在mysql中如果是小数据量分页我们直接使用limit x,y即可,但是如果千万数据使用这样你无法正常使用分页功能了,那么大数据量要如何构造sql查询分页呢? 般刚开始学SQL语句的时候,会这样写 代码如下:  代码如下 复制代码 SELECT * FROM table ORDER BY id LIMIT 1000, 10; 但在数据达到百万级的时候,这样写会慢死 代码如下:  代码如下 复制代码 SELECT * FROM table ORDER BY id LIMIT 1000000, 10

java处理大数据量任务时的可用思路--未验证版,具体实现方法有待实践

1.Bloom filter 适用范围:可以用来实现数据字典,进行数据的判重,或者集合求交集 基本原理及要点:对于原理来说很简单,位数组+k个独立hash函数.将hash函数对应的值的位数组置1,查找时如果发现所有hash函数对应位都是1说明存在,很明显这个过程并不保证查找的结果是100%正确的.同时也不支持删除一个已经插入的关键字,因为该关键字对应的位会牵动到其他的关键字.所以一个简单的改进就是 counting Bloom filter,用一个counter数组代替位数组,就可以支持删除了.

mysql大数据量表索引与非索引对比

1:不要在大数据量表中轻易改名字(做任何操作都是非常花费时间) 2个多亿数据量的表 改名操作  执行时间花费8分多钟 (如果是加索引等其他操作 那时间花费不可预估) 2:给大数据量的mysql表 添加索引 所花费的时间 如下 在日后生产环境 如果需要给表添加索引等操作 心里要有预估时间的花费范围 3: explain 解释 语句 type:ALL 进行完整的表扫描 .row:213284372  mysql预估需要扫描213284372 条记录来完成这个查询.可想而知 表数据量越大全表扫描越慢.