数据同步那些事儿(优化过程分享)

简介

很久之前就像写这篇文章了,主要是介绍一下我做数据同步的过程中遇到的一些有意思的内容,和提升效率的过程。

当前在数据处理的过程中,数据同步如同血液一般充满全过程,如图:

数据同步开源产品对比:

DataX,是淘宝的开源项目,可惜不支持Postgresql

Sqoop,Apache开源项目,同步过程中字段需要严格一致,不方便扩展,不易于二次开发

整体设计思路:

使用生产者消费者模型,中间使用内存,数据不落地,直接插入目标数据

优化过程:

1、插入数据部分:

首先生产者通过Jdbc获取源数据内容,放入固定大小的缓存队列,同时消费者不断的从缓存读取数据,根据不同的数据类型分别读取出来,并逐条插入目标数据库。

速度每秒300条,每分钟1.8W条。

这样做表面上看起来非常美好,流水式的处理,来一条处理一下,可是发现插入的速度远远赶不上读取的速度,所以为了提升写入的速度,决定采用批量处理的方法,事例代码:

    @Override
    public Boolean call() {
        long beginTime = System.currentTimeMillis();
        this.isRunning.set(true);
        try {
            cyclicBarrier.await();
            int lineNum = 0;
            int commitCount = 0; // 缓存数量
            List<RowData> tmpRowDataList = new ArrayList<RowData>();// 缓存数组
            while (this.isGetDataRunning.get() || this.queue.size() > 0) {
                // 从队列获取一条数据
                RowData rowData = this.queue.poll(1, TimeUnit.SECONDS);
                if (rowData == null) {
                    logger.info("this.isGetDataRunning:" + this.isGetDataRunning + ";this.queue.size():" + this.queue.size());
                    Thread.sleep(10000);
                    continue;
                }
                // 添加到缓存数组
                tmpRowDataList.add(rowData);
                lineNum++;
                commitCount++;
                if (commitCount == SyncConstant.INSERT_SIZE) {
                    this.insertContractAch(tmpRowDataList); // 批量写入
                    tmpRowDataList.clear(); // 清空缓存
                    commitCount = 0;
                }

                if (lineNum % SyncConstant.LOGGER_SIZE == 0) {
                    logger.info(" commit line: " + lineNum + "; queue size: " + queue.size());
                }
            }

            this.insertContractAch(tmpRowDataList); // 批量写入
            tmpRowDataList.clear();// 清空缓存
            logger.info(" commit line end: " + lineNum);
        } catch (Exception e) {
            logger.error(" submit data error" , e);
        } finally {
            this.isRunning.set(false);
        }
        logger.info(String.format("SubmitDataToDatabase used %s second times", (System.currentTimeMillis() - beginTime) / 1000.00));
        return true;
    }

    /**
     * 批量插入数据
     *
     * @param rowDatas
     * @return
     */
    public int insertContractAch(List<RowData> rowDatas) {
        final List<RowData> tmpObjects = rowDatas;
        String sql = SqlService.createInsertPreparedSql(tableMetaData); // 获取sql
        try {
            int[] index = this.jdbcTemplate.batchUpdate(sql, new PreparedStatementSetter(tmpObjects, this.columnMetaDataList));
            return index.length;
        } catch (Exception e) {
            logger.error(" insertContractAch error: " , e);
        }
        return 0;
    }

    /**
     * 处理批量插入的回调类
     */
    private class PreparedStatementSetter implements BatchPreparedStatementSetter {
        private List<RowData> rowDatas;
        private List<ColumnMetaData> columnMetaDataList;

        /**
         * 通过构造函数把要插入的数据传递进来处理
         */
        public PreparedStatementSetter(List<RowData> rowDatas, List<ColumnMetaData> columnList) {
            this.rowDatas = rowDatas;
            this.columnMetaDataList = columnList;
        }

        @Override
        public void setValues(PreparedStatement ps, int i) throws SQLException {
            RowData rowData = this.rowDatas.get(i);
            for (int j = 0; j < rowData.getColumnObjects().length; j++) {
                // 类型转换
                try {
                    ColumnAdapterService.setParameterValue(ps, j + 1, rowData.getColumnObjects()[j], this.columnMetaDataList.get(j).getType());
                } catch (Exception e) {
                    ps.setObject(j + 1, null);
                }
            }
        }
    }

咱们不是需要讲解代码,所以这里截取了代码片段,全部的代码github上有,感兴趣的同学可以看看。PreparedStatement的好处,可以参考文章:http://www.cnblogs.com/liqiu/p/3825544.html

由于增加批量插入的功能,终于速度提升到每秒1000条

2、多线程优化

每秒1000条,速度依然不理想,特别是写的速度跟不上读取的速度,队列是满的,如图:

所以只能提升消费者的数量,采用了多消费者的模式:

速度提升到每秒3000条。

3、升级读取方式

