每天迁移MySQL历史数据到历史库Python脚本
本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下:
#!/usr/bin/env python
#coding:utf-8
__author__='John'
import datetime
import sys
import time

import MySQLdb
class ClassMigrate(object):
    """Move one day of rows from a source MySQL table into an archive table.

    The target day is ``today - time_interval`` days. Rows are copied in
    primary-key ranges of ``step_size``, the copy is verified by comparing
    ``count / min(pk) / max(pk)`` on both sides, and only after a successful
    verification are the source rows removed, either with chunked ``DELETE``
    statements or by dropping the partition (``--delete_strategy``).

    Process exit codes used by this class: 0 after ``--help``, 1 usage
    error, 77 verification or partition checksum mismatch, 88 success,
    254 migration or purge not completed, 255 destination table could not
    be created.
    """

    # Attributes that _get_argv() must have populated before connecting.
    _REQUIRED = ('source_host', 'dest_host', 'deleteStrategy',
                 'pk', 'date_col', 'interval')

    @staticmethod
    def _parse_endpoint(spec):
        """Split ``host:port/db:table/user/password`` into a 6-tuple.

        Raises ValueError when the specification is malformed.
        """
        # maxsplit=3 keeps any '/' characters that are part of the password.
        address, schema, user, password = spec.split('/', 3)
        host, port = address.split(':')
        db_name, tab_name = schema.split(':')
        return host, int(port), db_name, tab_name, user, password

    @staticmethod
    def _format_elapsed(delta):
        """Render a ``datetime.timedelta`` as ``seconds.microseconds``."""
        # Microseconds must be zero-padded, otherwise 1s + 5000us reads "1.5000".
        return "%d.%06d" % (delta.days * 86400 + delta.seconds,
                            delta.microseconds)

    def _date_window(self):
        """Return the SQL predicate matching rows dated on the target day."""
        # '%%' collapses to a single '%' here; the result is only ever passed
        # on as a format *argument*, so MySQL receives '%Y-%m-%d' unchanged.
        day = ("DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day), '%%Y-%%m-%%d')"
               % (self.today, self.interval))
        return ("%s >= CONCAT(%s, ' 00:00:00') and %s <= CONCAT(%s, ' 23:59:59')"
                % (self.date_col, day, self.date_col, day))

    def _get_argv(self):
        """Parse ``sys.argv`` into connection settings and migration options.

        Prints the usage text and exits with status 1 when no argument is
        given, an option is unknown or malformed, or a required option is
        missing; exits with status 0 after ``--help`` / ``-h``.
        """
        self.usage = """
        usage():
        python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
                                  --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
                                  --delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
        """
        if len(sys.argv) == 1:
            print(self.usage)
            sys.exit(1)
        if sys.argv[1] in ('--help', '-h'):
            print(self.usage)
            sys.exit()
        try:
            for arg in sys.argv[1:]:
                # Split on the first '=' only so values may contain '='.
                name, _, value = arg.partition('=')
                if name == '--source':
                    (self.source_host, self.source_port, self.source_db,
                     self.source_tab, self.source_user,
                     self.source_password) = self._parse_endpoint(value)
                elif name == '--dest':
                    (self.dest_host, self.dest_port, self.dest_db,
                     self.dest_tab, self.dest_user,
                     self.dest_password) = self._parse_endpoint(value)
                elif name == '--delete_strategy':
                    if value not in ('delete', 'drop'):
                        raise ValueError("--delete_strategy must be delete or drop")
                    self.deleteStrategy = value
                elif name == '--primary_key':
                    self.pk = value
                elif name == '--date_col':
                    self.date_col = value
                elif name == '--time_interval':
                    # Interpolated into SQL later, so insist on an integer.
                    self.interval = int(value)
                else:
                    raise ValueError("unknown option %s" % name)
        except ValueError as e:
            print("Invalid argument: %s\n" % e)
            print(self.usage)
            sys.exit(1)
        # Every option shown in the usage text is needed further down; fail
        # here instead of with an AttributeError in the middle of a run.
        if [attr for attr in self._REQUIRED if not hasattr(self, attr)]:
            print(self.usage)
            sys.exit(1)

    def __init__(self):
        """Parse the command line and open autocommit connections to both servers."""
        self._get_argv()

        self.sourcedb_conn_str = MySQLdb.connect(
            host=self.source_host, port=self.source_port,
            user=self.source_user, passwd=self.source_password,
            db=self.source_db, charset='utf8')
        self.sourcedb_conn_str.autocommit(True)
        self.destdb_conn_str = MySQLdb.connect(
            host=self.dest_host, port=self.dest_port,
            user=self.dest_user, passwd=self.dest_password,
            db=self.dest_db, charset='utf8')
        self.destdb_conn_str.autocommit(True)

        # The destination table is created "like" this template table.
        self.template_tab = self.source_tab + '_template'
        # Width of the primary-key range handled per statement.
        self.step_size = 20000

        self._migCompleteState = False
        self._deleteCompleteState = False

        self.source_cnt = ''
        self.source_min_id = ''
        self.source_max_id = ''
        self.source_checksum = ''
        # Read by do() even when nothing was migrated, so it must exist.
        self.dest_cnt = ''

        self.today = time.strftime("%Y-%m-%d")

    def sourcedb_query(self, sql, sql_type):
        """Run ``sql`` on the source connection.

        Returns the fetched rows for ``sql_type == 'select'``, the affected
        row count for ``'dml'``, True for anything else, and False when the
        statement raised (the error is printed).
        """
        cr = None
        try:
            cr = self.sourcedb_conn_str.cursor()
            cr.execute(sql)
            if sql_type == 'select':
                return cr.fetchall()
            elif sql_type == 'dml':
                return self.sourcedb_conn_str.affected_rows()
            else:
                return True
        except Exception as e:
            print(str(e) + "\n")
            return False
        finally:
            # cursor() itself may have failed, leaving nothing to close.
            if cr is not None:
                cr.close()

    def destdb_query(self, sql, sql_type, values=''):
        """Run ``sql`` on the destination connection.

        Returns the fetched rows for ``'select'``, the affected row count for
        ``'insertmany'`` (``values`` is the row sequence), True for anything
        else, and False when the statement raised (the error is printed).
        """
        if sql_type == 'insertmany' and not values:
            # Nothing to insert: report zero rows instead of a stale count.
            return 0
        cr = None
        try:
            cr = self.destdb_conn_str.cursor()
            if sql_type == 'select':
                cr.execute(sql)
                return cr.fetchall()
            elif sql_type == 'insertmany':
                cr.executemany(sql, values)
                return self.destdb_conn_str.affected_rows()
            else:
                cr.execute(sql)
                return True
        except Exception as e:
            print(str(e) + "\n")
            return False
        finally:
            if cr is not None:
                cr.close()

    def create_table_from_source(self):
        """Create the destination table from the source's SHOW CREATE TABLE.

        Not used by do(): the data has to land in an ARCHIVE-engine table, so
        cloning the source definition is unsuitable. Kept for other uses.
        Returns True on success, False on error.
        """
        try:
            sql = "show create table %s;" % self.source_tab
            create_str = self.sourcedb_query(sql, 'select')[0][1]
            # Only the leading keyword may be rewritten, hence count=1.
            create_str = create_str.replace(
                'CREATE TABLE', 'CREATE TABLE IF NOT EXISTS', 1)
            self.destdb_query(create_str, 'ddl')
            return True
        except Exception as e:
            print(str(e) + "\n")
            return False

    def create_table_from_template(self):
        """Create the destination table LIKE the template table if absent.

        Returns True when the statement succeeded, False otherwise.
        """
        try:
            sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (
                self.dest_tab, self.template_tab)
            return bool(self.destdb_query(sql, 'ddl'))
        except Exception as e:
            print(str(e) + "\n")
            return False

    def get_min_max(self):
        """Record count, min id and max id of the source rows to migrate.

        Stores them in ``source_cnt`` / ``source_min_id`` / ``source_max_id``
        and derives ``source_checksum``. Returns False when no row matches or
        the query failed, True otherwise.
        """
        try:
            print("\nStarting Migrate at -- %s\n" % datetime.datetime.now())
            sql = ("select count(*), IFNULL(min(%s), -1), IFNULL(max(%s), -1) "
                   "from %s where %s"
                   % (self.pk, self.pk, self.source_tab, self._date_window()))
            q = self.sourcedb_query(sql, 'select')
            self.source_cnt = q[0][0]
            self.source_min_id = q[0][1]
            self.source_max_id = q[0][2]
            self.source_checksum = "%s_%s_%s" % (
                self.source_cnt, self.source_min_id, self.source_max_id)
            if (self.source_cnt == 0 or self.source_min_id == -1
                    or self.source_max_id == -1):
                print("There is 0 record in source table been matched!\n")
                return False
            return True
        except Exception as e:
            print(str(e) + "\n")
            return False

    def migrate_2_destdb(self):
        """Copy the matching source rows to the destination in id ranges.

        Returns False when there is nothing to migrate or an unexpected error
        occurred, True once every range has been attempted. A failed range is
        reported but does not stop the loop; verify_total_cnt() is what
        decides whether the copy is complete.
        """
        try:
            if not self.get_min_max():
                return False
            cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
            if not cols:
                print("Describe source table failed!!\n")
                return False
            # One %s placeholder per source column, spliced into the insert.
            placeholders = ",".join(["%s"] * len(cols))
            insert_sql = ("insert into " + self.dest_tab
                          + " values (%s)" % placeholders)
            window = self._date_window()
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = ("select * from %s where %s >= %d and %s < %d and %s"
                       % (self.source_tab, self.pk, k,
                          self.pk, k + self.step_size, window))
                print("\n%s\n" % sql)
                starttime = datetime.datetime.now()
                results = self.sourcedb_query(sql, 'select')
                if results is False:
                    rows = False
                else:
                    rows = self.destdb_query(insert_sql, 'insertmany', results)
                # Identity test: an empty id range yields 0 rows, which is
                # not a failure even though 0 == False.
                if rows is False:
                    print("Insert failed!!\n")
                else:
                    print("Inserted %s rows.\n" % rows)
                elapsed = datetime.datetime.now() - starttime
                print("Elapsed: " + self._format_elapsed(elapsed) + " seconds\n")
                k += self.step_size
            print("\nInsert complete at -- %s\n" % datetime.datetime.now())
            return True
        except Exception as e:
            print(str(e) + "\n")
            return False

    def verify_total_cnt(self):
        """Compare destination count/min/max with the source checksum.

        Sets ``_migCompleteState`` on a match; exits the process with status
        77 on a mismatch. Query errors are printed and leave the state False.
        """
        try:
            sql = ("select count(*), IFNULL(min(%s), -1), IFNULL(max(%s), -1) "
                   "from %s where %s"
                   % (self.pk, self.pk, self.dest_tab, self._date_window()))
            dest_result = self.destdb_query(sql, 'select')
            self.dest_cnt = dest_result[0][0]
            dest_checksum = "%s_%s_%s" % (
                self.dest_cnt, dest_result[0][1], dest_result[0][2])
            print("source_checksum: %s, dest_checksum: %s\n"
                  % (self.source_checksum, dest_checksum))
            if (self.source_cnt == self.dest_cnt and self.dest_cnt != 0
                    and self.source_checksum == dest_checksum):
                self._migCompleteState = True
                print("Verify successfully!!\n")
            else:
                print("Verify failed!!\n")
                sys.exit(77)
        except Exception as e:
            print(str(e) + "\n")

    def drop_daily_partition(self):
        """Drop the source partition holding the migrated day.

        The partition name is taken from the 4th column of EXPLAIN
        PARTITIONS. It is dropped only when its count/min/max equals the
        migrated checksum; a mismatch exits the process with status 77.
        Sets ``_deleteCompleteState`` when the drop succeeded.
        """
        try:
            if not self._migCompleteState:
                return
            sql = ("explain partitions select * from %s where %s"
                   % (self.source_tab, self._date_window()))
            partition_name = self.sourcedb_query(sql, 'select')[0][3]
            if not partition_name:
                print("There is 0 record in source PARTITION been matched!\n")
                return
            sql = ("select count(*), IFNULL(min(%s), -1), IFNULL(max(%s), -1) "
                   "from %s partition (%s)"
                   % (self.pk, self.pk, self.source_tab, partition_name))
            q = self.sourcedb_query(sql, 'select')
            source_cnt = q[0][0]
            source_min_id = q[0][1]
            source_max_id = q[0][2]
            checksum = "%s_%s_%s" % (source_cnt, source_min_id, source_max_id)
            if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
                print("There is 0 record in source PARTITION been matched!\n")
                return
            if checksum != self.source_checksum:
                # The partition holds rows that were not migrated.
                print("The partition %s checksum failed!! Drop failed!!"
                      % partition_name)
                sys.exit(77)
            drop_par_sql = "alter table %s drop partition %s;" % (
                self.source_tab, partition_name)
            dropped = self.sourcedb_query(drop_par_sql, 'ddl')
            print(drop_par_sql + "\n")
            if dropped:
                print("\nDrop partition complete at -- %s\n"
                      % datetime.datetime.now())
                self._deleteCompleteState = True
            else:
                print("Drop partition failed..\n")
        except Exception as e:
            print(str(e) + "\n")

    def delete_data(self):
        """Delete the migrated rows from the source in id ranges.

        Sleeps one second between ranges. ``_deleteCompleteState`` is set
        only when every range was deleted without error.
        """
        try:
            if not self._migCompleteState:
                return
            all_deleted = True
            window = self._date_window()
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = ("delete from %s where %s >= %d and %s < %d and %s"
                       % (self.source_tab, self.pk, k,
                          self.pk, k + self.step_size, window))
                print("\n%s\n" % sql)
                starttime = datetime.datetime.now()
                rows = self.sourcedb_query(sql, 'dml')
                # 0 affected rows (sparse ids) is a success, not a failure.
                if rows is False:
                    all_deleted = False
                    print("Delete failed!!\n")
                else:
                    print("Deleted %s rows.\n" % rows)
                elapsed = datetime.datetime.now() - starttime
                print("Elapsed: " + self._format_elapsed(elapsed) + " seconds\n")
                time.sleep(1)
                k += self.step_size
            print("\nDelete complete at -- %s\n" % datetime.datetime.now())
            self._deleteCompleteState = all_deleted
        except Exception as e:
            print(str(e) + "\n")

    def do(self):
        """Run create -> migrate -> verify -> purge and exit with the result.

        Always terminates the process: 88 when the purge completed, 254 when
        it did not, 255 when the destination table could not be created.
        """
        if not self.create_table_from_template():
            print("Create table failed! Exiting...")
            sys.exit(255)

        if self.migrate_2_destdb():
            self.verify_total_cnt()
            if self._migCompleteState:
                if self.deleteStrategy == 'drop':
                    self.drop_daily_partition()
                else:
                    self.delete_data()

        print("\n\n")
        print("=====" * 5 + '\n')
        print("source_total_cnt: %s\n" % self.source_cnt)
        print("dest_total_cnt: %s\n" % self.dest_cnt)
        print("=====" * 5 + '\n')
        if self._deleteCompleteState:
            print("\nFinal result: Successfully!!\n")
            sys.exit(88)
        else:
            print("\nFinal result: Failed!!\n")
            sys.exit(254)
if __name__ == '__main__':
    # Guarded so that importing this module does not start a migration.
    # Constructing ClassMigrate parses sys.argv and opens both connections;
    # do() always ends the process through sys.exit().
    f = ClassMigrate()
    f.do()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。