Python实现mysql数据库更新表数据接口的功能
前言
昨天,因为项目需求要添加表的更新接口,来存储预测模型训练的数据,所以自己写了一段代码实现了该功能,在开始之前,给大家分享python操作mysql数据库基础:
#coding=utf-8
importMySQLdb
conn=MySQLdb.connect(
host='localhost',
port=3306,
user='root',
passwd='123456',
db='test',
)
cur=conn.cursor()
#创建数据表
#cur.execute("createtablestudent(idint,namevarchar(20),classvarchar(30),agevarchar(10))")
#插入一条数据
#cur.execute("insertintostudentvalues('2','Tom','3year2class','9')")
#修改查询条件的数据
#cur.execute("updatestudentsetclass='3year1class'wherename='Tom'")
#删除查询条件的数据
#cur.execute("deletefromstudentwhereage='9'")
cur.close()
conn.commit()
conn.close()
>>>conn=MySQLdb.connect(host='localhost',port=3306,user='root',passwd='123456',db='test',)
Connect()方法用于创建数据库的连接,里面可以指定参数:用户名,密码,主机等信息。
这只是连接到了数据库,要想操作数据库需要创建游标。
>>>cur=conn.cursor()
通过获取到的数据库连接conn下的cursor()方法来创建游标。
>>>cur.execute("createtablestudent(idint,namevarchar(20),classvarchar(30),agevarchar(10))")
通过游标cur操作execute()方法可以写入纯sql语句。通过execute()方法中写如sql语句来对数据进行操作。
>>>cur.close()
cur.close()关闭游标
>>>conn.commit()
conn.commit()方法在提交事物,在向数据库插入一条数据时必须要有这个方法,否则数据不会被真正的插入。
>>>conn.close()
Conn.close()关闭数据库连接
下面开始本文的正文:
Python实现mysql更新表数据接口
示例代码
#-*-coding:utf-8-*-
importpymysql
importsettings
classmysql(object):
def__init__(self):
self.db=None
defconnect(self):
self.db=pymysql.connect(host=settings.ip,port=settings.port,user=settings.mysql_user,passwd=settings.mysql_passwd,db=settings.database,)
#print("connectisok")
#return1
defdisconnect(self):
self.db.close()
#return-1
defcreate_table(self,tablename,columns,spec='time'):
"""
:paramtablename:
:paramspec:
:paramcolumns:列表[]
:return:
"""
type_data=['int','double(10,3)']
cursor=self.db.cursor()
sql="createtable%s("%(tablename,)
sqls=[]
forcolincolumns:
#判断是否time_num
ifcol==spec:
sqls.append('%s%sprimarykey'%(col,type_data[0]))
else:
sqls.append('%s%s'%(col,type_data[1]))
sqlStr=','.join(sqls)
sql+=sqlStr+')'
try:
cursor.execute(sql)
print("Table%siscreated"%tablename)
except:
self.db.rollback()
defis_table_exist(self,tablename,dbname):
cursor=self.db.cursor()
sql="selecttable_namefrominformation_schema.TABLESwheretable_schema='%s'andtable_name='%s'"%(dbname,tablename)
#results="error:Thietableisnotexit"
try:
cursor.execute(sql)
results=cursor.fetchall()#接受全部返回行
except:
#不存在这张表返回错误提示
raiseException('Thistabledoesnotexist')
ifnotresults:
returnNone
else:
returnresults
#printdatas
definsert_mysql_with_json(self,tablename,datas):
"""
:paramtablename:
:paramdatas:字典{(key:value),.....}
:return:
"""
#keys=datas[0]
keys=datas[0].keys()
keys=str(tuple(keys))
keys=''.join(keys.split("'"))#用'隔开
print(keys)
ret=[]
fordtindatas:
values=dt.values()##‘str'objecthasnoattribute#
sql="insertinto%s"%tablename+keys
sql=sql+"values"+str(tuple(values))
ret.append(sql)
#print("1")
#printkeysinsertinto%tablenamedat[i]valuesstr[i]
self.insert_into_sql(ret)
print("1")
definsert_into_sql(self,sqls):
cursor=self.db.cursor()
forsqlinsqls:
#执行sql语句
try:
cursor.execute(sql)
self.db.commit()
#print("insert%s"%sql,"success.")
except:
#Rollbackincasethereisanyerror
self.db.rollback()
#找列名
deffind_columns(self,tablename):
sql="selectCOLUMN_NAMEfrominformation_schema.columnswheretable_name='%s'"%tablename
cursor=self.db.cursor()
try:
cursor.execute(sql)
results=cursor.fetchall()
except:
raiseException('hello')
returntuple(map(lambdax:x[0],results))
deffind(self,tablename,start_time,end_time,fieldName=None):
"""
:paramtablename:test_scale1015
:paramfieldName:Noneor(columns1010,columns1011,columns1012,columns1013,time)
:return:
"""
cursor=self.db.cursor()
sql=''
iffieldName==None:
fieldName=self.find_columns(tablename)
sql="select*from%swheretimebetween%sand%s"%(tablename,str(start_time),str(end_time))
#print('None')
else:
fieldNameStr=','.join(fieldName)
sql="select%sfrom%swheretimebetween%sand%s"%(
fieldNameStr,tablename,str(start_time),str(end_time))
#print('sm')
try:
cursor.execute(sql)
results=cursor.fetchall()
except:
raiseException('hello')
returnfieldName,results,
#样例data=[{'time':123321,'predict':1.222},{'time':123322,'predict':1.223},{'time':123324,'predict':1.213}]
defupdata(self,datas,tablename):
cursor=self.db.cursor()
columns=[]
fordataindatas:
foriindata.keys():
columns.append(i)
#print(columns)
break
#columns_2=columns[:]
db.connect()
ifdb.is_table_exist(settings.tablename_2,settings.database):
#exists
#pass
forcolincolumns:
ifcol!='time':
sql="altertable%saddcolumn%sdouble(10,3);"%(settings.tablename_2,col)
try:
cursor.execute(sql)
print("%sisalteredok"%(col))
except:
print("alterisfailed")
ret=[]
foriindatas:
col=[]
foriiini.keys():
col.append(ii)
#time=col[0]andpredict=col[1]
time_data=i[col[0]]
predic_data=i[col[1]]
sql="update%sset%s='%s'where%s=%s"%(settings.tablename_2,col[1],predic_data,col[0],time_data)
ret.append(sql)
self.insert_into_sql(ret)
#db.insert_mysql_with_json(tablename,datas)
else:
#noexists
db.create_table(settings.tablename_2,columns)
db.insert_mysql_with_json(settings.tablename_2,datas)
db=mysql()
其中update()函数,是新添加的接口:
传入的data的样例data=[{'time':123321,'predict':1.222},{'time':123322,'predict':1.223},{'time':123324,'predict':1.213}]这样子的。
一个列表里有多个字典,每个字典有time和predict。如果需要存predict_2,predict_3的时候,则实现更新操作,否则,只进行创表和插入数据的操作~~~~~~
看起来是不是很简单~~~~~~
这个接口还没有进行优化等操作,很冗余~~~~
毕竟项目还在测试阶段,等先跑通了,在考虑优化吧~~~~~~
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。