这时候观察,随着消费者的增加,观察缓存队列经常有空的情况,也就是说生产跟不上消费者速度,如果增加生产者的线程,那么也会增加程序的复杂性,因为势必要将读取的数据进行分割。所以采用Pgdump的方式直接获取数据(并不是所有情况都适用,比如数据中有特殊的分隔符与设定的分隔符一样,或者有分号,单引号之类的)

代码片段如下:

    /**
     * 将数据放入缓存队列
     */
    public void putCopyData() {
        DataSourceMetaData dataSource = dataSourceService.getDataSource(syncOptions.getSrcDataSourceName());
        String copyCommand = this.getCopyCommand(dataSource, querySql); //获取copy命令
        ShellExecuter.execute(copyCommand, queue,columnMetaDatas);
    }

    /**
     * 执行copy的shell命令
     * @param dataSource
     * @param sql
     * @return
     */
    public String getCopyCommand(DataSourceMetaData dataSource, String sql){
        String host = dataSource.getIp();
        String user = dataSource.getUserName();
        String dataBaseName = dataSource.getDatabaseName();
        //String psqlPath = "/Library/PostgreSQL/9.3/bin/psql";
        String psqlPath = "/opt/pg93/bin/psql";
        String execCopy = psqlPath + " -h " + host + " -U " + user + " " + dataBaseName +" -c \"COPY (" + sql + ") TO STDOUT WITH DELIMITER E‘"+ HiveDivideConstant.COPY_COLUMN_DIVIDE+"‘ CSV NULL AS E‘NULL‘\" "; // 执行copy命令
        LOGGER.info(execCopy);
        return execCopy;
    }

意思就是通过执行一个Shell程序,获取数据,然后读取进程的输出流,不断写入缓存。这样生产者的问题基本都解决了,速度完全取决于消费者写入数据库的速度了。下面是执行Shell的Java方法代码:

    public static int execute(String shellPath, LinkedBlockingQueue<RowData> queue, List<ColumnMetaData> columnMetaDatas) {

        int success = -1;
        Process pid = null;
        String[] cmd;

        try {
            cmd = new String[]{"/bin/sh", "-c", shellPath};
            // 执行Shell命令
            pid = Runtime.getRuntime().exec(cmd);
            if (pid != null) {
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pid.getInputStream()), SyncConstant.SHELL_STREAM_BUFFER_SIZE);
                try {
                    String line;
                    while ((line = bufferedReader.readLine()) != null) {
                        // LOGGER.info(String.format("shell info output [%s]", line));
                        String[] columnObjects = line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1);
                        if (columnObjects.length != columnMetaDatas.size()) {
                            LOGGER.error(" 待同步的表有特殊字符,不能使用copy [{}] ", line);
                            throw new RuntimeException("待同步的表有特殊字符,不能使用copy " + line);
                        }
                        RowData rowData = new RowData(line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1));
                        queue.put(rowData);
                    }
                } catch (Exception ioe) {
                    LOGGER.error(" execute shell error", ioe);
                } finally {
                    try {
                        if (bufferedReader != null) {
                            bufferedReader.close();
                        }
                    } catch (Exception e) {
                        LOGGER.error("execute shell, get system.out error", e);
                    }
                }
                success = pid.waitFor();
                if (success != 0) {
                    LOGGER.error("execute shell error ");
                }
            } else {
                LOGGER.error("there is not pid ");
            }
        } catch (Exception ioe) {
            LOGGER.error("execute shell error", ioe);
        } finally {
            if (null != pid) {
                try {
                    //关闭错误输出流
                    pid.getErrorStream().close();
                } catch (IOException e) {
                    LOGGER.error("close error stream of process fail. ", e);
                } finally {
                    try {
                        //关闭标准输入流
                        pid.getInputStream().close();
                    } catch (IOException e) {
                        LOGGER.error("close input stream of process fail.", e);
                    } finally {
                        try {
                            pid.getOutputStream().close();
                        } catch (IOException e) {
                            LOGGER.error(String.format("close output stream of process fail.", e));
                        }
                    }
                }
            }
        }

        return success;
    }

整体设计方案:

现在这个项目已经开源,代码放在:https://github.com/lihehuo/synchronous

时间: 2024-10-12 03:01:34

数据同步那些事儿(优化过程分享)的相关文章

高斯模糊算法的全面优化过程分享(二)。

      相关链接: 高斯模糊算法的全面优化过程分享(一) 在高斯模糊算法的全面优化过程分享(一)一文中我们已经给出了一种相当高性能的高斯模糊过程,但是优化没有终点,经过上一个星期的发愤图强和测试,对该算法的效率提升又有了一个新的高度,这里把优化过程中的一些心得和收获用文字的形式记录下来. 第一个尝试   直接使用内联汇编替代intrinsics代码(无效) 我在某篇博客里看到说intrinsics语法虽然简化了SSE编程的难度,但是他无法直接控制XMM0-XMM7寄存器,很多指令中间都会用内

