Python 实现数据库(SQL)更新脚本的生成方法
我在工作的时候,在测试环境下使用的数据库跟生产环境的数据库不一致,当我们的测试环境下的数据库完成测试准备更新到生产环境上的数据库时候,需要准备更新脚本,真是一不小心没记下来就会忘了改了哪里,哪里添加了什么,这个真是非常让人头疼。因此我就试着用Python来实现自动的生成更新脚本,以免我这烂记性,记不住事。
主要操作如下:
1.在原先basedao.py中添加如下方法,这样旧能很方便的获取数据库的数据,为测试数据库和生产数据库做对比打下了基础。
defselect_database_struts(self):
'''
查找当前连接配置中的数据库结构以字典集合
'''
sql='''SELECTCOLUMN_NAME,IS_NULLABLE,COLUMN_TYPE,COLUMN_KEY,COLUMN_COMMENT
FROMinformation_schema.`COLUMNS`
WHERETABLE_SCHEMA="%s"ANDTABLE_NAME="{0}"'''%(self.__database)
struts={}
forkinself.__primaryKey_dict.keys():
self.__cursor.execute(sql.format(k))
results=self.__cursor.fetchall()
struts[k]={}
forresultinresults:
struts[k][result[0]]={}
struts[k][result[0]]["COLUMN_NAME"]=result[0]
struts[k][result[0]]["IS_NULLABLE"]=result[1]
struts[k][result[0]]["COLUMN_TYPE"]=result[2]
struts[k][result[0]]["COLUMN_KEY"]=result[3]
struts[k][result[0]]["COLUMN_COMMENT"]=result[4]
returnself.__config,struts
2.编写对比的Python脚本
'''
数据库迁移脚本,目前支持一下几种功能:
1.生成旧数据库中没有的数据库表执行SQL脚本(支持是否带表数据),生成的SQL脚本在temp目录下(表名.sql)。
2.生成添加列SQL脚本,生成的SQL脚本统一放在temp目录下的depoyed.sql中。
3.生成修改列属性SQL脚本,生成的SQL脚本统一放在temp目录下的depoyed.sql中。
4.生成删除列SQL脚本,生成的SQL脚本统一放在temp目录下的depoyed.sql中。
'''
importjson,os,sys
frombasedaoimportBaseDao
temp_path=sys.path[0]+"/temp"
ifnotos.path.exists(temp_path):
os.mkdir(temp_path)
defmain(old,new,has_data=False):
'''
@old旧数据库(目标数据库)
@new最新的数据库(源数据库)
@has_data是否生成结构+数据的sql脚本
'''
clear_temp()#先清理temp目录
old_config,old_struts=old
new_config,new_struts=new
fornew_table,new_fieldsinnew_struts.items():
ifold_struts.get(new_table)isNone:
gc_sql(new_config["user"],new_config["password"],new_config["database"],new_table,has_data)
else:
cmp_table(old_struts[new_table],new_struts[new_table],new_table)
defcmp_table(old,new,table):
'''
对比表结构生成sql
'''
old_fields=old
new_fields=new
sql_add_column="ALTERTABLE`{TABLE}`ADDCOLUMN`{COLUMN_NAME}`{COLUMN_TYPE}COMMENT'{COLUMN_COMMENT}';\n"
sql_change_column="ALTERTABLE`{TABLE}`CHANGE`{COLUMN_NAME}``{COLUMN_NAME}`{COLUMN_TYPE}COMMENT'{COLUMN_COMMENT}';\n"
sql_del_column="ALTERTABLE`{TABLE}`DROP{COLUMN_NAME};"
ifold_fields!=new_fields:
f=open(sys.path[0]+"/temp/deploy.sql","a",encoding="utf8")
content=""
fornew_field,new_field_dictinnew_fields.items():
old_filed_dict=old_fields.get(new_field)
ifold_filed_dictisNone:
#生成添加列sql
content+=sql_add_column.format(TABLE=table,**new_field_dict)
else:
#生成修改列sql
ifold_filed_dict!=new_field_dict:
content+=sql_change_column.format(TABLE=table,**new_field_dict)
pass
#生成删除列sql
forold_field,old_field_dictinold_fields.items():
ifnew_fields.get(old_field)isNone:
content+=sql_del_column.format(TABLE=table,COLUMN_NAME=old_field)
f.write(content)
f.close()
defgc_sql(user,pwd,db,table,has_data):
'''
生成sql文件
'''
ifhas_data:
sys_order="mysqldump-u%s-p%s%s%s>%s/%s.sql"%(user,pwd,db,table,temp_path,table)
else:
sys_order="mysqldump-u%s-p%s-d%s%s>%s/%s.sql"%(user,pwd,db,table,temp_path,table)
os.system(sys_order)
defclear_temp():
'''
每次执行的时候调用这个,先清理下temp目录下面的旧文件
'''
ifos.path.exists(temp_path):
files=os.listdir(temp_path)
forfileinfiles:
f=os.path.join(temp_path,file)
ifos.path.isfile(f):
os.remove(f)
print("临时文件目录清理完成")
if__name__=="__main__":
test1_config={
"user":"root",
"password":"root",
"database":"test1",
}
test2_config={
"user":"root",
"password":"root",
"database":"test2",
}
test1_dao=BaseDao(**test1_config)
test1_struts=test1_dao.select_database_struts()
test2_dao=BaseDao(**test2_config)
test2_struts=test2_dao.select_database_struts()
main(test2_struts,test1_struts)
目前只支持了4种SQL脚本的生成。
总结
以上所述是小编给大家介绍的Python实现数据库(SQL)更新脚本的生成方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!