python制作mysql数据迁移脚本
用python写了个数据迁移脚本,主要是利用从库将大的静态表导出表空间,再导入到目标实例中。
#!/usr/bin/env python3
# -*- coding: utf8 -*-
# author: zhanbin.liu
# !!!!! The source and target DB must be the same version
# Requires a python3 environment: pip3 install pymysql paramiko
import os
# from pathlib import Path
import shlex
import sys

import paramiko
import pymysql
# Each run migrates tables from ONE source database into ONE target database.
# Example grant for the MySQL account used below:
# GRANT SELECT,CREATE,RELOAD,ALTER,LOCK TABLES ON *.* TO 'data_migration'@'192.168.%' IDENTIFIED BY 'data_migration@123';
tables = 'sqlauto_cluster,sqlauto_user'  # comma-separated table names, e.g. a,b,c
# Tolerate blanks around names and stray/trailing commas ("a, b," -> ['a', 'b']).
tableList = [name.strip() for name in tables.split(',') if name.strip()]

# Source instance: host, MySQL data directory root, and schema name.
sourceIp = '192.168.1.101'
sourceDataBase = '/data/mysql/3306/data'
sourceDbName = 'inception_web'
sourceDataDir = os.path.join(sourceDataBase, sourceDbName)

# Target instance: host, MySQL data directory root, and schema name.
desIp = '192.168.1.102'
desDataBase = '/data/mysql/3306/data'
desDbName = 'inception_web'
desDataDir = os.path.join(desDataBase, desDbName)

# MySQL credentials and port shared by the source and target connections.
# NOTE(review): the password is hard-coded in the script; consider moving it
# to a protected config file or environment variable.
comUser = 'data_migration'
comPwd = 'data_migration@123'
comPort = 3306

# Shared SSH client; unknown host keys are accepted automatically.
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def table_judge():
    """Verify the migration preconditions and abort the script if they fail.

    For every name in ``tableList`` this checks ``information_schema.TABLES``
    on both instances: the table must exist in ``sourceDbName`` on the source
    and must NOT exist in ``desDbName`` on the target.

    Raises:
        SystemExit: with status 1 when any table is missing on the source or
            already present on the target (the offending tables are printed).
    """
    print("table_judge")
    # Parameterized so that schema/table names are escaped by the driver.
    lookup_sql = (
        "select TABLE_NAME from information_schema.TABLES "
        "where TABLE_SCHEMA=%s and TABLE_NAME=%s;"
    )
    missing_on_source = []
    present_on_target = []

    # Keyword arguments: PyMySQL >= 1.0 no longer accepts positional ones.
    source_conn = pymysql.connect(host=sourceIp, user=comUser, password=comPwd,
                                  database=sourceDbName, port=comPort,
                                  charset='utf8')
    try:
        des_conn = pymysql.connect(host=desIp, user=comUser, password=comPwd,
                                   database=desDbName, port=comPort,
                                   charset='utf8')
        try:
            with source_conn.cursor() as cursor_source, \
                    des_conn.cursor() as cursor_des:
                for table in tableList:
                    cursor_source.execute(lookup_sql, (sourceDbName, table))
                    if not cursor_source.fetchall():
                        missing_on_source.append(table)

                    cursor_des.execute(lookup_sql, (desDbName, table))
                    found = cursor_des.fetchall()
                    if found:
                        present_on_target.append(found[0][0])
        finally:
            des_conn.close()
    finally:
        source_conn.close()

    if missing_on_source:
        print('迁移源不存在将要迁移的表:', sourceIp, sourceDbName, missing_on_source, '请检查')
    if present_on_target:
        print('目标库存在将要迁移的表:', desIp, desDbName, present_on_target, '请移除')
    if missing_on_source or present_on_target:
        # Non-zero status so callers/schedulers can detect the failed check.
        sys.exit(1)
def _run_remote(host, command):
    """Run ``command`` on ``host`` over SSH as root; abort the script on failure.

    The command counts as failed when it exits non-zero or writes anything to
    stderr. In that case the stderr lines are printed and the script exits
    with status 1. The shared SSH client is always closed afterwards.
    """
    client.connect(host, 22, 'root')
    try:
        _stdin, stdout, stderr = client.exec_command(command)
        # Drain stderr before asking for the exit status.
        errors = stderr.readlines()
        status = stdout.channel.recv_exit_status()
    finally:
        client.close()
    if status != 0 or errors:
        print(errors)
        sys.exit(1)


