基于SQLAlchemy实现操作MySQL并执行原生sql语句
场景应用
老大我让爬取内部网站获取数据,插入到新建的表中,并每天进行爬取更新数据(后面做了定时任务)。然后根据该表统计每日的新增数量/更新数量进行制图制表,向上级汇报。
思路构建
选用sqlalchemy+mysqlconnector,连接数据库,创建表,对指定表进行CRUD
fromsqlalchemyimportexists,Column,Integer,String,ForeignKey,DateTime,Text,func fromsqlalchemy.ext.declarativeimportdeclarative_base fromsqlalchemyimportcreate_engine fromsqlalchemy.ormimportsessionmaker fromconf.parseConfigimportparseConf #从配置文件中获取数据库信息 host=parseConf.get_conf('MySQLInfo','host') port=parseConf.get_conf('MySQLInfo','port') dbname=parseConf.get_conf('MySQLInfo','dbname') usernm=parseConf.get_conf('MySQLInfo','usernm') passwd=parseConf.get_conf('MySQLInfo','passwd') #连接数据库 engine_str="mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm,passwd,host,port,dbname) #创建的数据库引擎 engine=create_engine(engine_str,encoding='utf-8') #创建session类型 DBSession=sessionmaker(bind=engine) #创建session对象,进行增删改查: session=DBSession() #实例化官宣模型-Base就是ORM模型 Base=declarative_base() #创建服务单表继承Base基类 classServiceOrder(Base): __tablename__='serviceOrderTable' serviceOrderId=Column(String(32),primary_key=True,comment='服务单ID') serviceDesc=Column(String(512),comment='服务说明') transferTimes=Column(String(32),comment='转派次数') #创建更新时间,对数据的更新进行记录 updateTime=Column(DateTime,server_default=func.now(),onupdate=func.now()) definit_db(): Base.metadata.create_all(engine) defdrop_db(): Base.metadata.drop_all(engine) if__name__=="__main__": #每次执行时会判断表的存在性对于数据库中不存在的表进行创建已存在的表则可以直接进行增删改查 init_db() ###首先讲一下使用sqlalchemy执行原生的sql语句### #方式一: res=session.execute('select*fromServiceOrder')#res是获取的对象 all_res_list=res.fetchall()#all_res_list具体的结果是列表 print(all_res_list)#结果:[('数据提取',),('非数据提取',)] #方式二: conn=engine.connect() res=conn.execute('select*fromServiceOrder') all_res_list=res.fetchall() ###使用创建好的session对象进行增删改查### #插入单条数据 #创建新service0rder对象 new_serviceorder=ServiceOrder(serviceOrderId='001',serviceDesc='ack',transferTimes='9') #添加到session session.add(new_serviceorder) #提交即保存到数据库 session.commit() #插入多条数据 serviceorder_list=[ServiceOrder(serviceOrderId='002',serviceDesc='好的',transferTimes='9'),ServiceOrder(serviceOrderId='003',serviceDesc='起床',transferTimes='9')] session.add_all(serviceorder_list) session.commit() #session.close() #查询 #查询是否存在结果是布尔值 it_exists=session.query( exists().where(ServiceOrder.serviceOrderId=='002') ).scalar() #创建Query查询,filter是where条件 #调用one()first()返回唯一行,如果调用all()则已列表的形式返回所有行: server_order=session.query(ServiceOrder).filter(ServiceOrder.serviceOrderId=='003').first() print(server_order.serviceDesc) serciceorders=session.query(ServiceOrder).filter(ServiceOrder.serviceDesc=='好的').all() #改更新数据 #数据更新,将值为Mack的serviceDesc修改为Danny update_obj=session.query(ServiceOrder).filter(ServiceOrder.serviceDesc=='Mack').update({"serviceDesc":"Danny"}) #或则 update_objp=session.query(ServiceOrder).filter(ServiceOrder.serviceDesc=='Mack').first() update_objp.serviceDesc='Danny' session.commit() #删除 update_objk=session.query(ServiceOrder).filter(ServiceOrder.serviceDesc=='Mack').delete() #或则 update_objkp=session.query(ServiceOrder).filter(ServiceOrder.serviceDesc=='Mack').one() update_objkp.delete() session.commit() session.close()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。