python--同一mysql数据库下批量迁移数据

最近接手些mysql数据库维护,发现mysql在批量操作方面就是个渣渣啊,比起MS SQL SERVER简直就是“不可同日而语”。

咨询了下MySQL的高手,对于数据迁移这种问题,一种处理方式就是直接“一步到位” ,一次性将所有数据查询插入到另外一个表,然后再删除原表数据;另外一种处理方式就是使用pt--archiver工具来归档。

然并卵,“一步到位”法太刺激,pt--archiver工具用不顺手,由于目前大部分的表都以自增id为主键,以此为此为前提自己写个小脚本,厚脸拿出来供各位参考:

# coding: utf-8
import MySQLdb
import time

# common config
EXEC_DETAIL_FILE = ‘exec_detail.txt‘
DATETIME_FORMAT = ‘%Y-%m-%d %X‘
Default_MySQL_Host = ‘192.168.166.169‘
Default_MySQL_Port = 3358
Default_MySQL_User = "mysql_admin"
Default_MySQL_Password = ‘[email protected]@Pwd‘
Default_MySQL_Charset = "utf8"
Default_MySQL_Connect_TimeOut = 120

# Transfer Config
Transfer_Database_Name = "db001"
Transfer_Source_Table_Name = "tb2001"
Transfer_Target_Table_Name = "tb2001_his"
Transfer_Condition = "dt <‘2016-10-01‘"
Transfer_Rows_Per_Batch = 10000
Sleep_Second_Per_Batch = 0.5

def get_time_string(dt_time):
    """
    获取指定格式的时间字符串
    :param dt_time: 要转换成字符串的时间
    :return: 返回指定格式的字符串
    """
    global DATETIME_FORMAT
    return time.strftime(DATETIME_FORMAT, dt_time)

def get_time_string(dt_time):
    return time.strftime("%Y-%m-%d %X", dt_time)

def highlight(s):
    return "%s[30;2m%s%s[1m" % (chr(27), s, chr(27))

def print_warning_message(message):
    """
    以红色字体显示消息内容
    :param message: 消息内容
    :return: 无返回值
    """
    message = str(message)
    print(highlight(‘‘) + "%s[31;1m%s%s[0m" % (chr(27), message, chr(27)))

def print_info_message(message):
    """
    以绿色字体输出提醒性的消息
    :param message: 消息内容
    :return: 无返回值
    """
    message = str(message)
    print(highlight(‘‘) + "%s[32;2m%s%s[0m" % (chr(27), message, chr(27)))

def write_file(file_path, message):
    """
    将传入的message追加写入到file_path指定的文件中
    请先创建文件所在的目录
    :param file_path: 要写入的文件路径
    :param message: 要写入的信息
    :return:
    """
    file_handle = open(file_path, ‘a‘)
    file_handle.writelines(message)
    # 追加一个换行以方便浏览
    file_handle.writelines(chr(13))
    file_handle.close()

def get_mysql_connection():
    """
    根据默认配置返回数据库连接
    :return: 数据库连接
    """
    conn = MySQLdb.connect(
            host=Default_MySQL_Host,
            port=Default_MySQL_Port,
            user=Default_MySQL_User,
            passwd=Default_MySQL_Password,
            connect_timeout=Default_MySQL_Connect_TimeOut,
            charset=Default_MySQL_Charset,
            db=Transfer_Database_Name
    )
    return conn

