在Python程序和Flask框架中使用SQLAlchemy的教程
ORM江湖
曾几何时,程序员因为惧怕SQL而在开发的时候小心翼翼的写着sql,心中总是少不了恐慌,万一不小心sql语句出错,搞坏了数据库怎么办?又或者为了获取一些数据,什么内外左右连接,函数存储过程等等。毫无疑问,不搞懂这些,怎么都觉得变扭,说不定某天就跳进了坑里,叫天天不应,喊地地不答。
ORM的出现,让畏惧SQL的开发者,在坑里看见了爬出去的绳索,仿佛天空并不是那么黑暗,至少再暗,我们也有了眼睛。顾名思义,ORM对象关系映射,简而言之,就是把数据库的一个个table(表),映射为编程语言的class(类)。
python中比较著名的ORM框架有很多,大名顶顶的SQLAlchemy是python世界里当仁不让的ORM框架。江湖中peewee,strom,pyorm,SQLObject各领风骚,可是最终还是SQLAlchemy傲视群雄。
SQLAlchemy简介
SQLAlchemy分为两个部分,一个用于ORM的对象映射,另外一个是核心的SQLexpression。第一个很好理解,纯粹的ORM,后面这个不是ORM,而是DBAPI的封装,当然也提供了很多方法,避免了直接写sql,而是通过一些sql表达式。使用SQLAlchemy则可以分为三种方式。
- 使用sqlexpression,通过SQLAlchemy的方法写sql表达式,简介的写sql
- 使用rawsql,直接书写sql
- 使用ORM避开直接书写sql
本文先探讨SQLAlchemy的sqlexpresstion部分的用法。主要还是跟着官方的SQLExpressionLanguageTutorial.介绍
为什么要学习sqlexpresstion,而不直接上ORM?因为后面这个两个是orm的基础。并且,即是不使用orm,后面这两个也能很好的完成工作,并且代码的可读性更好。纯粹把SQLAlchemy当成dbapi使用。首先SQLAlchemy内建数据库连接池,解决了连接操作相关繁琐的处理。其次,提供方便的强大的log功能,最后,复杂的查询语句,依靠单纯的ORM比较难实现。
实战
连接数据库
首先需要导入sqlalchemy库,然后建立数据库连接,这里使用mysql。通过create_engine方法进行
fromsqlalchemyimportcreate_engine engine=create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8",echo=True)
create_engine方法进行数据库连接,返回一个db对象。里面的参数表示
数据库类型://用户名:密码(没有密码则为空,不填)@数据库主机地址/数据库名?编码
echo=True是为了方便控制台logging输出一些sql信息,默认是False
通过这个engine对象可以直接execute进行查询,例如engine.execute("SELECT*FROMuser")也可以通过engine获取连接在查询,例如conn=engine.connect()通过conn.execute()方法进行查询。两者有什么差别呢?
直接使用engine的execute执行sql的方式,叫做connnectionless执行,
借助engine.connect()获取conn,然后通过conn执行sql,叫做connection执行
主要差别在于是否使用transaction模式,如果不涉及transaction,两种方法效果是一样的.官网推荐使用后者。
定义表
定义数据表,才能进行sql表达式的操作,毕竟sql表达式的表的确定,是sqlalchemy制定的,如果数据库已经存在了数据表还需要定义么?当然,这里其实是一个映射关系,如果不指定,查询表达式就不知道是附加在那个表的操作,当然定义的时候,注意表名和字段名,代码和数据的必须保持一致。定义好之后,就能创建数据表,一旦创建了,再次运行创建的代码,数据库是不会创建的。
#-*-coding:utf-8-*- __author__='ghost' fromsqlalchemyimportcreate_engine,Table,Column,Integer,String,MetaData,ForeignKey #连接数据库 engine=create_engine("mysql://root:@localhost:3306/webpy?charset=utf8",encoding="utf-8",echo=True) #获取元数据 metadata=MetaData() #定义表 user=Table('user',metadata, Column('id',Integer,primary_key=True), Column('name',String(20)), Column('fullname',String(40)), ) address=Table('address',metadata, Column('id',Integer,primary_key=True), Column('user_id',None,ForeignKey('user.id')), Column('email',String(60),nullable=False) ) #创建数据表,如果数据表存在,则忽视 metadata.create_all(engine) #获取数据库连接 conn=engine.connect()
插入insert
有了数据表和连接对象,对应数据库操作就简单了。
>>>i=user.insert()#使用查询 >>>i <sqlalchemy.sql.dml.Insertobjectat0x0000000002637748> >>>printi#内部构件的sql语句 INSERTINTO"user"(id,name,fullname)VALUES(:id,:name,:fullname) >>>u=dict(name='jack',fullname='jackJone') >>>r=conn.execute(i,**u)#执行查询,第一个为查询对象,第二个参数为一个插入数据字典,如果插入的是多个对象,就把对象字典放在列表里面 >>>r <sqlalchemy.engine.result.ResultProxyobjectat0x0000000002EF9390> >>>r.inserted_primary_key#返回插入行主键id [4L] >>>addresses [{'user_id':1,'email':'jack@yahoo.com'},{'user_id':1,'email':'jack@msn.com'},{'user_id':2,'email':'www@www.org'},{'user_id':2,'email':'wendy@aol.com'}] >>>i=address.insert() >>>r=conn.execute(i,addresses)#插入多条记录 >>>r <sqlalchemy.engine.result.ResultProxyobjectat0x0000000002EB5080> >>>r.rowcount#返回影响的行数 4L >>>i=user.insert().values(name='tom',fullname='tomJim') >>>i.compile() <sqlalchemy.sql.compiler.SQLCompilerobjectat0x0000000002F6F390> >>>printi.compile() INSERTINTO"user"(name,fullname)VALUES(:name,:fullname) >>>printi.compile().params {'fullname':'tomJim','name':'tom'} >>>r=conn.execute(i) >>>r.rowcount 1L
查询select
查询方式很灵活,多数时候使用sqlalchemy.sql下面的select方法
>>>s=select([user])#查询user表 >>>s <sqlalchemy.sql.selectable.Selectat0x25a7748;Selectobject> >>>prints SELECT"user".id,"user".name,"user".fullname FROM"user"
如果需要查询自定义的字段,可是使用user的cloumn对象,例如
>>>user.c#表user的字段column对象 <sqlalchemy.sql.base.ImmutableColumnCollectionobjectat0x0000000002E804A8> >>>printuser.c ['user.id','user.name','user.fullname'] >>>s=select([user.c.name,user.c.fullname]) >>>r=conn.execute(s) >>>r <sqlalchemy.engine.result.ResultProxyobjectat0x00000000025A7748> >>>r.rowcount#影响的行数 5L >>>ru=r.fetchall() >>>ru [(u'hello',u'helloworld'),(u'Jack',u'JackJone'),(u'Jack',u'JackJone'),(u'jack',u'jackJone'),(u'tom',u'tomJim')] >>>r <sqlalchemy.engine.result.ResultProxyobjectat0x00000000025A7748> >>>r.closed#只要r.fetchall()之后,就会自动关闭ResultProxy对象 True
同时查询两个表
>>>s=select([user.c.name,address.c.user_id]).where(user.c.id==address.c.user_id)#使用了字段和字段比较的条件 >>>s <sqlalchemy.sql.selectable.Selectat0x2f03390;Selectobject> >>>prints SELECT"user".name,address.user_id FROM"user",address WHERE"user".id=address.user_id
操作符
>>>printuser.c.id==address.c.user_id#返回一个编译的字符串 "user".id=address.user_id >>>printuser.c.id==7 "user".id=:id_1#编译成为带参数的sql语句片段字符串 >>>printuser.c.id!=7 "user".id!=:id_1 >>>printuser.c.id>7 "user".id>:id_1 >>>printuser.c.id==None "user".idISNULL >>>printuser.c.id+address.c.id#使用两个整形的变成+ "user".id+address.id >>>printuser.c.name+address.c.email#使用两个字符串变成|| "user".name||address.email
操作连接
这里的连接指条件查询的时候,逻辑运算符的连接,即andor和not
>>>printand_( user.c.name.like('j%'), user.c.id==address.c.user_id, or_( address.c.email=='wendy@aol.com', address.c.email=='jack@yahoo.com' ), not_(user.c.id>5)) "user".nameLIKE:name_1AND"user".id=address.user_idAND(address.email=:email_1ORaddress.email=:email_2)AND"user".id<=:id_1 >>>
得到的结果为编译的sql语句片段,下面看一个完整的例子
>>>se_sql=[(user.c.fullname+","+address.c.email).label('title')] >>>wh_sql=and_( user.c.id==address.c.user_id, user.c.name.between('m','z'), or_( address.c.email.like('%@aol.com'), address.c.email.like('%@msn.com') ) ) >>>printwh_sql "user".id=address.user_idAND"user".nameBETWEEN:name_1AND:name_2AND(address.emailLIKE:email_1ORaddress.emailLIKE:email_2) >>>s=select(se_sql).where(wh_sql) >>>prints SELECT"user".fullname||:fullname_1||address.emailAStitle FROM"user",address WHERE"user".id=address.user_idAND"user".nameBETWEEN:name_1AND:name_2AND(address.emailLIKE:email_1ORaddress.emailLIKE:email_2) >>>r=conn.execute(s) >>>r.fetchall()
使用rawsql方式
遇到负责的sql语句的时候,可以使用sqlalchemy.sql下面的text函数。将字符串的sql语句包装编译成为execute执行需要的sql对象。例如:、
>>>text_sql="SELECTid,name,fullnameFROMuserWHEREid=:id"#原始sql语句,参数用(:value)表示 >>>s=text(text_sql) >>>prints SELECTid,name,fullnameFROMuserWHEREid=:id >>>s <sqlalchemy.sql.elements.TextClauseobjectat0x0000000002587668> >>>conn.execute(s,id=3).fetchall()#id=3传递:id参数 [(3L,u'Jack',u'JackJone')]
连接join
连接有join和outejoin两个方法,join有两个参数,第一个是join的表,第二个是on的条件,joing之后必须要配合select_from方法:
>>>printuser.join(address) "user"JOINaddressON"user".id=address.user_id#因为开启了外键,所以join能只能识别on条件 >>>printuser.join(address,address.c.user_id==user.c.id)#手动指定on条件 "user"JOINaddressONaddress.user_id="user".id >>>s=select([user.c.name,address.c.email]).select_from(user.join(address,user.c.id==address.c.user_id))#被jion的sql语句需要用select_from方法配合 >>>s <sqlalchemy.sql.selectable.Selectat0x2eb63c8;Selectobject> >>>prints SELECT"user".name,address.email FROM"user"JOINaddressON"user".id=address.user_id >>>conn.execute(s).fetchall() [(u'hello',u'jack@yahoo.com'),(u'hello',u'jack@msn.com'),(u'hello',u'jack@yahoo.com'),(u'hello',u'jack@msn.com'),(u'Jack',u'www@www.org'),(u'Jack',u'wendy@aol.com'),(u'Jack',u'www@www.org'),(u'Jack',u'wendy@aol.com')]
排序分组分页
排序使用order_by方法,分组是group_by,分页自然就是limit和offset两个方法配合
>>>s=select([user.c.name]).order_by(user.c.name)#order_by >>>prints SELECT"user".name FROM"user"ORDERBY"user".name >>>s=select([user]).order_by(user.c.name.desc()) >>>prints SELECT"user".id,"user".name,"user".fullname FROM"user"ORDERBY"user".nameDESC >>>s=select([user]).group_by(user.c.name)#group_by >>>prints SELECT"user".id,"user".name,"user".fullname FROM"user"GROUPBY"user".name >>>s=select([user]).order_by(user.c.name.desc()).limit(1).offset(3)#limit(1).offset(3) >>>prints SELECT"user".id,"user".name,"user".fullname FROM"user"ORDERBY"user".nameDESC LIMIT:param_1OFFSET:param_2 [(4L,u'jack',u'jackJone')]
更新update
前面都是一些查询,更新和插入的方法很像,都是表下面的方法,不同的是,update多了一个where方法用来选择过滤
>>>s=user.update() >>>prints UPDATE"user"SETid=:id,name=:name,fullname=:fullname >>>s=user.update().values(fullname=user.c.name)#values指定了更新的字段 >>>prints UPDATE"user"SETfullname="user".name >>>s=user.update().where(user.c.name=='jack').values(name='ed')#where进行选择过滤 >>>prints UPDATE"user"SETname=:nameWHERE"user".name=:name_1 >>>r=conn.execute(s) >>>printr.rowcount#影响行数 3
还有一个高级用法,就是一次命令执行多个记录的更新,需要用到bindparam方法
>>>s=user.update().where(user.c.name==bindparam('oldname')).values(name=bindparam('newname'))#oldname与下面的传入的从拿书进行绑定,newname也一样 >>>prints UPDATE"user"SETname=:newnameWHERE"user".name=:oldname >>>u=[{'oldname':'hello','newname':'edd'}, {'oldname':'ed','newname':'mary'}, {'oldname':'tom','newname':'jake'}] >>>r=conn.execute(s,u) >>>r.rowcount 5L
删除delete
删除比较容易,调用delete方法即可,不加where过滤,则删除所有数据,但是不会drop掉表,等于清空了数据表
>>>r=conn.execute(address.delete())#清空表 >>>printr <sqlalchemy.engine.result.ResultProxyobjectat0x0000000002EAF550> >>>r.rowcount 8L >>>r=conn.execute(users.delete().where(users.c.name>'m'))#删除记录 >>>r.rowcount 3L
flask-sqlalchemy
SQLAlchemy已经成为了python世界里面orm的标准,flask是一个轻巧的web框架,可以自由的使用orm,其中flask-sqlalchemy是专门为flask指定的插件。
安装flask-sqlalchemy
pipinstallflask-sqlalchemy
初始化sqlalchemy
fromflaskimportFlask fromflask.ext.sqlalchemyimportSQLAlchemy app=Flask(__name__) #dialect+driver://username:password@host:port/database?charset=utf8 #配置sqlalchemy数据库驱动://数据库用户名:密码@主机地址:端口/数据库?编码 app.config['SQLALCHEMY_DATABASE_URI']='mysql://root:@localhost:3306/sqlalchemy?charset=utf8' #初始化 db=SQLAlchemy(app)
定义model
classUser(db.Model): """定义了三个字段,数据库表名为model名小写 """ id=db.Column(db.Integer,primary_key=True) username=db.Column(db.String(80),unique=True) email=db.Column(db.String(120),unique=True) def__init__(self,username,email): self.username=username self.email=email def__repr__(self): return'<User%r>'%self.username defsave(self): db.session.add(self) db.session.commit()
创建数据表
数据包的创建使用sqlalchemyapp,如果表已经存在,则忽略,如果不存在,则新建
>>>fromyourappimportdb,User >>>u=User(username='admin',email='admin@example.com')#创建实例 >>>db.session.add(u)#添加session >>>db.session.commit()#提交查询 >>>users=User.query.all()#查询
需要注意的是,如果要插入中文,必须插入unicode字符串
>>>u=User(username=u'人世间',email='rsj@example.com') >>>u.save()
定义关系
关系型数据库,最重要的就是关系。通常关系分为一对一(例如无限级栏目),一对多(文章和栏目),多对多(文章和标签)
onetomany:
我们定义一个Category(栏目)和Post(文章),两者是一对多的关系,一个栏目有许多文章,一个文章属于一个栏目。
classCategory(db.Model): id=db.Column(db.Integer,primary_key=True) name=db.Column(db.String(50)) def__init__(self,name): self.name=name def__repr__(self): return'<Category%r>'%self.name classPost(db.Model): """定义了五个字段,分别是id,title,body,pub_date,category_id """ id=db.Column(db.Integer,primary_key=True) title=db.Column(db.String(80)) body=db.Column(db.Text) pub_date=db.Column(db.String(20)) #用于外键的字段 category_id=db.Column(db.Integer,db.ForeignKey('category.id')) #外键对象,不会生成数据库实际字段 #backref指反向引用,也就是外键Category通过backref(post_set)查询Post category=db.relationship('Category',backref=db.backref('post_set',lazy='dynamic')) def__init__(self,title,body,category,pub_date=None): self.title=title self.body=body ifpub_dateisNone: pub_date=time.time() self.pub_date=pub_date self.category=category def__repr__(self): return'<Post%r>'%self.title defsave(self): db.session.add(self) db.session.commit()
如何使用查询呢?
>>>c=Category(name='Python') >>>c <Category'Python'> >>>c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQueryobjectat0x0000000003B58F60> >>>c.post_set.all() [] >>>p=Post(title='hellopython',body='pythoniscool',category=c) >>>p.save() >>>c.post_set <sqlalchemy.orm.dynamic.AppenderBaseQueryobjectat0x0000000003B73710> >>>c.post_set.all()#反向查询 [<Postu'hellopython'>] >>>p <Postu'hellopython'> >>>p.category <Categoryu'Python'> #也可以使用category_id字段来添加 >>>p=Post(title='helloflask',body='flaskiscool',category_id=1) >>>p.save()
manytomany(评论已经指出,这样的做法无法关联删除,简书没有删除线格式,多多对例子作废,在此提示,以免被误导)
对于多对多的关系,往往是定义一个两个model的id的另外一张表,例如Post和Tag之间是多对多,需要定义一个Post_Tag的表
post_tag=db.Table('post_tag', db.Column('post_id',db.Integer,db.ForeignKey('post.id')), db.Column('tag_id',db.Integer,db.ForeignKey('tag.id')) ) classPost(db.Model): id=db.Column(db.Integer,primary_key=True) #...省略 #定义一个反向引用,tag可以通过post_set查询到post的集合 tags=db.relationship('Tag',secondary=post_tag, backref=db.backref('post_set',lazy='dynamic')) classTag(db.Model): id=db.Column(db.Integer,primary_key=True) content=db.Column(db.String(10),unique=True) #定义反向查询 posts=db.relationship('Post',secondary=post_tag, backref=db.backref('tag_set',lazy='dynamic')) def__init__(self,content): self.content=content defsave(self): db.session.add(self) db.session.commit()
查询:
>>>tag_list=[] >>>tags=['python','flask','ruby','rails'] >>>fortagintags: t=Tag(tag) tag_list.append(t) >>>tag_list [<f_sqlalchemy.Tagobjectat0x0000000003B7CF28>,<f_sqlalchemy.Tagobjectat0x0000000003B7CF98>,<f_sqlalchemy.Tagobjectat0x0000000003B7CEB8>,<f_sqlalchemy.Tagobjectat0x0000000003B7CE80>] >>>p <Postu'hellopython'> >>>p.tags [] >>>p.tags=tag_list#添加多对多的数据 >>>p.save() >>>p.tags [<f_sqlalchemy.Tagobjectat0x0000000003B7CF28>,<f_sqlalchemy.Tagobjectat0x0000000003B7CF98>,<f_sqlalchemy.Tagobjectat0x0000000003B7CEB8>,<f_sqlalchemy.Tagobjectat0x0000000003B7CE80>] >>>p.tag_set#反向查询 <sqlalchemy.orm.dynamic.AppenderBaseQueryobjectat0x0000000003B7C080> >>>p.tag_set.all() [<f_sqlalchemy.Tagobjectat0x0000000003B7CF28>,<f_sqlalchemy.Tagobjectat0x0000000003B7CF98>,<f_sqlalchemy.Tagobjectat0x0000000003B7CEB8>,<f_sqlalchemy.Tagobjectat0x0000000003B7CE80>] >>>t=Tag.query.all()[1] >>>t <f_sqlalchemy.Tagobjectat0x0000000003B7CF28> >>>t.content u'python' >>>t.posts [<Postu'hellopython'>] >>>t.post_set <sqlalchemy.orm.dynamic.AppenderBaseQueryobjectat0x0000000003B7C358> >>>t.post_set.all() [<Postu'hellopython'>] selfonetoone
自身一对一也是常用的需求,比如无限分级栏目
classCategory(db.Model): id=db.Column(db.Integer,primary_key=True) name=db.Column(db.String(50)) #父级id pid=db.Column(db.Integer,db.ForeignKey('category.id')) #父栏目对象 pcategory=db.relationship('Category',uselist=False,remote_side=[id],backref=db.backref('scategory',uselist=False)) def__init__(self,name,pcategory=None): self.name=name self.pcategory=pcategory def__repr__(self): return'<Category%r>'%self.name defsave(self): db.session.add(self) db.session.commit()
查询:
>>>p=Category('Python') >>>p <Category'Python'> >>>p.pid >>>p.pcategory#查询父栏目 >>>p.scategory#查询子栏目 >>>f=Category('Flask',p) >>>f.save() >>>f <Categoryu'Flask'> >>>f.pid 1L >>>f.pcategory#查询父栏目 <Categoryu'Python'> >>>f.scategory#查询父栏目 >>>p.scategory#查询子栏目 <Categoryu'Flask'>
关于flask-sqlalchemy定义models的简单应用就这么多,更多的技巧在于如何查询。