Python开发MySQL数据库(表)克隆工具

前段时间因为某些需求,需要频繁的克隆MySQL的某些数据库或者某几个表。手动导出导入操作了几次,感觉甚是繁琐,而且效率不高,出错几率过大。索性抽时间用Python开发了一个MySQL的clone工具,用了一段时间,将数据库或者表克隆到同一台服务器的话(即源数据库与目标数据库在同一主机上),百万条数据也就是几十秒的时间搞定。该工具也支持将本地数据库或者表克隆到远程主机上。

程序比较简单,就一个Python文件,原理就是主要使用了MySQL的LOAD DATA INFILE命令。先来看下工具帮助信息:

[[email protected] ~]# python mysqlclone.py --help
usage: mysqlclone.py [-h] [--sourceHost SOURCEHOST] [--sourcePort SOURCEPORT]
                     [--sourcePasswd SOURCEPASSWD] [--sourceUser SOURCEUSER]
                     [--sourceDb SOURCEDB] [--dstDb DSTDB]
                     [--sourceTable SOURCETABLE] [--dstHost DSTHOST]
                     [--dstPort DSTPORT] [--dstPasswd DSTPASSWD]
                     [--dstUser DSTUSER] [--no-data] [--lock-all-tables]
                     [--events] [--routines] [--triggers]

optional arguments:
  -h, --help            show this help message and exit
  --sourceHost SOURCEHOST
                        The source database host,default[localhost]
  --sourcePort SOURCEPORT
                        The source database port,defalut[3306]
  --sourcePasswd SOURCEPASSWD
                        The source databas passwd,default[NULL]
  --sourceUser SOURCEUSER
                        The source databas username,default[root]
  --sourceDb SOURCEDB   The source databas name
  --dstDb DSTDB         The dst databas name
  --sourceTable SOURCETABLE
                        The source table,default[None]
  --dstHost DSTHOST     The dst database host,default[localhost]
  --dstPort DSTPORT     The dst database port,defalut[3306]
  --dstPasswd DSTPASSWD
                        The dst databas passwd,default[NULL]
  --dstUser DSTUSER     The dst databas username,default[root]
  --no-data, -d         No row information;False is default
  --lock-all-tables, -X
                        Locks all tables,default[False:Lock the table to be
                        read]
  --events, -E          Clone events,default[False]
  --routines, -R        Clone stored routines (functions and
                        procedures),default[False]
  --triggers            Clone triggers for each dumped
                        table,default[False]

简单介绍下几个选项参数:

--sourceHost   指定源数据库主机的IP或者主机名
--sourcePasswd 源数据库主机的密码
--sourceUser 源数据库主机的用户名
--sourceDb   指定要克隆的源数据库
--sourceTable 指定要克隆的源数据表,该值可以不指定,若不指定的话则为克隆上边指定数据库里的所有表
--dstHost 目标数据库主机IP或者主机名
--dstPasswd 目标数据库密码
--dstUser  目标数据库用户名
--no-data|-d 指定该参数时,只克隆表结构信息,不克隆数据。
--lock-all-tables|-X 指定该参数时,表示将整个数据库的所有表都加上读所。默认情况下只是给单个表加读所。
--events|-E 克隆events
--routines|-R 克隆function和procedure
--triggers 克隆triggers

示例,将test数据库整个clone到test3库中,test3库为提前创建好的一个空库。

[[email protected] ~]# python mysqlclone.py --sourceHost 127.0.0.1 --sourceUser root --sourcePasswd xxx --dstHost 127.0.0.1 --dstUser root --dstPasswd xxx --sourceDb test --dstDb test3 --no-data -E -R -X

  [2015-07-10 11:44:44] INFO: - Start clone database [127.0.0.1:test] To [127.0.0.1:test3]
[2015-07-10 11:44:44] INFO: - table [pcore_information_information] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_information_reply] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_item_item] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_consignee_address] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_member] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_region] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_region_district] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_region_district_exist] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_session] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_member_usergroup] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_adminsession] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_member] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_memberfield] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_seccode] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_session] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_passport_usergroup] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_data] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_field] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_file] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_global] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_module] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_system_schedule] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_cart] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_order] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_order_detail] was transferred.
[2015-07-10 11:44:44] INFO: - table [pcore_trade_order_return_goods] was transferred.

脚本代码:

#!/bin/env python
#coding=utf-8

import MySQLdb,datetime,sys,time,os,argparse
from abc import ABCMeta, abstractmethod
__author__ = "liuzhenwei"
__version__ = "1.2"

class Clone():
	__metaclass__ = ABCMeta

	def __init__(self,**kwargs):
		self.pid = os.getpid()
		self.sourceCur = kwargs[‘sourceCur‘]
		self.dstCur =  kwargs[‘dstCur‘]
		self.noData =  kwargs[‘noData‘]
		self.sourceTable =  kwargs[‘sourceTable‘]
		self.lockAllTables =  kwargs[‘lockAllTables‘]
		self.sourceDb = kwargs[‘sourceDb‘]
		#print kwargs
	@abstractmethod
	def clone(self):
		pass

class MySQLClone(object):

	def __init__(self,**kwargs):
		self.sourceTable = kwargs[‘sourceTable‘]
		self.noData = kwargs[‘noData‘]
		self.lockAllTables = kwargs[‘lockAllTables‘]
		self.triggers = kwargs[‘triggers‘]
		self.routines = kwargs[‘routines‘]
		self.events = kwargs[‘events‘]
		self.sourceDb = kwargs[‘sourceDb‘]
		try:
			self.sourceConn=MySQLdb.connect(host=kwargs[‘sourceHost‘],user=kwargs[‘sourceUser‘],passwd=kwargs[‘sourcePasswd‘],port=kwargs[‘sourcePort‘])
			self.sourceCur=self.sourceConn.cursor()
			self.sourceConn.select_db(kwargs[‘sourceDb‘])
			self.sourceCur.execute("SET NAMES utf8;")
			#self.sourceCur.execute("SET AUTOCOMMIT=0;")
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		try:
			self.dstConn=MySQLdb.connect(host=kwargs[‘dstHost‘],user=kwargs[‘dstUser‘],passwd=kwargs[‘dstPasswd‘],port=kwargs[‘dstPort‘],local_infile=1)
			self.dstCur=self.dstConn.cursor()
			self.dstConn.select_db(kwargs[‘dstDb‘])
			self.dstCur.execute("SET NAMES utf8;")
			self.dstCur.execute("SET AUTOCOMMIT=0;")
			#stop bin log
			self.dstCur.execute("SET sql_log_bin=0;")
			#
			#self.dstCur.execute("SET GLOBAL innodb_flush_log_at_trx_commit=2;")
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

	def __dstCommit(self):
	 	 self.dstConn.commit()
    
	def __sourceColse(self):
		self.sourceCur.close()
		self.sourceConn.close()

	def __dstColse(self):
		self.dstCur.close()
		self.dstConn.close()

	def clone(self):
		kwargs = {}
		kwargs[‘sourceCur‘] = self.sourceCur
		kwargs[‘dstCur‘] = self.dstCur
		kwargs[‘noData‘] = self.noData
		kwargs[‘sourceTable‘] = self.sourceTable
		kwargs[‘lockAllTables‘] = self.lockAllTables
		kwargs[‘sourceDb‘] = self.sourceDb
		c = DatabaseClone(**kwargs)
		c.clone()

		if self.triggers:
			t = TriggersClone(**kwargs)
			t.clone()

		if self.routines:
			#print kwargs
			r = RoutinesClone(**kwargs)
			r.clone()

		if self.events:
			e = EventsClone(**kwargs)
			e.clone()

	def __del__(self):
		self.__dstCommit()
		self.__sourceColse()
		self.__dstColse()

class DatabaseClone(Clone):
	‘‘‘
	clone database
	‘‘‘
	def __init__(self,**kwargs):
		Clone.__init__(self,**kwargs)
	def __cloneSingleTable(self,table):
		#create table
		count = self.sourceCur.execute(‘SHOW CREATE TABLE %s;‘ % table)
		results=self.sourceCur.fetchone()
		try:
			self.dstCur.execute(results[1])
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			return False
		else:
			if self.noData:
				PrintInfo.say(‘table‘,table)
		#load data
		if not self.noData:
			tmpFile = ‘/tmp/‘+table+‘_‘+str(self.pid)
			try:
				outFileSQL = "SELECT * INTO OUTFILE ‘%s‘ FIELDS TERMINATED BY ‘,‘ OPTIONALLY ENCLOSED BY ‘\"‘ LINES TERMINATED BY ‘\n‘ FROM %s;" % (tmpFile,table)
				count = self.sourceCur.execute(outFileSQL)
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				self.__loadData(tmpFile,table)
		if not self.lockAllTables:
			self.sourceCur.execute("COMMIT;")
			self.sourceCur.execute("UNLOCK TABLES;")

	def __loadData(self,dataFile,table):
		try:
			intoFileSQL = "LOAD DATA LOCAL INFILE ‘%s‘ INTO TABLE %s CHARACTER SET utf8 FIELDS TERMINATED BY ‘,‘ OPTIONALLY ENCLOSED BY ‘\"‘ LINES TERMINATED BY ‘\n‘;" % (dataFile,table)
			self.dstCur.execute(intoFileSQL)
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			os.remove(dataFile)
			PrintInfo.say(‘table‘,table)

	def clone(self):
		tableList = []
		if self.sourceTable:
			tableList.append(self.sourceTable)
		else:
			count = self.sourceCur.execute(‘SHOW TABLES;‘)
			results=self.sourceCur.fetchall()
			for table in results:
				tableList.append(table[0])
			#lock all tables
			if self.lockAllTables:
				lockSQL = "LOCK TABLES %s;" % ‘,‘.join([t+‘ READ‘ for t in tableList])
				self.sourceCur.execute(lockSQL)

		for t in tableList:
			#lock single table
			if not self.lockAllTables:
				self.sourceCur.execute("LOCK TABLES %s READ;"% t)
			self.__cloneSingleTable(t)

		if self.lockAllTables:
			self.sourceCur.execute("COMMIT;")
			self.sourceCur.execute("UNLOCK TABLES;")

class EventsClone(Clone):
	‘‘‘
	clone events
	‘‘‘
	def __init__(self,**kwargs):
		Clone.__init__(self,**kwargs)
	def clone(self):
		for event in self.__getEventList():
			self.__cloneEvent(event)

	def __getEventList(self):
		try:
			count = self.sourceCur.execute("SHOW EVENTS;")
			results=self.sourceCur.fetchall()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			for event in results:
				yield event[1]

	def __cloneEvent(self,event):
		try:
			count = self.sourceCur.execute("SHOW CREATE EVENT %s;" % event)
			results=self.sourceCur.fetchone()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		else:
			try:
				self.dstCur.execute(results[3])
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				PrintInfo.say(‘event‘,event)

class TriggersClone(Clone):
	‘‘‘
	clone triggers
	‘‘‘
	def __init__(self,**kwargs):
		pass
	def clone(self):
		print "clone triggers"

class RoutinesClone(Clone):
	‘‘‘
	clone routines (functions and procedures).
	‘‘‘
	def __init__(self,**kwargs):
		Clone.__init__(self,**kwargs)

	def clone(self):
		for proc in self.__getProcList():
			self.__cloneProc(proc)

		for func in self.__getFunctionList():
			self.__cloneFunc(func)

	def __getProcList(self):
		try:
			count = self.sourceCur.execute("SHOW PROCEDURE STATUS WHERE db=‘%s‘;" % (self.sourceDb))
			results=self.sourceCur.fetchall()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			for proc in results:
				yield proc[1]

	def __cloneProc(self,proc):
		try:
			count = self.sourceCur.execute("SHOW CREATE PROCEDURE %s;" % proc)
			results=self.sourceCur.fetchone()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		else:
			try:
				self.dstCur.execute(results[2])
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				PrintInfo.say(‘procedure‘,proc)
	def __getFunctionList(self):
		try:
			count = self.sourceCur.execute("SHOW FUNCTION STATUS WHERE db=‘%s‘;" % (self.sourceDb))
			results=self.sourceCur.fetchall()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
		else:
			for func in results:
				yield func[1]

	def __cloneFunc(self,func):
		try:
			count = self.sourceCur.execute("SHOW CREATE FUNCTION %s;" % func)
			results=self.sourceCur.fetchone()
		except MySQLdb.Error,e:
			print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])

		else:
			try:
				self.dstCur.execute(results[2])
			except MySQLdb.Error,e:
				print >> sys.stderr,"Mysql Error %d: %s" % (e.args[0], e.args[1])
			else:
				PrintInfo.say(‘function‘,func)

class PrintInfo(object):
	‘‘‘
	print info
	‘‘‘
	@staticmethod
	def say(item,name):
		‘‘‘
		print info 
		‘‘‘
		curTime = time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time()))
		print >> sys.stdout,"[%s] INFO: - %s [%s] was transferred." % (curTime,item,name)

if __name__ == "__main__":

	parser = argparse.ArgumentParser()
	parser.add_argument(‘--sourceHost‘,action=‘store‘,dest=‘sourceHost‘,default=‘localhost‘,help=‘The source database host,default[localhost]‘)
	parser.add_argument(‘--sourcePort‘,action=‘store‘,dest=‘sourcePort‘,default=3306,help=‘The source database port,defalut[3306]‘)
	parser.add_argument(‘--sourcePasswd‘,action=‘store‘,dest=‘sourcePasswd‘,default="",help=‘The source databas passwd,default[NULL]‘)
	parser.add_argument(‘--sourceUser‘,action=‘store‘,dest=‘sourceUser‘,default=‘root‘,help=‘The source databas username,default[root]‘)
	parser.add_argument(‘--sourceDb‘,action=‘store‘,dest=‘sourceDb‘,default=None,help=‘The source databas name‘)
	parser.add_argument(‘--dstDb‘,action=‘store‘,dest=‘dstDb‘,default=None,help=‘The dst databas name‘)
	parser.add_argument(‘--sourceTable‘,action=‘store‘,dest=‘sourceTable‘,default=None,help=‘The source table,default[None]‘)
	parser.add_argument(‘--dstHost‘,action=‘store‘,dest=‘dstHost‘,default=‘localhost‘,help=‘The dst database host,default[localhost]‘)
	parser.add_argument(‘--dstPort‘,action=‘store‘,dest=‘dstPort‘,default=3306,help=‘The dst database port,defalut[3306]‘)
	parser.add_argument(‘--dstPasswd‘,action=‘store‘,dest=‘dstPasswd‘,default="",help=‘The dst databas passwd,default[NULL]‘)
	parser.add_argument(‘--dstUser‘,action=‘store‘,dest=‘dstUser‘,default=‘root‘,help=‘The dst databas username,default[root]‘)

	parser.add_argument(‘--no-data‘,‘-d‘,action=‘store_true‘,dest=‘noData‘,help=‘No row information;False is default‘)

	parser.add_argument(‘--lock-all-tables‘,‘-X‘,action=‘store_true‘,dest=‘lockAllTables‘,help=‘Locks all tables,default[False:Lock the table to be read]‘)

	parser.add_argument(‘--events‘,‘-E‘,action=‘store_true‘,dest=‘events‘,help=‘Clone events,default[False]‘)
	parser.add_argument(‘--routines‘,‘-R‘,action=‘store_true‘,dest=‘routines‘,help=‘Clone stored routines (functions and procedures),default[False]‘)
	parser.add_argument(‘--triggers‘,action=‘store_true‘,dest=‘triggers‘,help=‘Clone triggers for each dumped table,default[False]‘)

	args = parser.parse_args()

	if args.sourceDb is None:
		print >> sys.stderr,"ERROR: parameter --sourceDb cant not be NULL"
		sys.exit(1)

	if args.dstDb is None:
		print >> sys.stderr,"ERROR: parameter --dstDb cant not be NULL"
		sys.exit(1)

	conInfo = {
		"sourceHost":args.sourceHost,
		"sourceUser":args.sourceUser,
		"sourcePasswd":args.sourcePasswd,
		"sourceDb":args.sourceDb,
		"sourceTable":args.sourceTable,
		"noData":args.noData,
		"dstHost":args.dstHost,
		"dstUser":args.dstUser,
		"dstPasswd":args.dstPasswd,
		"dstDb":args.dstDb,
		"sourcePort":args.sourcePort,
		"dstPort":args.dstPort,
		"lockAllTables":args.lockAllTables,
		"events":args.events,
		"routines":args.routines,
		"triggers":args.triggers
		}
	print >> sys.stdout,"[%s] INFO: - Start clone database [%s:%s] To [%s:%s]" %(time.strftime(‘%Y-%m-%d %H:%M:%S‘,time.localtime(time.time())),conInfo[‘sourceHost‘],conInfo[‘sourceDb‘],conInfo[‘dstHost‘],conInfo[‘dstDb‘])

	m = MySQLClone(**conInfo)
	m.clone()

由于时间问题,克隆triggers的功能还没有完成,而且当数据量过大的时候,最好是将数据分批操作,如每10万load一次,这样会得到更好的性能,该程序会持续更新下。

也可以直接从github下载程序源码:https://github.com/diannaowa/mysqlclone