def mysql_exec(sql_script, sql_param=None):
    """
    执行传入的脚本,返回影响行数
    :param sql_script:
    :param sql_param:
    :return: 脚本最后一条语句执行影响行数
    """
    try:
        conn = get_mysql_connection()
        print_info_message("在服务器{0}上执行脚本:{1}".format(
                conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        if sql_param is not None:
            cursor.execute(sql_script, sql_param)
        else:
            cursor.execute(sql_script)
        affect_rows = cursor.rowcount
        conn.commit()
        cursor.close()
        conn.close()
        return affect_rows
    except Exception as ex:
        cursor.close()
        conn.rollback()
        raise Exception(str(ex))

def mysql_exec_many(sql_script_list):
    """
    执行传入的脚本,返回影响行数
    :param sql_script_list: 要执行的脚本List,List中每个元素为sql_script, sql_param对
    :return: 返回执行每个脚本影响的行数列表
    """
    try:
        conn = get_mysql_connection()
        exec_result_list = []
        for sql_script, sql_param in sql_script_list:
            print_info_message("在服务器{0}上执行脚本:{1}".format(
                    conn.get_host_info(), sql_script))
            cursor = conn.cursor()
            if sql_param is not None:
                cursor.execute(sql_script, sql_param)
            else:
                cursor.execute(sql_script)
            affect_rows = cursor.rowcount
            exec_result_list.append("影响行数:{0}".format(affect_rows))
        conn.commit()
        cursor.close()
        conn.close()
        return exec_result_list

    except Exception as ex:
        cursor.close()
        conn.rollback()
        raise Exception(str(ex))

def mysql_query(sql_script, sql_param=None):
    """
    执行传入的SQL脚本,并返回查询结果
    :param sql_script:
    :param sql_param:
    :return: 返回SQL查询结果
    """
    try:
        conn = get_mysql_connection()
        print_info_message("在服务器{0}上执行脚本:{1}".format(
                conn.get_host_info(), sql_script))
        cursor = conn.cursor()
        if sql_param != ‘‘:
            cursor.execute(sql_script, sql_param)
        else:
            cursor.execute(sql_script)
        exec_result = cursor.fetchall()
        cursor.close()
        conn.close()
        return exec_result
    except Exception as ex:
        cursor.close()
        conn.close()
        raise Exception(str(ex))

def get_column_info_list(table_name):
    sql_script = """
DESC {0}
""".format(table_name)
    column_info_list = []
    query_result = mysql_query(sql_script=sql_script, sql_param=None)
    for row in query_result:
        column_name = row[0]
        column_key = row[3]
        column_info = column_name, column_key
        column_info_list.append(column_info)
    return column_info_list

def get_id_range():
    """
    按照传入的表获取要删除数据最大ID、最小ID、删除总行数
    :return: 返回要删除数据最大ID、最小ID、删除总行数
    """
    global Transfer_Condition
    global Transfer_Rows_Per_Batch
    sql_script = """
SELECT
MAX(ID) AS MAX_ID,
MIN(ID) AS MIN_ID,
COUNT(1) AS Total_Count
FROM {0}
WHERE {1};
""".format(Transfer_Source_Table_Name, Transfer_Condition)
    query_result = mysql_query(sql_script=sql_script, sql_param=None)
    max_id, min_id, total_count = query_result[0]
    # 此处有一坑,可能出现total_count不为0 但是max_id 和min_id 为None的情况
    # 因此判断max_id和min_id 是否为NULL
    if (max_id is None) or (min_id is None):
        max_id, min_id, total_count = 0, 0, 0
    return max_id, min_id, total_count

def check_env():
    try:
        source_columns_info_list = get_column_info_list(Transfer_Source_Table_Name)
        target_columns_info_list = get_column_info_list(Transfer_Target_Table_Name)
        if len(source_columns_info_list) != len(target_columns_info_list):
            print_info("源表和目标表的列数不对,不满足迁移条件")
            return False
        column_count = len(source_columns_info_list)
        id_flag = False
        for column_id in range(column_count):
            source_column_name, source_column_key = source_columns_info_list[column_id]
            target_column_name, target_column_key = target_columns_info_list[column_id]
            if source_column_name != target_column_name:
                print_info("源表和目标表的列名不匹配,不满足迁移条件")
                return False
            if source_column_name.lower() == ‘id‘                     and source_column_key.lower() == ‘pri‘                     and target_column_name.lower() == ‘id‘                     and target_column_key.lower() == ‘pri‘:
                id_flag = True
        if not id_flag:
            print_info("未找到为主键的id列,不满足迁移条件")
            return False
        return True
    except Exception as ex:
        print_info("执行出现异常,异常为{0}".format(ex.message))
        return False

def main():
    flag = check_env()
    if not flag:
        return
    loop_trans_data()

def trans_data(current_min_id, current_max_id):
    global Transfer_Source_Table_Name
    global Transfer_Target_Table_Name
    global Transfer_Condition
    global Transfer_Rows_Per_Batch
    print_info_message("*" * 70)
    copy_data_script = """
INSERT INTO {0}
SELECT * FROM {1}
WHERE ID>={2}
AND ID<{3}
AND {4} ;
""".format(Transfer_Target_Table_Name, Transfer_Source_Table_Name, current_min_id, current_max_id, Transfer_Condition)
    delete_data_script = """
DELETE FROM {0}
WHERE ID IN (
SELECT ID
FROM {1}
WHERE ID>={2}
AND ID<{3})
AND ID>={4}
AND ID<{5};
""".format(Transfer_Source_Table_Name, Transfer_Target_Table_Name, current_min_id, current_max_id, current_min_id,
           current_max_id)
    sql_script_list = []
    tem_sql_script = copy_data_script, None
    sql_script_list.append(tem_sql_script)
    tem_sql_script = delete_data_script, None
    sql_script_list.append(tem_sql_script)
    exec_result_list = mysql_exec_many(sql_script_list)
    print_info_message("执行结果:")
    for item in exec_result_list:
        print_info_message(item)

def loop_trans_data():
    max_id, min_id, total_count = get_id_range()
    if min_id == max_id:
        print_info_message("无数据需要结转")
        return

    current_min_id = min_id
    global Transfer_Rows_Per_Batch
    while current_min_id <= max_id:
        current_max_id = current_min_id + Transfer_Rows_Per_Batch
        trans_data(current_min_id, current_max_id)
        current_percent = (current_max_id - min_id) * 100.0 / (max_id - min_id)
        left_rows = max_id - current_max_id
        if left_rows < 0:
            left_rows = 0
        current_percent_str = "%.2f" % current_percent
        info = "当前复制进度{0}/{1},剩余{2},进度为{3}%".format(current_max_id,
                                                    max_id, left_rows,
                                                    current_percent_str)
        print_info_message(info)
        time.sleep(Sleep_Second_Per_Batch)
        current_min_id = current_max_id
    print_info_message("*" * 70)
    print_info_message("执行完成")

if __name__ == ‘__main__‘:
    main()

按照各位场景的,需要修改数据库连接信息:

还有需要迁移表的信息:

生成测试数据的mysql脚本:

CREATE TABLE `tb2001` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `c1` varchar(200) DEFAULT NULL,
  `dt` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

create table tb2001_his like tb2001;

insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM mysql.user;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;
insert tb2001(c1,dt) select ‘abc‘,date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001;

最终运行结果如下:

显示简单粗暴,有兴趣的可以在此基础上修改!

=================================================================

时间: 2024-12-11 23:34:33

python--同一mysql数据库下批量迁移数据的相关文章

MySQL+MyBatis下批量修改数据的问题

今天处理数据批量的更新,场景是这样子的,web站管理的字典功能,需要添加一个记录的整体描述,以及详细内容的描述.一个字典整体概述只有一组信息,但是其详细内容,会有很多项,不确定. 这个场景,在关系型数据库操作中,就是典型的1vN的问题,即一对多的问题. 做内容修改时,涉及到批量的更新过程.这里,只针对具体的问题描述细节,不过多介绍字典的设计. 字典的查询没有问题,mybatis的mapper函数如下: <resultMap id="FullConfResultMap" type=

详解:如何恢复MySQL数据库下误删的数据

2017-03-27 09:25 阅读 178 评论 0 作者:马哥Linux运维-Robin 血的教训,事发经过就不详述了.直接上操作步骤及恢复思路(友情提示:数据库的任何操作都要提前做好备份),以下是Mysql数据后的恢复过程: 1. 找到binlog 恢复数据的前提是必须开启Mysql的binlog日志,如果binlog日志没开启,请忽略此篇文档.binlog日志是否开启可以查看Mysql配置文件.日志位置一般在/var/lib/mysql目录或者编译安装的date目录下.也可登录Mysq

PHP删除MySQL数据库下的所有数据表

<?php //[数据无价,请谨慎操作!] $hostname ='localhost';  $userid = 'username';  $password = 'password';  $dbname = 'dbname';  $connect = mysql_connect($hostname,$userid,$password);  mysql_select_db($dbname); $result = mysql_query("show table status from $db

ubuntu 下 mysql数据库的搭建 及 数据迁移

1.mysql的安装 我是使用apt-get直接安装的 :sudo apt-get install mysql-server sudo apt-get install mysql-client 2.配置mysql管理员密码 sudo mysqladmin -u root 当前密码 新密码 安装的时候貌似也没遇到什么障碍 3.查看mysql的状态 sudo netstat -tap | grep mysql 4.启动/停止/重启mysql sudo  /etc/init.d/mysql start

Linux下使用Python操作MySQL数据库

安装mysql-python 1.下载mysql-python 打开终端: cd /usr/local sudo wget http://nchc.dl.sourceforge.net/sourceforge/mysql-python/MySQL-python-1.2.2.tar.gz 官网地址:http://sourceforge.net/projects/mysql-python/ 2.解压 sudo tar -zxvf MySQL-python-1.2.2.tar.gz cd MySQL-

Windows下安装MySQLdb, Python操作MySQL数据库的增删改查

这里的前提是windows上已经安装了MySQL数据库,且配置完毕,能正常建表能操作.在此基础上只需安装MySQL-python-1.2.4b4.win32-py2.7.exe就ok了,只有1M多.这个有点类似jdbc里的那个jar包. 下载链接:http://sourceforge.net/projects/mysql-python/ , 百度云盘 :http://pan.baidu.com/s/1dDgnfpR 密码:7bna 接着import MySQLdb就能使用了,下面给出测试代码:

使用Python创建MySQL数据库实现字段动态增加以及动态的插入数据

应用场景: 我们需要设计一个数据库来保存多个文档中每个文档的关键字.假如我们每个文档字符都超过了1000,取其中出现频率最大的为我们的关键字. 假设每个文档的关键字都超过了300,每一个文件的0-299号存储的是我们的关键字.那我们要建这样一个数据库,手动输入这样的一个表是不现实的,我们只有通过程序来帮我实现这个重复枯燥的操作. 具体的示意图如下所示: 首先图1是我们的原始表格: 图1 这个时候我们需要程序来帮我们完成自动字段的创建和数据的插入. 图2 上图是我们整个表的概况.下面我们就用程序来

Python 读取MySQL数据库表数据

环境 Python 3.6 ,Window 64bit 目的 从MySQL数据库读取目标表数据,并处理 代码 # -*- coding: utf-8 -*- import pandas as pd import pymysql ## 加上字符集参数,防止中文乱码 dbconn=pymysql.connect( host="**********", database="kimbo", user="kimbo_test", password=&quo

MySQL学习笔记_13_Linux下C++/C连接MySQL数据库(三) --处理返回数据

 Linux下C++/C连接MySQL数据库(三) --处理返回数据 一.通过返回结果集中的字段数 [cpp] view plaincopyprint? unsigned int mysql_field_count(MYSQL * connection); //将MYSQL_ROW的值作为一个存储了一行数据的数组... unsigned int mysql_field_count(MYSQL * connection); //将MYSQL_ROW的值作为一个存储了一行数据的数组... 示例: