1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 4 import paramiko,os,sys,datetime,time,MySQLdb 5 6 def mysql_conn(sql): 7 try: 8 conn = MySQLdb.connect(host = ‘‘,user = ‘‘,passwd = ‘‘,connect_timeout=5) 9 cursor = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor) 10 cursor.execute(sql) 11 alldata = cursor.fetchall() 12 cursor.close() 13 conn.close() 14 if len(alldata)==0: 15 return 0 16 return alldata[0] 17 except MySQLdb.Error,e: 18 return 0 19 20 def ssh_connect(ip,password,cmd): 21 port=63008 22 user="root" 23 24 ssh=paramiko.SSHClient() 25 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 26 ssh.connect(ip,port,user,password) 27 stdin,stdout,stderr=ssh.exec_command(cmd) 28 data=stdout.read().strip() 29 ssh.close() 30 return data 31 32 33 class Database: 34 def __init__(self,dbport,slave_ip,stat_ip,master_ip): 35 self.user=‘root‘ 36 self.password=‘‘ 37 self.port=63008 38 self.today=datetime.date.today().strftime(‘%Y%m%d‘) 39 self.dbport=dbport 40 self.slave_ip=slave_ip 41 self.host=master_ip 42 self.main_id=0 43 self.dist_id=0 44 self.data_dir=‘‘ 45 self.version=‘‘ 46 self.m_version=‘‘ 47 self.app_name=‘‘ 48 self.m_app_name=‘‘ 49 self.conf_name=‘‘ 50 self.m_conf_name=‘‘ 51 self.STAT_IP = stat_ip 52 self.m_dbport=0 53 54 def log_w(self,text): 55 logfile = "/home/create_slave_%s.log" %self.dbport 56 now = time.strftime("%Y-%m-%d %H:%M:%S") 57 tt = str(now) + "\t" + str(text) + "\n" 58 f = open(logfile,‘a+‘) 59 f.write(tt) 60 f.close() 61 62 def find_ip(self):#走中心应用确认本服务器是否用于从库 63 text = "正在从中心应用确认本服务器是否用于从库,稍等......".decode("utf-8").encode("GBK") 64 self.log_w(text) 65 print "\033[1;32;40m%s\033[0m" % text 66 sql = "SELECT c.main_id,c.data_dir,c.version,c.dist_id,c.ip,c.app_name FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id AND c.flag=1 AND c.del_info!=0 AND c.ip=‘%s‘ AND c.port=%s" %(self.slave_ip,self.dbport) 67 alldata = mysql_conn(sql) 
68 if alldata==0: 69 text = " This server is not matched server,server Error,please check it !!" 70 self.log_w(text) 71 print "\033[1;31;40m%s\033[0m" % text 72 sys.exit() 73 if alldata["app_name"].find("_s") != -1: 74 text = " This server is slave server,server OK !!" 75 self.log_w(text) 76 print "\033[1;32;40m%s\033[0m" % text 77 self.dist_id=alldata["dist_id"] 78 self.main_id=alldata[‘main_id‘] 79 self.version=alldata[‘version‘] 80 self.app_name=alldata[‘app_name‘] 81 self.m_app_name=self.app_name.split(‘_‘)[0] 82 self.data_dir=alldata[‘data_dir‘] 83 self.conf_name=‘%s-%s-%s-%s‘ %(self.main_id,self.dist_id,self.m_app_name,self.dbport) 84 print "STAT_IP: %s" %(self.STAT_IP) 85 sql = "SELECT c.version FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id AND c.main_id=%s AND c.flag=‘1‘ AND c.del_info!=0 AND c.dist_id=%s AND c.app_name=‘%s‘" %(self.main_id,self.dist_id,self.m_app_name) 86 self.m_version=mysql_conn(sql)["version"] 87 88 else: 89 text = " This server is not used slave server,server Error,please check it !!" 90 self.log_w(text) 91 print "\033[1;31;40m%s\033[0m" % text 92 sys.exit() 93 94 def check_mysql(self):#检查从库mysql数据库服务是否运行,如在运行则pkill掉,然后跳过权限表启动,为导入数据做准备 95 text = "Check mysql now,Please wait...." 96 self.log_w(text) 97 print "\033[1;32;40m%s\033[0m" % text 98 ret=os.popen("cd /root/ && rm -f /root/deploy_mysql_instance_gao.sh && wget http://208.asktao.com/multi/deploy_mysql_instance_gao.sh && echo $?").read().strip() 99 if ret != ‘0‘: 100 text = " mysql deploy script download fail !" 
101 self.log_w(text) 102 print "\033[1;31;40m%s\033[0m" % text 103 sys.exit() 104 ret=os.popen("sh /root/deploy_mysql_instance_gao.sh %s %s %s %s %s %s %s" %(self.main_id,self.dist_id,self.app_name,self.dbport,self.version,self.data_dir,self.STAT_IP)).read().strip() 105 os.popen("rm -f /root/deploy_mysql_instance_gao.sh") 106 if ret.find(‘Deployment failure‘) != -1: 107 text = " mysql instance exists,please check !" 108 self.log_w(text) 109 print "\033[1;31;40m%s\033[0m" % text 110 sys.exit() 111 112 if not os.path.isdir("/usr/local/mysql%s" %self.version): 113 text = " Mysql not install,Please install mysql !" 114 self.log_w(text) 115 print "\033[1;31;40m%s\033[0m" % text 116 sys.exit() 117 if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() != ‘0‘: 118 os.popen("/usr/local/mysql%s/bin/mysqladmin --socket=/tmp/mysql-%s.sock shutdown"%(self.version,self.dbport)) 119 time.sleep(10) 120 while 1: 121 if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() == ‘0‘: 122 conm = "/usr/local/mysql%s/bin/mysqld_safe --defaults-file=/home/mysql/etc/%s.cnf --user=mysql --skip-grant-tables &"%(self.version,self.conf_name) 123 os.popen(conm) 124 break 125 else: 126 text = "Mysql not stop,please wait..." 127 self.log_w(text) 128 print text 129 time.sleep(5) 130 a = 0 131 while 1: 132 if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() != ‘0‘: 133 text = "Mysql restart success !!" 134 self.log_w(text) 135 print "\033[1;32;40m%s\033[0m" % text 136 break 137 else: 138 if a == 12: 139 text = "Mysql not Running,please start with ‘--skip-grant-tables‘ !" 140 self.log_w(text) 141 print "\033[1;31;40m%s\033[0m" % text 142 sys.exit() 143 else: 144 a= a + 1 145 time.sleep(5) 146 147 def export_table(self):#导出当前主库的表结构 148 text = "Export master table and back mysql,Please wait ...." 
149 self.log_w(text) 150 print "\033[1;32;40m%s\033[0m" % text 151 try: 152 s=paramiko.SSHClient() 153 s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 154 s.connect(self.host,self.port,self.user,self.password) 155 conm = "/usr/local/mysql%s/bin/mysqldump --socket=/tmp/mysql-%s.sock --add-drop-table -udump -pKPt1IVNd4BnswQpc -d -A|bzip2 -2 > /data1/script/db_back/%s_%s_table_%s_%s.bz2 && echo $?" % (self.m_version,self.m_dbport,self.main_id,self.app_name.split(‘_‘)[0],self.dist_id,self.today) 156 stdin,stdout,stderr=s.exec_command(conm) 157 result = stdout.readlines()[-1].strip() 158 s.close() 159 if result == ‘0‘: 160 text = " Export_table success !" 161 self.log_w(text) 162 print text 163 else: 164 text = "Export_table Error !" 165 self.log_w(text) 166 print "\033[1;31;40m%s\033[0m" % text 167 sys.exit() 168 except Exception,e: 169 text = "SSH connect Error haha!" 170 print e 171 self.log_w(text) 172 print "\033[1;31;40m%s\033[0m" % text 173 # sys.exit() 174 175 def down_back(self):#拷贝主库当天的数据库备份和表结构 176 os.popen("mkdir -p /data1/tmp/%s"%self.m_app_name) 177 local_dir= "/data1/tmp/%s/"%self.m_app_name 178 remote_dir=‘/data1/script/db_back/‘ 179 try: 180 t=paramiko.Transport((self.host,self.port)) 181 t.connect(username=self.user,password=self.password) 182 sftp=paramiko.SFTPClient.from_transport(t) 183 files=sftp.listdir(remote_dir) 184 text = "Download back file,Please wait ...." 
185 self.log_w(text) 186 print "\033[1;32;40m%s\033[0m" % text 187 text = " Beginning to download file from %s %s " % (self.host,datetime.datetime.now()) 188 self.log_w(text) 189 print text 190 for f in files: 191 if f.find(self.today) != -1 and f.find(str(self.dist_id)) != -1 and f.find("%s_%s"%(self.main_id,self.app_name.split(‘_‘)[0])) != -1: 192 text = " Downloading file:%s:%s" % (self.host,os.path.join(remote_dir,f)) 193 self.log_w(text) 194 print text 195 sftp.get(os.path.join(remote_dir,f),os.path.join(local_dir,f)) 196 #sftp.put(os.path.join(local_dir,f),os.path.join(remote_dir,f)) 197 t.close() 198 text = ‘ Download All back file success %s ‘ % datetime.datetime.now() 199 self.log_w(text) 200 print text 201 except Exception,e: 202 text = "SFTP connect Error !" 203 self.log_w(text) 204 print "\033[1;31;40m%s\033[0m" % text 205 print e 206 sys.exit() 207 208 def unbz2(self):#解压拷贝的数据库备份和表结构bz2包 209 text = "Decompression file,Please wait ...." 210 self.log_w(text) 211 print "\033[1;32;40m%s\033[0m" % text 212 text = ‘ Beginning to Decompression file from %s‘ % datetime.datetime.now() 213 self.log_w(text) 214 print text 215 conm = ‘bzip2 -dfk /data1/tmp/%s/%s_%s*%s_%s*.bz2 && echo $?‘ %(self.m_app_name,self.main_id,self.app_name.split(‘_‘)[0],self.dist_id,self.today) 216 bz = os.popen(conm).read().strip() 217 if bz == ‘0‘: 218 text = ‘ Decompression file success %s‘ % datetime.datetime.now() 219 self.log_w(text) 220 print text 221 else: 222 text = "Decompression Error !" 223 self.log_w(text) 224 print "\033[1;31;40m%s\033[0m" % text 225 sys.exit() 226 227 def restart_mysql(self): 228 text = "Restart mysql Now,Please wait ...." 
229 self.log_w(text) 230 print "\033[1;32;40m%s\033[0m" % text 231 os.popen("sed -i ‘s/#log-bin/log-bin/‘ /home/mysql/etc/%s.cnf" % (self.conf_name)) 232 os.popen("sed -i ‘s/#binlog_format/binlog_format/‘ /home/mysql/etc/%s.cnf" % (self.conf_name)) 233 os.popen("/usr/local/mysql%s/bin/mysqladmin --socket=/tmp/mysql-%s.sock shutdown"%(self.version,self.dbport)) 234 while 1: 235 if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() == ‘0‘: 236 time.sleep(10) 237 os.popen("/usr/local/mysql%s/bin/mysqld_safe --defaults-file=/home/mysql/etc/%s.cnf --user=mysql &"%(self.version,self.conf_name)) 238 break 239 else: 240 text = "Mysql not stop,please wait..." 241 self.log_w(text) 242 print text 243 time.sleep(5) 244 a = 0 245 while 1: 246 if os.popen("netstat -ntlp|grep %s|wc -l"%(self.dbport)).read().strip() != ‘0‘: 247 text = "Mysql restart success !!" 248 self.log_w(text) 249 print "\033[1;32;40m%s\033[0m" % text 250 break 251 else: 252 if a == 12: 253 text = "Mysql restart Error,please check !!" 254 self.log_w(text) 255 print "\033[1;31;40m%s\033[0m" % text 256 sys.exit() 257 else: 258 a= a + 1 259 time.sleep(5) 260 261 if self.app_name=="adb_s" and self.version=="5.1" and self.m_version=="4.0": 262 os.popen("/usr/local/mysql%s/bin/mysql_upgrade --socket=/tmp/mysql-%s.sock -udev2 -pPs6iW7-Jp39Rx5"%(self.version,self.dbport)) 263 264 def import_date(self):#导入表结构和备份数据库 265 text = "Slave import master date,Please wait ...." 266 self.log_w(text) 267 print "\033[1;32;40m%s\033[0m" % text 268 #导入表结构 269 dir = "/data1/tmp/%s/"%self.m_app_name 270 table = ‘%s_%s_table_%s_%s‘ %(self.main_id,self.app_name.split(‘_‘)[0],self.dist_id,self.today) 271 conm = "/usr/local/mysql%s/bin/mysql --socket=/tmp/mysql-%s.sock < %s%s && echo $?" % (self.version,self.dbport,dir,table) 272 result = os.popen(conm).read().strip() 273 if result == ‘0‘: 274 text = " Import %s success !" % table 275 self.log_w(text) 276 print text 277 else: 278 text = "Import Table structure Error !" 
279 self.log_w(text) 280 print "\033[1;31;40m%s\033[0m" % text 281 sys.exit() 282 283 for f in os.listdir(dir):#导入数据库 284 if os.path.isfile(os.path.join(dir,f)) and (f.find(‘bz2‘) == -1) and (f.find(‘table‘) == -1): 285 if f.find("%s_%s"%(self.main_id,self.app_name.split(‘_‘)[0])) != -1 and f.find(self.today) != -1 and (f.find(str(self.dist_id)) != -1): 286 conm = "/usr/local/mysql%s/bin/mysql --socket=/tmp/mysql-%s.sock < %s && echo $?" % (self.version,self.dbport,os.path.join(dir,f)) 287 result = os.popen(conm).read().strip() 288 if result == ‘0‘: 289 text = " Import %s success !" % f 290 self.log_w(text) 291 print text 292 else: 293 text = "Import Database %s Error !" %self.app_name.split(‘_‘)[0] 294 self.log_w(text) 295 print "\033[1;31;40m%s\033[0m" % text 296 sys.exit() 297 298 def slave_start(self):#启动salve 299 text = "Settings Slave,Please wait ...." 300 self.log_w(text) 301 print "\033[1;32;40m%s\033[0m" % text 302 binlog,log_pos=self.bin_pos() 303 sql = "change master to master_host=‘%s‘,master_user=‘repl‘,master_password=‘H7RYbCkGHmm_P1XO‘,master_port=%s,master_log_file=‘%s‘,master_log_pos=%s;" % (self.host,self.m_dbport,binlog,log_pos) 304 try: 305 conn = MySQLdb.connect(host = ‘127.0.0.1‘,port=self.dbport,user = ‘‘,passwd = ‘‘,connect_timeout=5) 306 cursor = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor) 307 cursor.execute(sql) 308 cursor.execute("slave start;") 309 time.sleep(3) 310 cursor.execute("show slave status;") 311 alldata = cursor.fetchall()[0] 312 if alldata["Slave_IO_Running"] == "Yes" and alldata["Slave_SQL_Running"] == "Yes": 313 text = " Settings Slave success!" 
314 self.log_w(text) 315 print "\033[1;32;40m%s\033[0m" % text 316 for key in alldata: 317 print "%21s :" % key + ‘\t‘ + str(alldata[key]) 318 time.sleep(5) 319 print 320 print "******************************************" 321 print 322 cursor.execute("show slave status;") 323 alldata = cursor.fetchall()[0] 324 for key in alldata: 325 print "%21s :" % key + ‘\t‘ + str(alldata[key]) 326 else: 327 text = " Settings Slave Error,Please check it!" 328 self.log_w(text) 329 print "\033[1;31;40m%s\033[0m" % text 330 cursor.close() 331 conn.close() 332 except MySQLdb.Error,e: 333 self.log_w(e) 334 print e 335 sys.exit() 336 337 338 def bin_pos(self):#获取主库备份前的一个bin-log文件以及它的第一个pos位置 339 dir = "/data1/tmp/%s/"%self.m_app_name 340 for f in os.listdir(dir): 341 if os.path.isfile(os.path.join(dir,f)) and (f.find(‘bz2‘) == -1) and (f.find(‘table‘) == -1): 342 if f.find("%s_%s"%(self.main_id,self.app_name.split(‘_‘)[0])) != -1 and f.find(self.today) != -1 and (f.find(str(self.dist_id)) != -1): 343 binlog = f.split(‘_‘)[4] 344 log_pos = f.split(‘_‘)[5] 345 return binlog,log_pos 346 ‘‘‘ 347 def eth1_ip(self): 348 eth0_ip = self.m_ip() 349 try: 350 conm = "ifconfig | grep Mask | grep 192.168 | awk -F ‘:‘ ‘{print $2}‘ | awk -F ‘ ‘ ‘{print $1}‘" 351 result = ssh_connect(eth0_ip,self.password,conm) 352 if result != ‘‘: 353 text = " 成功获取主库内网地址 :%s".decode("utf-8").encode("GBK") % result 354 self.log_w(text) 355 print "\033[1;32;40m%s\033[0m" % text 356 #self.host=result 357 358 else: 359 text = " 无法获取主库内网地址".decode("utf-8").encode("GBK") 360 self.log_w(text) 361 print "\033[1;31;40m%s\033[0m" % text 362 sys.exit() 363 except Exception,e: 364 text = "SSH connect Error !" 
365 self.log_w(text) 366 print "\033[1;31;40m%s\033[0m" % text 367 sys.exit() 368 ‘‘‘ 369 370 def m_ip(self):#走中心应用确认本服务器的主库IP地址 371 text = "走中心应用确认本服务器的主库信息,稍等......".decode("utf-8").encode("GBK") 372 self.log_w(text) 373 print "\033[1;32;40m%s\033[0m" % text 374 375 sql = "SELECT c.dist_id,c.ip,c.app_name,c.port FROM center_app.main_category a, center_app.sub_category b, center_app.app_info c WHERE a.id = b.main_id AND b.dist_id = c.dist_id AND b.main_id = c.main_id AND c.main_id=%s AND c.dist_id=%s AND c.app_name = ‘%s‘ AND c.flag=‘1‘ AND c.del_info!=0" % (self.main_id,self.dist_id,self.m_app_name) 376 alldata = mysql_conn(sql) 377 self.m_dbport=int(alldata["port"]) 378 self.m_conf_name=‘%s-%s-%s-%s‘ %(self.main_id,self.dist_id,self.m_app_name,self.m_dbport) 379 if alldata != 0: 380 return alldata["ip"] 381 else: 382 text = " 中心应用没有找到本服务器主库的相关信息,程序退出!".decode("utf-8").encode("GBK") 383 self.log_w(text) 384 print "\033[1;31;40m%s\033[0m" % text 385 sys.exit() 386 387 388 def drop_dump_file(self): 389 text = " 删除游戏 %s %s dump文件".decode("utf-8").encode("GBK") %(self.main_id,self.app_name.split(‘_‘)[0]) 390 self.log_w(text) 391 print "\033[1;32;40m%s\033[0m" % text 392 conm="rm -f /data1/tmp/%s/%s_%s*%s_%s*" %(self.m_app_name,self.main_id,self.app_name.split(‘_‘)[0],self.dist_id,self.today) 393 os.popen(conm) 394 395 if __name__=="__main__": 396 dbport = raw_input(‘Enter :Slave_dbport :‘) 397 print "\033[1;31;40m%s\033[0m" %(‘‘‘ 398 注意事项:1、确认脚本中用于服务器连接的用户名、密码和端口之类的没有错 399 2、确认主库已经按照要求修改配置文件打开了bin-log,设置了相关参数 400 3、确认从库已经安装和主库一样版本的mysql 401 4、运行之后删除/data1/tmp/目录下的压缩包 402 5、运行格式:python create_slave.py 403 6、从库创建完成并且数据与主库追齐之后要用脚本验证数据的一致性,数据一致后整个创建从库过程完成 404 7、用于创建主从 405 8、/home/create_slave_%s.log为脚本运行日志 406 9、在执行check_mysql时,会出现Broken pipe的错误,这个是由于python调用系统命令关闭和打开mysql时显示的信息没有正确的显示在终端造成的,没有影响,暂时没有找到不让显示此类信息的方法,亟待解决 407 ‘‘‘%dbport).decode("utf-8").encode("GBK") 408 dbport=int(dbport) 409 slave_ip = raw_input(‘Enter :slave outer_ip :‘) 410 stat_ip = raw_input(‘Enter 
:stat_ip :‘) 411 master_ip = raw_input(‘Enter :master_ip :‘) 412 boss = Database(dbport,slave_ip,stat_ip,master_ip) 413 boss.find_ip() 414 boss.m_ip() 415 boss.check_mysql() 416 boss.export_table() 417 boss.down_back() 418 boss.unbz2() 419 boss.import_date() 420 boss.restart_mysql() 421 boss.slave_start() 422 boss.drop_dump_file()
# Time: 2024-10-12 04:20:06