时间: 2024-11-03 22:01:32

Python开发MySQL数据库(表)克隆工具的相关文章

Python 读取MySQL数据库表数据

环境 Python 3.6 ,Window 64bit 目的 从MySQL数据库读取目标表数据,并处理 代码 # -*- coding: utf-8 -*- import pandas as pd import pymysql ## 加上字符集参数,防止中文乱码 dbconn=pymysql.connect( host="**********", database="kimbo", user="kimbo_test", password=&quo

python开发mysql:单表查询&多表查询

一 单表查询,以下是表内容 1 一 having 过滤 2 1.1 having和where 3 select * from emp where id > 15; 4 解析过程;from > where 找到数据 > 分组(没有默认一个组)> select 打印 where是出结果之前 5 select * from emp having id > 15; 6 解析过程;from > where 找到数据(没有约束条件,就是整个表)) > 分组(没有默认一个组)&

Python数据库操作 Mysql数据库表引擎与字符集#学习猿地

# Mysql数据库表引擎与字符集 ![](./imgs/752951346A5F4E7EBDE362FA97107707.png) ### 1.服务器处理客户端请求 其实不论客户端进程和服务器进程是采用哪种方式进行通信,最后实现的效果都是:**客户端进程向服务器进程发送一段文本(MySQL语句),服务器进程处理后再向客户端进程发送一段文本(处理结果).**那服务器进程对客户端进程发送的请求做了什么处理,才能产生最后的处理结果呢?客户端可以向服务器发送增删改查各类请求,我们这里以比较复杂的查询请

使用python操作mysql数据库

这是我之前使用mysql时用到的一些库及开发的工具,这里记录下,也方便我查阅. python版本: 2.7.13 mysql版本: 5.5.36 几个python库 1.mysql-connector-python 是MySQL官方的Python驱动 https://dev.mysql.com/doc/connector-python/en/ 安装: pip install mysql-connector 示例代码: https://github.com/mike-zhang/pyExample

Python使用MySQL数据库(新)

之前写过一篇 Python使用MySQL数据库的博客,主要使用的是Python2和MySQLdb驱动. python使用mysql数据库 然而,2016年开始,我从Python2切换到了Python3,Python2已经基本不再使用,MySQLdb驱动从2014年1月停止了维护.所以,打算重新再来写这篇博客. Python2 ---> Python3 MySQLdb --> PyMySQL 一,安装PyMySQL Python是编程语言,MySQL是数据库,它们是两种不同的技术:要想使Pyth

python专题-Mysql数据库(python3._+ PyMysql)

之前写过一篇 Python使用MySQL数据库的博客,主要使用的是Python2和MySQLdb驱动. python使用mysql数据库 Python2 ---> Python3 MySQLdb --> PyMySQL 一,安装PyMySQL Python是编程语言,MySQL是数据库,它们是两种不同的技术:要想使Python操作MySQL数据库需要使用驱动.这里选用PyMySQL驱动.下载地址: https://pypi.python.org/pypi/PyMySQL https://git

python操作mysql数据库实现增删改查

Python 标准数据库接口为 Python DB-API,Python DB-API为开发人员提供了数据库应用编程接口. Python 数据库接口支持非常多的数据库,你可以选择适合你项目的数据库: GadFly mSQL MySQL PostgreSQL Microsoft SQL Server 2000 Informix Interbase Oracle Sybase 你可以访问Python数据库接口及API查看详细的支持数据库列表. 不同的数据库你需要下载不同的DB API模块,例如你需要

【转】python操作mysql数据库

python操作mysql数据库 Python 标准数据库接口为 Python DB-API,Python DB-API为开发人员提供了数据库应用编程接口. Python 数据库接口支持非常多的数据库,你可以选择适合你项目的数据库: GadFly mSQL MySQL PostgreSQL Microsoft SQL Server 2000 Informix Interbase Oracle Sybase 你可以访问Python数据库接口及API查看详细的支持数据库列表. 不同的数据库你需要下载

python的mysql数据库操作

python操作mysql数据库 Python 标准数据库接口为 Python DB-API,Python DB-API为开发人员提供了数据库应用编程接口. Python 数据库接口支持非常多的数据库,你可以选择适合你项目的数据库: GadFly mSQL MySQL PostgreSQL Microsoft SQL Server 2000 Informix Interbase Oracle Sybase 我目前使用的是mysql数据库,所以这里只是记录了python中mysql的使用 在写my