def _sync_one_table(cursor_source, cursor_des, table):
    """Copy one table's tablespace from the source to the target instance."""
    print("正在同步表:", table)
    cursor_source.execute("show create table `%s`;" % table)
    create_table_sql = cursor_source.fetchall()[0][1]
    print(create_table_sql)
    try:
        cursor_des.execute(create_table_sql)
    except Exception as error:
        # Best effort, as in the original: report the error and keep going.
        print(error)

    source_ibd = shlex.quote("%s/%s.ibd" % (sourceDataDir, table))
    source_cfg = shlex.quote("%s/%s.cfg" % (sourceDataDir, table))
    des_ibd = shlex.quote("%s/%s.ibd" % (desDataDir, table))
    des_cfg = shlex.quote("%s/%s.cfg" % (desDataDir, table))

    # FOR EXPORT flushes the table to disk and writes the .cfg metadata file
    # copied below; a plain "WITH READ LOCK" never creates that file.
    cursor_source.execute("flush tables `%s` for export;" % table)
    try:
        cursor_des.execute("alter table `%s` discard tablespace;" % table)
        # scp runs ON the source host and pushes both files to the target.
        _run_remote(sourceIp, "scp %s %s %s:%s" % (
            source_ibd, source_cfg, desIp, shlex.quote(desDataDir)))
        # Hand the copied files to the mysql user so the server can read them.
        _run_remote(desIp, "chown mysql:mysql %s %s" % (des_ibd, des_cfg))
    finally:
        # Always release the source lock, even when a copy step failed.
        cursor_source.execute("unlock tables;")
    cursor_des.execute("alter table `%s` import tablespace;" % table)


def data_sync():
    """Migrate every table in ``tableList`` from the source to the target.

    Per table: recreate the table definition on the target, lock and export
    the table on the source, discard the target tablespace, copy the ``.ibd``
    and ``.cfg`` files over SSH, fix their ownership, unlock the source, and
    import the tablespace on the target.

    Raises:
        SystemExit: with status 1 when a remote ``scp``/``chown`` step fails.
    """
    print('data_sync')
    # Keyword arguments: PyMySQL >= 1.0 no longer accepts positional ones.
    db_source = pymysql.connect(host=sourceIp, user=comUser, password=comPwd,
                                database=sourceDbName, port=comPort,
                                charset='utf8')
    try:
        db_des = pymysql.connect(host=desIp, user=comUser, password=comPwd,
                                 database=desDbName, port=comPort,
                                 charset='utf8')
        try:
            with db_source.cursor() as cursor_db_source, \
                    db_des.cursor() as cursor_db_des:
                for table in tableList:
                    _sync_one_table(cursor_db_source, cursor_db_des, table)
            print("同步完成")
        finally:
            db_des.close()
    finally:
        db_source.close()
def data_checksum():
    """Compare ``CHECKSUM TABLE`` results between source and target.

    For every table in ``tableList`` the checksum is read from both instances
    and a line is printed saying whether the two values match. Mismatches are
    only reported; they do not stop the script.
    """
    print('data_checksum')
    # Keyword arguments: PyMySQL >= 1.0 no longer accepts positional ones.
    db_source = pymysql.connect(host=sourceIp, user=comUser, password=comPwd,
                                database=sourceDbName, port=comPort,
                                charset='utf8')
    try:
        db_des = pymysql.connect(host=desIp, user=comUser, password=comPwd,
                                 database=desDbName, port=comPort,
                                 charset='utf8')
        try:
            with db_source.cursor() as cursor_db_source, \
                    db_des.cursor() as cursor_db_des:
                for table in tableList:
                    print("正在校验表:", table)
                    checksum_sql = "checksum table `%s`;" % table
                    # Each result row is (table name, checksum).
                    cursor_db_source.execute(checksum_sql)
                    ck_s = cursor_db_source.fetchall()[0][1]
                    cursor_db_des.execute(checksum_sql)
                    ck_d = cursor_db_des.fetchall()[0][1]
                    if ck_s != ck_d:
                        print("表不一致:", table)
                    else:
                        print("表一致:", table)
        finally:
            db_des.close()
    finally:
        db_source.close()
if __name__ == "__main__":
    # Pipeline: precondition check, tablespace migration, then verification.
    for step in (table_judge, data_sync, data_checksum):
        step()
    print('haha')