由Kaggle竞赛wiki文章流量预测引发的pandas内存优化过程分享

pandas内存优化分享 缘由 最近在做Kaggle上的wiki文章流量预测项目,这里由于个人电脑配置问题,我一直都是用的Kaggle的kernel,但是我们知道kernel的内存限制是16G,如下: 在处理数据过程中发现会超出,虽然我们都知道对于大数据的处理有诸如spark等分布式处理框架,但是依然存在下面的问题: 对于个人来说,没有足够的资源让这些框架发挥其优势: 从处理数据的库丰富程度上,还是pandas等更具有优势: 很多时候并不是pandas无法处理,只是数据未经优化: 所以这里还是考

一次MySQL两千万数据大表的优化过程,三种解决方案

问题概述 使用阿里云rds for MySQL数据库(就是MySQL5.6版本),有个用户上网记录表6个月的数据量近2000万,保留最近一年的数据量达到4000万,查询速度极慢,日常卡死.严重影响业务. 问题前提:老系统,当时设计系统的人大概是大学没毕业,表设计和sql语句写的不仅仅是垃圾,简直无法直视.原开发人员都已离职,到我来维护,这就是传说中的维护不了就跑路,然后我就是掉坑的那个!!! 我尝试解决该问题,so,有个这个日志. 方案概述 方案一:优化现有mysql数据库.优点:不影响现有业务

Redis数据导入工具优化过程总结

Redis数据导入工具优化过程总结 背景 使用C++开发了一个Redis数据导入工具 从oracle中将所有表数据导入到redis中: 不是单纯的数据导入,每条oracle中的原有记录,需要经过业务逻辑处理, 并添加索引(redis集合): 工具完成后,性能是个瓶颈: 优化效果 使用了2个样本数据测试: 样本数据a表8763 条记录: b表940279 条记录: 优化前,a表耗时11.417s: 优化后,a表耗时1.883s: 用到的工具 gprof, pstrace,time 使用time工具

OGG &amp;quot;Loading data from file to Replicat&amp;quot;table静态数据同步配置过程

OGG "Loading data from file to Replicat"table静态数据同步配置过程 一个.mgr过程 GGSCI (lei1) 3> view params mgr port 7809 二.抽取进程extftor GGSCI (lei1) 4> view params extftor SOURCEISTABLE userid goldengate, password yyyyy rmthost 192.168.100.189, mgrport 7

mysql主从不同步处理过程分享

背景  8月7日15:58收到报障数据库出现不同步:数据库共四台,分别为10.255.70.11,10.255.70.12,10.255.70.13,10.255.70.14(ip为虚拟ip) 数据库结构为: 故障时不同步现为:(1)70.11和70.13之间主主不同步 ,(2)70.11和70.12之间主从不同步,(3)70.11和70.14之间主从是同步的 (1)由于my.cnf文件中有slave-skip-errors=all配置,所以在出现不同步错误时跳过,检查同步参数Slave_IO_

WOT干货大放送:大数据架构发展趋势及探索实践分享

WOT大数据处理技术分会场,PingCAP CTO黄东旭.易观智库CTO郭炜.Mob开发者服务平台技术副总监林荣波.宜信技术研发中心高级架构师王东及商助科技(99Click)顾问总监郑泉五位讲师,分别针对时下热门的HTAP数据库TiDB.去ETL化的IOTA架构.数据工厂架构.实时敏捷大数据理念实践.基于场景的大数据营销等话题,展开实践分享. 作者:查士加来源:51CTO 2018年5月18-19日,由51CTO主办的全球软件与运维技术峰会在北京召开.来自全球企业的技术精英汇聚北京,畅谈软件技术

Oracle 10g通过创建物化视图实现不同数据库间表级别的数据同步

摘自:http://blog.csdn.net/javaee_sunny/article/details/53439980 目录(?)[-] Oracle 10g 物化视图语法如下 实例演示 主要步骤 在A节点创建原表和物化视图日志 在B节点创建连接A节点的远程链接 在B节点处创建目标表和与目标表名称相同的物化视图 在B节点处刷新物化视图 升级采用存储过程定时任务JOB方式定时刷新物化视图 进一步优化 文章更新记录 参考文章 Oracle 10g 物化视图语法如下: create materia

搭建中小规模集群之rsync数据同步备份

NFS重要问题 1.有关NFS客户端普通用户写NFS的问题. 1)为什么要普通用户写NFS. 2)exports加all_squash. Rsync介绍 什么是Rsync? Rsync是一款开源的.快速的.多功能的.可实现全量即增量的本地或远程数据同步备份的优秀工具.Rsync软件适用于unix.linux.windows等多种操作系统平台. Rsync简介 Rsync英文全称Remote synchronization.从软件的名称就可以看出来,Rsync具有可使本地和远程两台主机之间的数据快