SQLAlchemy查询过滤器
SQLAlchemy查询执行函数
SQLAlchemy查询过滤器
SQLAlchemy查询执行函数
# coding=utf-8
from urllib import quote_plus as urlquote
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
import MySQLdb
用此方法链接可以指定UTF8编码,同时解决密码中含有特定字符,比如含有@,则把密码部分进行URL编码
echo=True 会显示每条执行的 SQL 语句
max_overflow 最大链接数
connstr = "mysql+mysqldb://root:%[email protected]:3306/dbname?charset=utf8" % urlquote('password')
engine = create_engine(connstr,echo=True,max_overflow=5)
metadata = MetaData()
user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
color = Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
metadata.create_all(engine)
conn = engine.connect()
#插入数据
engine.execute(
"INSERT INTO color( name) VALUES ('test');"
)
result = engine.execute('select * from color')
print(result.fetchall())
conn.execute(user.insert(),{'name':'test'})
sql = user.insert().values(name='test2')
conn.execute(sql)
#删除一条user表里的 条件是id大于1的
#sql = user.delete().where(user.c.id > 1)
#执行
#conn.execute(sql)
#更新
#把名字为test的修改为aaa
#sql = user.update().where(user.c.name == 'test').values(name='aaa')
#conn.execute(sql)
print "查询user表里的内容"
sql = select([user, ])
res =conn.execute(sql)
print res.fetchall()
print '查询user表下的id'
sql = select([user.c.id, ])
res =conn.execute(sql)
print res.fetchall()
print '查询user表和color表的name,条件是user表的id1=color的id1'
sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
res =conn.execute(sql)
print res.fetchall()
print '查询user表的name,并按照条件排序'
#按照名字排序
sql = select([user.c.name]).order_by(user.c.name)
res =conn.execute(sql)
print res.fetchall()
print '按照id排序'
sql = select([user.c.name]).order_by(user.c.id)
res =conn.execute(sql)
print res.fetchall()
print '查询user表的name,并按照条件分组'
sql = select([user]).group_by(user.c.name)
res =conn.execute(sql)
print res.fetchall()
#关闭链接
conn.close()
在flask的SQLAlchemy中创建表,也是存在 ForeignKey(一对多) 与 ManyToMany(多对多) 的存在,以下是在models中单表、一对多、多对多的创建方式。
models.py代码如下:
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String,Text,ForeignKey,DateTime,UniqueConstraint,Index
from sqlalchemy.orm import relationship
Base = declarative_base()
# 【单表创建的示例】
class Users(Base):
__tablename__ = 'users'
id = Column(Integer,primary_key=True)
name = Column(String(32),index=True)
age = Column(Integer,default=18)
email = Column(String(32),unique=True) # unique = True 表示该字段中的值唯一
ctime = Column(DateTime,default=datetime.datetime.now())
extra = Column(Text,nullable=True)
__table_args__ = (
UniqueConstraint('id','name',name='unique_id_name'), # 创建表中id和name字段的唯一索引,最后一个参数是索引名,注意前面要写name=
Index('ix_name_email','name','email') # 创建表中多个字段的普通索引
)
# 【一对多示例】
# 表1
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer,primary_key=True)
caption = Column(String(10),nullable=False,default='台球')
# 表2
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer,primary_key=True)
name = Column(String(32),index=True,nullable=True)
# 创建一对多的关系
hobby_id = Column(Integer,ForeignKey('hobby.id'))
# 此字段不在表中生成,只是为了方便调用Hooby表;
# 正向查找时可以直接用“hobby.”调用“Hobby”表中的字段。
# 在hobby表中反向查找时可以用“pers”调用“Person”表中的字段。
hobby = relationship('Hobby',backref='pers')
# 【多对多示例】
# 两表的关系表
class Server2Group(Base):
__tablename__ = 'server2group'
id = Column(Integer,primary_key=True,autoincrement=True)
server_id = Column(Integer,ForeignKey('server.id'))
group_id = Column(Integer,ForeignKey('group.id'))
# 表1
class Server(Base):
__tablename__ = 'server'
id = Column(Integer,primary_key=True,autoincrement=True)
hostname = Column(String(15),nullable=True,unique=True)
# 用于方便查找的标识
groups = relationship('Group',secondary='server2group',backref='servers')
# 表2
class Group(Base):
__tablename__ = 'group'
id = Column(Integer,primary_key=True,autoincrement=True)
name = Column(String(32),nullable=True,unique=True)
def create_all():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/test0621?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:[email protected]:3306/test0621?charset=utf8",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine)
if __name__ == '__main__':
drop_db()
create_all()
下面就开始让你见证orm的nb之处,盘古开天劈地之前,我们创建一个表是这样的
CREATE TABLE user (
id INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(32),
password VARCHAR(64),
PRIMARY KEY (id)
)
这只是最简单的sql表,如果再加上外键关联什么的,一般程序员的脑容量是记不住那些sql语句的,于是有了orm,实现上面同样的功能,代码如下
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
engine = create_engine("mysql+pymysql://root:alex3714@localhost/testdb",
encoding='utf-8', echo=True)
Base = declarative_base() #生成orm基类
class User(Base):
__tablename__ = 'user' #表名
id = Column(Integer, primary_key=True)
name = Column(String(32))
password = Column(String(64))
Base.metadata.create_all(engine) #创建表结构
你说,娘那个腚的,并没有感觉代码量变少啊,呵呵, 孩子莫猴急,好戏在后面
Lazy Connecting
The Engine, when first returned by create_engine(), has not actually tried to connect to the database yet; that happens only the first time it is asked to perform a task against the database.
除上面的创建之外,还有一种创建表的方式,虽不常用,但还是看看吧
http://www.cnblogs.com/alex3714/articles/5978329.html#
事实上,我们用第一种方式创建的表就是基于第2种方式的再封装。
最基本的表我们创建好了,那我们开始用orm创建一条数据试试
Session_class = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
Session = Session_class() #生成session实例
user_obj = User(name="alex",password="alex3714") #生成你要创建的数据对象
print(user_obj.name,user_obj.id) #此时还没创建对象呢,不信你打印一下id发现还是None
Session.add(user_obj) #把要创建的数据对象添加到这个session里, 一会统一创建
print(user_obj.name,user_obj.id) #此时也依然还没创建
Session.commit() #现此才统一提交,创建数据
我擦,写这么多代码才创建一条数据,你表示太tm的费劲了,正要转身离开,我拉住你的手不放开,高潮还没到。。
my_user = Session.query(User).filter_by(name="alex").first()
print(my_user)
此时你看到的输出是这样的应该
<__main__.User object at 0x105b4ba90>
我擦,这是什么?这就是你要的数据呀, 只不过sqlalchemy帮你把返回的数据映射成一个对象啦,这样你调用每个字段就可以跟调用对象属性一样啦,like this..
print(my_user.id,my_user.name,my_user.password)
输出
1 alex alex3714
不过刚才上面的显示的内存对象对址你是没办法分清返回的是什么数据的,除非打印具体字段看一下,如果想让它变的可读,只需在定义表的类下面加上这样的代码
def __repr__(self):
return "<User(name='%s', password='%s')>" % (
self.name, self.password)
my_user = Session.query(User).filter_by(name="alex").first()
my_user.name = "Alex Li"
Session.commit()
my_user = Session.query(User).filter_by(id=1).first()
my_user.name = "Jack"
fake_user = User(name='Rain', password='12345')
Session.add(fake_user)
print(Session.query(User).filter(User.name.in_(['Jack','rain'])).all() ) #这时看session里有你刚添加和修改的数据
Session.rollback() #此时你rollback一下
print(Session.query(User).filter(User.name.in_(['Jack','rain'])).all() ) #再查就发现刚才添加的数据没有了。
# Session
# Session.commit()
print(Session.query(User.name,User.id).all() )
objs = Session.query(User).filter(User.id>0).filter(User.id<7).all()
上面2个filter的关系相当于 user.id >1 AND user.id <7 的效果
Session.query(User).filter(User.name.like("Ra%")).count()
分组
from sqlalchemy import func
print(Session.query(func.count(User.name),User.name).group_by(User.name).all() )
相当于原生sql为
http://www.cnblogs.com/alex3714/articles/5978329.html#
输出为
[(1, ‘Jack’), (2, ‘Rain’)]
外键关联
我们创建一个addresses表,跟user表关联
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Address(Base):
__tablename__ = 'addresses'
id = Column(Integer, primary_key=True)
email_address = Column(String(32), nullable=False)
user_id = Column(Integer, ForeignKey('user.id'))
user = relationship("User", backref="addresses") #这个nb,允许你在user表里通过backref字段反向查出所有它在addresses表里的关联项
def __repr__(self):
return "<Address(email_address='%s')>" % self.email_address
The relationship.back_populates parameter is a newer version of a very common SQLAlchemy feature calledrelationship.backref. The relationship.backref parameter hasn’t gone anywhere and will always remain available! The relationship.back_populates is the same thing, except a little more verbose and easier to manipulate. For an overview of the entire topic, see the section Linking Relationships with Backref.
表创建好后,我们可以这样反查试试
obj = Session.query(User).first()
for i in obj.addresses: #通过user对象反查关联的addresses记录
print(i)
addr_obj = Session.query(Address).first()
print(addr_obj.user.name) #在addr_obj里直接查关联的user表
创建关联对象
obj = Session.query(User).filter(User.name=='rain').all()[0]
print(obj.addresses)
obj.addresses = [Address(email_address="[email protected]"), #添加关联对象
Address(email_address="[email protected]")]
Session.commit()
Common Filter Operators
Here’s a rundown of some of the most common operators used in filter():
query.filter(User.name == 'ed')
query.filter(User.name != 'ed')
query.filter(User.name.like(‘%ed%’))
query.filter(User.name.in_(['ed', 'wendy', 'jack']))
# works with query objects too:
query.filter(User.name.in_( session.query(User.name).filter(User.name.like(‘%ed%’))
))
query.filter(User.name == None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.is_(None))
query.filter(User.name != None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.isnot(None))
SQLAlchemy Documentation, Release 1.1.0b1
# use and_()
from sqlalchemy import and_
query.filter(and_(User.name == ‘ed’, User.fullname == ‘Ed Jones’))
# or send multiple expressions to .filter()
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
# or chain multiple filter()/filter_by() calls
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
Note: Makesureyouuseand_()andnotthePythonandoperator! • OR:
Note: Makesureyouuseor_()andnotthePythonoroperator! • MATCH:
query.filter(User.name.match(‘wendy’))
Note: match() uses a database-specific MATCH or CONTAINS f
原文:http://www.cnblogs.com/alex3714/articles/5978329.html
背景:
用底层的sql写的话,相当于通过pymysql 游标的方式连接“http://blog.51cto.com/jacksoner/2113454 ”,为了避免把sql语句写死在代码里,有没有一种方法直接把原生sql封装好了并且以你熟悉的方式操作,像面向对象那样?
ORM就是对象映射关系程序。相当于ORM帮我们SQL写成类的形式,然后通过类来调用,获取,而不是写底层的sql(insert,update,select)来获取。
ORM 相当于把数据库也给你实例化了,在代码操作mysql中级又加了orm这一层。
orm的优点:
1、隐藏了数据访问细节,“封闭”的通用数据库交互,ORM的核心。他使得我们的通用数据库交互变得简单易行,并且完全不用考虑该死的SQL语句。快速开发,由此而来。
2、ORM使我们构造固化数据结构变得简单易行。
在Python中,最有名的ORM框架是SQLAlchemy。用户包括openstack\Dropbox等知名公司或应用
安装:
pip install SQLAlchemy
SQLAlchemy==1.2.7
pip install pymysql
数据库:
CentOS Linux release 7.4.1708
mysql版本:5.6.35
IP:192.168.1.48
用户:root
密码:123456
创建一个超级用户:grant all privileges on . to jacker@’%’ identified by ‘123456’ WITH GRANT OPTION;
创建一个数据库: mysql> create database sqlalchemy;
方式一:
import pymysql
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,Integer,String #创建列,整型,字符串型
engine = create_engine('mysql+pymysql://jacker:[email protected]/sqlalchemy',encoding='utf-8',echo=True)
#echo=True:显示创建sql信息
Base = declarative_base() # 生成orm基类
#这里的继承Base的类,创建user表,字段id(整型,主键) name(列 字符串32位) 密码(列 字符串64位)
class User(Base):
__tablename__ = 'user' # 表名
id = Column(Integer, primary_key=True)
name = Column(String(32))
password = Column(String(64))
Base.metadata.create_all(engine) #创建表结构 (这里是父类调子类)
运行结果:
方式二 创建表,并写入数据:
创建表:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
engine = create_engine('mysql+pymysql://jacker:[email protected]/sqlalchemy',encoding='utf-8')
metadata = MetaData(engine)
teacher = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50), ),
Column('fullname', Integer),
Column('password', String(10)),
)
metadata.create_all(engine)
写入数据:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine('mysql+pymysql://jacker:[email protected]/sqlalchemy',encoding='utf-8')
#echo=True:显示信息
DBsession = sessionmaker(bind=engine)# 实例和engine绑定
session = DBsession() # 生成session实例,相当于游标
Base = declarative_base()# 生成orm基类
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(100))
fullname = Column(String(100))
password = Column(String(100))
#实例化操作
user1 = User(id=1001, name='ling', password=123)
user2 = User(id=1002, name='molin', password="123abc")
user3 = User(id=1003, name='karl', password=16)
#需要添加到session中,然后再提交,表里才可以有数据
session.add(user1)
session.add(user2)
session.add(user3)
session.commit()
验证:
my_user=session.query(User).filter_by(name='molin').first()#显示匹配name=molin的第一条数据,因为name='molin' 可能有多个值
my = session.query(User).filter_by().all() #显示所有
print(my) #这个是个列表,通过循环遍历
for i in my:
print(i.name)
print(i.password)
print(my_user.id,my_user.password,my_user.name) #调用
#session.commit() #查询就不用提交了
session.close()
'''
filter和filter_by 条件查询
filter_by(name="ling") 不能使用> < =
filter(Student.id>1001) 这个就必须使用Student.id 可以使用> < =等
'''
a = session.query(User).filter(User.id>1002).all()
for i in a:
print(i.name)
print(i.password)
session.close()
如果出现name相同的,想都打印出来的话:
#实例化操作
user1 = User(id=1001, name='test1', password=123)
user2 = User(id=1002, name='test2', password="123abc")
user4 = User(id=1004, name='test2', password="123abc")
user3 = User(id=1003, name='test3', password=16)
my_user = session.query(User).filter_by(name="test2").all() #my_user:这个是列表
for i in my_user:
print(i.id)
print(i.name)
print(i.password)
多条件查询
objs = Session.query(User).filter(User.id>0).filter(User.id<7).all()
上面2个filter的关系相当于 user.id >1 AND user.id <7 的效果
my_user = session.query(User).filter_by(name="molin").first()
my_user.name = "fenggf" # 查询出来之后直接赋值修改
my_user.passwork = "123qwe"
session.commit()
在上一遍文章中,我们增加了两个模型Questions和Comments,并为Users增加了avatar_path这个字段,然后通过这段代码更新到数据库:
with app.test_request_context():
db.drop_all()
db.create_all()
因为当使用过db.create_all()之后,再次直接使用db.create_all(),对模型的修改并不会更新到数据库,我们要使用db.drop_all()先把数据库中所有的表先删除掉,然后再db.create_all()一次。听上去是不是很麻烦?更糟糕的是,原先数据库的的数据也就没有了。所以我们不用这种简单粗暴的方式去更新数据库结构,而是借助flask-migrate这个专门用于迁移数据库的工具,它可以在保留数据库原始数据的情况下,完成模型的更新。此外,我们还将结合flask-script一起使用,简单来说flask-script让我们可以使用命令行去完成数据库迁移的操作。
在项目主文件夹下新建一个manage.py,代码如下:
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from HarpQA import app, db
from models import Users, Questions, Comments
manager = Manager(app)
migrate = Migrate(app, db)
manager.add_command('db', MigrateCommand)
if __name__ == '__main__':
manager.run()
首先导入相关的类,注意模型要全部导入过来,即使代码中并没有显式地使用它们。然后传入app或db来构建Manager和Migrate两个类的实例,最后将MigrateCommand的命令加入到manager中。
此时我们假设要更新模型的结构,在models.py的User模型结尾添加一行代码test = db.Column(db.Integer),然后点击PyCharm下方的Terminal,自动进入到了虚拟环境的命令行中,输入python manage.py db init来初始化,这一步主要是建立数据库迁移相关的文件和文件夹,只是在第一次需要使用。接着依次使用python manage.py db migrate和python manage.py db upgrade,待运行完成,查看users_infor表的结构,结果如下:
可以看到test字段已经添加到表中了。
承接上文,我们的Q&A demo,除了用户表,还需要存储所有问题内容的表questions_info和存储所有评论的表comments_info,并且都和users_info通过外键来关联。我们不排除后续需要更多表的可能性,把所有模型和视图函数写在一起看着也太混乱了!为此,我们新建一个models.py,把三个模型都放在这里。
由于是新建的models.py文件,我们同样要先在开头生成一个名为db的SQLAlchemy对象:
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
前文中我们给SQLAlchemy传入了Flask对象app作为参数,这里是不是也要从视图函数文件HarpQA.py导入那个app并传进去呢?并不可以,因为HarpQA.py也要使用到db(如db.session),这样就产生了循环引用,所以在这里不能传入app,而是回到HarpQA.py,使用db.init_app(app)将app和db绑定,避免了循环引用。
users_info表(Users模型)代码如下:
class Users(db.Model):
__tablename__ = 'users_info'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(32), nullable=False)
password = db.Column(db.String(100), nullable=False)
register_time = db.Column(db.DateTime, nullable=False, default=datetime.now())
# 我们新增了一个avatar_path字段来存用户头像图片文件的路径
avatar_path = db.Column(db.String(256), nullable=False, default='images/doraemon.jpg')
questions_info表(Questions模型)代码如下:
class Questions(db.Model):
__tablename__ = 'questions_info'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
title = db.Column(db.String(100), nullable=False)
content = db.Column(db.TEXT, nullable=False)
author_id = db.Column(db.Integer, db.ForeignKey('users_info.id'))
create_time = db.Column(db.DateTime, nullable=False, default=datetime.now())
author = db.relationship('Users', backref=db.backref('questions', order_by=create_time.desc()))
这个表存储所有问题的标题、内容、创建时间、作者ID,作者ID通过外键与用户表的ID关联,方式也很简单,在db.Column中用db.ForeignKey(‘users_info.id’)作为参数即可。
再看最后一条语句:
author = db.relationship('Users', backref=db.backref('questions', order_by=create_time.desc()))
db.relationship会自动找到两个表的外键,建立Questions和Users的关系,此时对于任意一个Questions对象question,通过question.author就可获得这个question的作者对应的Users对象,例如获取id为1的问题的作者姓名:
question = Questions.query.filter(Questions.id == 1).first()
author_name = question.author.username
db.relationship的第二个参数backref=db.backref(‘questions’, order_by=create_time.desc())则建立了一个反向引用,这样我们不仅可以使用question.author,还可以使用author.questions获得一个作者所有的问题,并通过order_by=create_time.desc()按创建时间倒序排列(网页的内容按时间倒序排列),返回的是一个Questions对象的列表,可以遍历它获取每个对象,如获取作者Harp的所有问题的title:
author = Users.query.filter(Users.username == 'Harp').first()
for question in author.questions:
print(question.title)
同理,comments_info表(Comments模型)代码如下:
class Comments(db.Model):
__tablename__ = 'comments_info'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
content = db.Column(db.TEXT, nullable=False)
question_id = db.Column(db.Integer, db.ForeignKey('questions_info.id'))
author_id = db.Column(db.Integer, db.ForeignKey('users_info.id'))
create_time = db.Column(db.DateTime, nullable=False, default=datetime.now())
author = db.relationship('Users', backref=db.backref('comments'))
question = db.relationship('Questions', backref=db.backref('comments', order_by=create_time.desc()))
在HarpQA.py中,我们要从models.py导入db及所有的模型,注意因为上下文的关系,我们这里用with语句把app推入栈中:
from flask import Flask, render_template
from models import db, Users, Questions, Comments
import config
app = Flask(__name__)
app.config.from_object(config)
db.init_app(app)
with app.test_request_context():
db.drop_all()
db.create_all()
@app.route('/')
def index():
return render_template('home.html')
if __name__ == '__main__':
app.run()
运行脚本,此时数据库已经把三张表都建立好了:
后端一个重要的点就是与数据库联系,例如网页的注册、登录,内容的更新等都需要与数据库建立关系。以MySQL数据库为例,平时我们会用mysqldb(python 2)或者pymysql(python 3)去操作MySQL数据库,但这种方法也是需要自己编写SQL语句的。现在我们有了ORM模型,简单来说,ORM是把数据库中的表抽象成模型,表的列名对应模型的属性,这样我们可以调用类的属性或方法去获得数据库中的数据。例如假设MySQL数据库中有一张表名为table1,使用SELECT * FROM table1 WHERE id=1获取id为1的数据,如果将表table1映射成ORM模型Table,那么可以直接使用Table.query.filter(id=1),这样操作简单了很多,也很利于理解。
SQLAlchemy就是一个这样的ORM,我们可以直接安装flask_sqlalchemy来使用。在这之前我们先在MySQL中手动建立一个数据库harp,在建立的时候把charset设置为utf8,避免存入中文时变成乱码,然后在配置文件config.py中填写好数据库的连接信息:
HOST = "127.0.0.1"
PORT = "3306"
DB = "harp"
USER = "root"
PASS = "Your Password"
CHARSET = "utf8"
DB_URI = "mysql+pymysql://{}:{}@{}:{}/{}?charset={}".format(USER, PASS, HOST, PORT, DB, CHARSET)
SQLALCHEMY_DATABASE_URI = DB_URI
SQLAlchemy依赖mysqldb或者pymysql去连接数据库和执行SQL语句,因为我们用的是python 3,所以需要在配置信息中指明使用pymysql,如果是python 2可以省略,默认是使用mysqldb。
建立好了数据库,我们开始建表,首先建立一张用户表,我们设想它应该有id(作为主键)、用户名、密码、注册时间这些基本的字段,有了ORM,我们就不用再写SQL去建表了,在项目的主py文件中添加以下代码:
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
import config
app = Flask(__name__)
app.config.from_object(config)
db = SQLAlchemy(app)
class Users(db.Model):
__tablename__ = 'users_info'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(32), nullable=False)
password = db.Column(db.String(100), nullable=False)
register_time = db.Column(db.DateTime, nullable=False, default=datetime.now())
db.create_all()
解读一下这段代码,导入SQLAlchemy和含有数据库连接信息的config,实例化一个SQLAlchemy对象名为db,其传入的参数为Flask实例app。接下来定义了一个User类,这个类就是ORM中的模型,也就是数据库中的表映射的模型,它需要继承自db.Model,tablename这个属性就是建表后,数据库生成的表名;然后使用db.Column来实例化id/username/password/register_time这几个列,db.Column的参数描述列的类型、主键等信息,如db.Integer/db.String(32)/db.DateTime分别代表整形、字符串(最大长度)、时间,primary_key=True说明该字段为主键,autoincrement=True代表自增长,nullable决定是否可为空,default代表默认值。最后用db.create_all()来实现创建。我们暂时不用理解为何SQLAlchemy需要传入Flask实例作为参数,为何模型要继承自db.Model,重要的是可以先把想要的表建立起来。
进入数据库,输入desc user_info;,我们发现表已经建立好了,其结构图如下:
但它现在还是空的,我们来试着插入一条语句,将视图函数修改为:
@app.route('/')
def index():
user = Users(username='Harp', password='123456')
db.session.add(user)
db.session.commit()
return render_template('home.html')
代码实例化一个Users的对象user,传入username和password,使用db.session.add(user)将其加入到数据库的session(可以理解为事务)中,然后使用db.session.commit()提交。我们运行程序,然后用浏览器访问,浏览器正常显示了结果,这时再看一眼数据库,发现这条数据已经写入到了数据库:
查询、修改数据也同样很简单:
@app.route('/')
def index():
user = Users.query.filter(Users.id == 1).first() #查找
print(user.username)
user.username = 'Harp1207' #修改
db.session.commit() #修改后需提交
print(user.username)
return render_template('home.html')
思考问题:
说是触发器,其实并不是触发器,这是sqlalchemy中的钩子,也称为事件,在触发某个操作的时候执行某个函数,和sql中的触发器时一样的,更加灵活简单。
我现在也正在学习,我就直接拿出来一个例子吧,大家可以测试一下。
#coding:utf8
from sqlalchemy.orm import scoped_session
from sqlalchemy import Column, Integer, String, DateTime, TIMESTAMP, DECIMAL, func, Text, or_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import ForeignKey, Boolean, create_engine, MetaData, Constraint
from sqlalchemy.orm import relationship, backref, sessionmaker
from sqlalchemy import event
Base = declarative_base()
class Role(Base):# 一
__tablename__= 'roles'
id = Column(Integer,primary_key=True)
name = Column(String(36),nullable=True)
users = relationship('User',backref='role')
class User(Base):# 多
__tablename__ = 'users'
id = Column(Integer,primary_key=True)
name = Column(String(36),nullable=True)
role_id = Column(Integer, ForeignKey('roles.id'))
class Database():
def __init__(self, bind, pool_size=100, pool_recycle=3600, echo=False):
self.__engine = create_engine(bind,pool_size=pool_size,
pool_recycle=pool_recycle,
echo=echo)
self.__session_factory = sessionmaker(bind=self.__engine)
self.__db_session = scoped_session(self.__session_factory)
Base.metadata.create_all(self.__engine)
@property
def session(self):
return self.__db_session()
def on_created(target, value, initiator):
print "received append event for target: %s" % target
@event.listens_for(User, 'after_insert')
def receive_after_insert(mapper, connection, target):
print mapper
print connection
print target.id
print "insert......."
db = Database(bind="mysql+pymysql://root:xxxx@localhost/mydata?charset=utf8")
if __name__ == "__main__":
user = User()
user.name = "123"
user.role_id=2
db.session.add(user)
db.session.commit()
在插入数据后会执行receive_after_insert函数,很简单。
如果想深入的学习建议看官方文档,说的很详细
http://docs.sqlalchemy.org/en/latest/orm/events.html#attribute-events
user_modules.py
from datetime import datetime
from sqlalchemy import Column,Integer,String,Boolean,DateTime,ForeignKey
from sqlalchemy.orm import relationship
from .connect import Base,session
class User(Base):
__tablename__='user'
id=Column(Integer,primary_key=True,autoincrement=True)
username=Column(String(20),nullable=False)
passwd=Column(String(50),nullable=False)
createtime=Column(DateTime,default=datetime.now)
_locked=Column(Boolean,default=False,nullable=False)
#在modules中写好查询条件,使用时直接调用
@classmethod
def all(cls):
return session.query(cls).all()
@classmethod
def by_name(cls,username):
return session.query(cls).filter_by(username=username).all()
@property
def locked(self):
return self._locked
def __repr__(self):
return '<User(id=%s,username=%s,passwd=%s,createtime=%s,_locked=%s)>'%(
self.id,
self.username,
self.passwd,
self.createtime,
self._locked
)
class UserDetails(Base):
__tablename__='user_details'
id=Column(Integer,primary_key=True,autoincrement=True)
id_card=Column(Integer,nullable=True,unique=True)
last_login=Column(DateTime)
login_num=Column(Integer,default=0)
user_id=Column(Integer,ForeignKey('user.id'))
#bakcref建立反向索引,
userdetails_for_foreignkey=relationship('User',backref='details',uselist=False,cascade='all')
def __self__(self):
return '<UserDetails(id=%s,id_card=%s,last_login=%s,login_num=%s,user_id=%s)>'%(
self.id,
self.id_card,
self.last_login,
self.login_num,
self.user_id
)
if __name__=='__main__':
Base.metadata.create_all()
query.py
from data.user_modules import User,session,UserDetails
#带条件查询
raw=session.query(User).filter_by(username='nanian').all()
raw=session.query(User).filter_by(username='nanian') #去掉.all()原生sql
raw=session.query(User).filter(User.username =='nanian').all()
raw=session.query(User.username).filter(User.username !='nanian').all()
raw=session.query(User.username).filter(User.username !='nanian').first()
raw=session.query(User.username).filter(User.username !='nanian').one() #如果前面查出的是多条数据则报错
print(session.query(User).get(2)) #根据主键查,会自己找主键
print(raw)
#限制查询结果数
print(session.query(User).filter(User.username!='nanian').limit(3).all())#前三行
print(session.query(User).filter(User.username!='nanian').offset(3).all())#第三行以后
print(session.query(User).filter(User.username!='nanian').slice(1,3).all())#2,3行
#排序
from sqlalchemy import desc
raw=session.query(User).filter(User.username !='nanian').order_by(User.username).all()
raw=session.query(User).filter(User.username !='nanian').order_by(desc(User.username).all()#逆序
#模糊查询 尽量少用模糊查询,效率低
from sqlalchemy import or_
raw=session.query(User).filter(User.username!='nanian').all()
raw=session.query(User).filter(User.username.like('n%').all()
raw=session.query(User).filter(User.username.notlike('n%').all()
raw=session.query(User).filter(User.username.in_(['nanian','a']).all()) #加下划线表示和python关键字作区分
raw=session.query(User).filter(User.username.isnot(None),User.passwd=='123').all()) #多条件
raw=session.query(User).filter(or_(User.username.isnot(None),User.passwd=='123')).all()) #或
raw=session.query(User).filter(User.username==None).all())
#聚合函数
from sqlalchemy import func,extract
print(session.query(User.passwd,func.count(User.id)).group_by(User.passwd).all())
print(session.query(User.passwd,func.count(User.id)).group_by(User.passwd).
having(func.count(User.id)>1) all())
print( session.query(extract('minute',User.createtime).label('minute'),
func.count(User.id)).group_by('minute').all() ) #提取分钟,按分钟分组
#多表查询
raw=session.query(User,UserDetails).all()
raw=session.query(User,UserDetails).filter(UserDetails.id==User.id) all()# cross join
raw=session.query(User.username,UserDetails.last_login).
join(UserDetails, UserDetails.id==User.id) all()# inner join
raw=session.query(User.username,UserDetails.last_login).
outerjoin(UserDetails, UserDetails.id==User.id) all()
# outer join代表left join 左连接,右连接将表反过来(sqlalchemy没有rightjoin),小表左连接右表效率高
q1=session.query(User.id)
q2=session.query(UserDetails.id)
raw=q1.union(q2).all()
from sqlalchemy import all_,any_
sql_0=session.query(UserDetails.last_login).subquery() #声明子表
raw=session.query(User).filter(User.createtime >all_(sql_0)).all()
raw=session.query(User).filter(User.createtime >any_(sql_0)).all()
#原生sql
sql_1='''
select * from `user`
'''
raw=session.execute(sql_1)
#print(raw,dir(raw))
#print(raw.fetchone())
#print(raw.fetchmany())
#print(raw.fetchall())
for i in raw:
print(i)