Flask数据库
2019-04-30 23:43:07来源:博客园 阅读 ()
一 数据库的设置
Web应用中普遍使用的是关系模型的数据库,关系型数据库把所有的数据都存储在表中,表用来给应用的实体建模,表的列数是固定的,行数是可变的。它使用结构化的查询语言。关系型数据库的列定义了表中表示的实体的数据属性。比如:商品表里有name、price、number等。 Flask本身不限定数据库的选择,你可以选择SQL或NOSQL的任何一种。也可以选择更方便的SQLALchemy,类似于Django的ORM。SQLALchemy实际上是对数据库的抽象,让开发者不用直接和SQL语句打交道,而是通过Python对象来操作数据库,在舍弃一些性能开销的同时,换来的是开发效率的较大提升。
SQLAlchemy是一个关系型数据库框架,它提供了高层的ORM和底层的原生数据库的操作。flask-sqlalchemy是一个简化了SQLAlchemy操作的flask扩展。
在Flask中使用mysql数据库,需要安装一个flask-sqlalchemy的扩展。
pip3 install flask-sqlalchemy
要连接mysql数据库,仍需要安装flask-mysqldb
pip3 install flask-mysqldb
使用Flask-SQLAlchemy管理数据库
使用Flask-SQLAlchemy扩展操作数据库,首先需要建立数据库连接。数据库连接通过URL指定,而且程序使用的数据库必须保存到Flask配置对象的SQLALCHEMY_DATABASE_URI键中。
Flask的数据库设置:
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
常用的SQLAlchemy字段类型
类型名 | python中类型 | 说明 |
---|---|---|
Integer | int | 普通整数,一般是32位 |
SmallInteger | int | 取值范围小的整数,一般是16位 |
BigInteger | int或long | 不限制精度的整数 |
Float | float | 浮点数 |
Numeric | decimal.Decimal | 普通整数,一般是32位 |
String | str | 变长字符串 |
Text | str | 变长字符串,对较长或不限长度的字符串做了优化 |
Unicode | unicode | 变长Unicode字符串 |
UnicodeText | unicode | 变长Unicode字符串,对较长或不限长度的字符串做了优化 |
Boolean | bool | 布尔值 |
Date | datetime.date | 时间 |
Time | datetime.datetime | 日期和时间 |
LargeBinary | str | 二进制文件 |
常用的SQLAlchemy列选项
选项名 | 说明 |
---|---|
primary_key | 如果为True,代表表的主键 |
unique | 如果为True,代表这列不允许出现重复的值 |
index | 如果为True,为这列创建索引,提高查询效率 |
nullable | 如果为True,允许有空值,如果为False,不允许有空值 |
default | 为这列定义默认值 |
常用的SQLAlchemy关系选项
选项名 | 说明 |
---|---|
backref | 在关系的另一模型中添加反向引用 |
primary join | 明确指定两个模型之间使用的联结条件 |
uselist | 如果为False,不使用列表,而使用标量值 |
order_by | 指定关系中记录的排序方式 |
secondary | 指定多对多中记录的排序方式 |
secondary join | 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件 |
二 自定义模型类
模型表示程序使用的数据实体,在Flask-SQLAlchemy中,模型一般是Python类,继承自db.Model,db是SQLAlchemy类的实例,代表程序使用的数据库。
类中的属性对应数据库表中的列。id为主键,是由Flask-SQLAlchemy管理。db.Column类构造函数的第一个参数是数据库列和模型属性类型。
如下示例:定义了两个模型类,用户和角色。
from flask import Flask from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) class Config(object): """配置参数""" # sqlalchemy的配置参数 SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask" # 设置成 True,SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。 SQLALCHEMY_TRACK_MODIFICATIONS = True # 实例化SQLAlchemy对象 app.config.from_object(Config) # 创建数据库sqlalchemy工具对象 db = SQLAlchemy(app) # 表名常见规范 # ihome --> ih_user 数据库缩写_表名 # tbl_user --> tbl_表名 # 创建数据库模型类 class Role(db.Model): """用户表""" __tablename__ = "tbl_roles" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) users = db.relationship("User", backref="role") class User(db.Model): """用户表""" __tablename__ = "tbl_users" # 指明数据库的表名 id = db.Column(db.Integer, primary_key=True) # 整型的主键, 会默认设置为自增主键 name = db.Column(db.String(64), unique=True) email = db.Column(db.String(128)) password = db.Column(db.String(128)) role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))
三 数据库基本操作
在Flask-SQLAlchemy中,插入、修改、删除操作,均由数据库会话管理。会话用db.session表示。在准备把数据写入数据库前,要先将数据添加到会话中然后调用commit()方法提交会话。
数据库会话是为了保证数据的一致性,避免因部分更新导致数据不一致。提交操作把会话对象全部写入数据库,如果写入过程发生错误,整个会话都会失效。
数据库会话也可以回滚,通过db.session.rollback()方法,实现会话提交数据前的状态。
在Flask-SQLAlchemy中,查询操作是通过query对象操作数据。最基本的查询是返回表中所有数据,可以通过过滤器进行更精确的数据库查询。
创建表:
db.create_all()
删除表:
db.drop_all()
插入一条数据:
# 创建对象 ro1 = Role(name='admin') # session记录对象任务 db.session.add(ro1) # 提交任务到数据库 db.session.commit() ro2 = Role(name='user') db.session.add(ro2) db.session.commit()
一次插入多条数据
us1 = User(name='wang', email='wang@163.com', password='123456', role_id=ro1.id) us2 = User(name='zhang', email='zhang@163.com', password='201512', role_id=ro2.id) us3 = User(name='chen', email='chen@126.com', password='987654', role_id=ro2.id) us4 = User(name='zhou', email='zhou@163.com', password='456789', role_id=ro1.id) # 一次保存多条数据 db.session.add_all([us1, us2, us3, us4]) db.session.commit()
3.1 查询:
常用的SQLAlchemy查询过滤器
过滤器 | 说明 |
---|---|
filter() | 把过滤器添加到原查询上,返回一个新查询 |
filter_by() | 把等值过滤器添加到原查询上,返回一个新查询 |
limit | 使用指定的值限定原查询返回的结果 |
offset() | 偏移原查询返回的结果,返回一个新查询 |
order_by() | 根据指定条件对原查询结果进行排序,返回一个新查询 |
group_by() | 根据指定条件对原查询结果进行分组,返回一个新查询 |
常用的SQLAlchemy查询执行器
方法 | 说明 |
---|---|
all() | 以列表形式返回查询的所有结果 |
first() | 返回查询的第一个结果,如果未查到,返回None |
first_or_404() | 返回查询的第一个结果,如果未查到,返回404 |
get() | 返回指定主键对应的行,如不存在,返回None |
get_or_404() | 返回指定主键对应的行,如不存在,返回404 |
count() | 返回查询结果的数量 |
paginate() | 返回一个Paginate对象,它包含指定范围内的结果 |
filter_by精确查询:
>>> User.query.filter_by(name="wang").all() [<User 1>] >>> user = User.query.filter_by(name="wang").all() >>> user[0].name 'wang'
first()返回查询到的第一个对象
>>> user = User.query.first() >>> user <User 1> >>> user.name 'wang'
all()返回查询到的所有对象
>>> User.query.all()
[<User 1>, <User 2>, <User 3>, <User 4>]
filter模糊查询,返回名字结尾字符为g的所有数据。
>>> users = User.query.filter(User.name.endswith("g")).all() >>> users [<User 1>, <User 2>] >>> users[0].name 'wang'
get(),参数为主键,如果主键不存在没有返回内容
>>> User.query.get(1) <User 1> >>> user = User.query.get(1) >>> user.name 'wang'
逻辑非,返回名字不等于wang的所有数据。
>>> user.query.filter(User.name!="wang").all() [<User 2>, <User 3>, <User 4>]
逻辑与,需要导入and,返回and()条件满足的所有数据。
>>> from sqlalchemy import and_ >>> user = User.query.filter(and_(User.name!="wang",User.email.endswith("163.com"))).all() >>> user [<User 2>, <User 4>]
逻辑或,需要导入or_
>>> from sqlalchemy import or_ >>> User.query.filter(or_(User.name!="wang", User.email.endswith("163.com"))).all() [<User 1>, <User 2>, <User 3>, <User 4>]
not_ 相当于取反
>>> from sqlalchemy import not_ >>> User.query.filter(not_(User.name=="chen")).all() [<User 1>, <User 2>, <User 4>]
使用db.session查询
>>> db.session.query(Role).all() [<Role 1>, <Role 2>] >>> db.session.query(Role).get(2) <Role 2> >>> db.session.query(Role).first() <Role 1>
取不到数据返回None
>>> user= User.query.get(5) >>> user >>> type(user) <class 'NoneType'>
offset偏移
>>> users = User.query.offset(2).all() >>> users[0].name 'chen'
limit
>>> users = User.query.offset(1).limit(2).all() >>> users[0].name 'zhang' >>> users[1].name 'chen'
order_by
>>> users = User.query.order_by(User.id.desc()).all() >>> users [<User 4>, <User 3>, <User 2>, <User 1>] >>> users[0].name 'zhou'
group_by
>>> from sqlalchemy import func >>> db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id) <flask_sqlalchemy.BaseQuery object at 0x0000025DBA5B0CC0> >>> db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).all() [(1, 2), (2, 2)]
在每个模型类中添加__repr__:
class Role(db.Model): """用户表""" __tablename__ = "tbl_roles" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) users = db.relationship("User", backref="role") def __repr__(self): return "Role object:%s" % self.name class User(db.Model): """用户表""" __tablename__ = "tbl_users" # 指明数据库的表名 id = db.Column(db.Integer, primary_key=True) # 整型的主键, 会默认设置为自增主键 name = db.Column(db.String(64), unique=True) email = db.Column(db.String(128)) password = db.Column(db.String(128)) role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id")) def __repr__(self): return "User object:%s" % self.name
>>> User.query.get(1)
User object:wang
关联查询
>>> ro = Role.query.get(1) >>> type(ro) <class 'db_demo.Role'> >>> ro.users [User object:wang, User object:zhou] >>> user = User.query.get(1) >>> user User object:wang >>> user.role_id 1 >>> Role.query.get(user.role_id) Role object:admin >>> user.role Role object:admin
3.2 更新数据
第一种方法:
>>> user = User.query.get(4) >>> user User object:zhou >>> user.name = "test" >>> db.session.add(user) >>> db.session.commit() >>> User.query.get(4) User object:test
第二种方法:
>>> user = User.query.filter_by(id=1).update({"name":"test1"}) >>> db.session.commit() >>> User.query.get(1) User object:test1
3.3 删除
>>> user =User.query.get(1) >>> db.session.delete(user) >>> db.session.commit()
四 图书案例
视图
from flask import Flask, render_template, request, redirect, url_for from flask_sqlalchemy import SQLAlchemy from flask_wtf import FlaskForm from wtforms import StringField, SubmitField from wtforms.validators import DataRequired app = Flask(__name__) class Config(object): SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask" SQLALCHEMY_TRACK_MODIFICATIONS = True SECRET_KEY = "sahq28y1qhihsd0-121ewq" # 实例化SQLAlchemy对象 app.config.from_object(Config) # 创建数据库sqlalchemy工具对象 db = SQLAlchemy(app) # 定义数据库的模型 class Author(db.Model): """作者""" __tablename__ = "tbl_authors" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(32), unique=True) books = db.relationship("Book", backref="author") class Book(db.Model): """书籍""" __tablename__ = "tbl_books" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) author_id = db.Column(db.Integer, db.ForeignKey("tbl_authors.id")) # 创建表单模型类 class AuthorBookForm(FlaskForm): """作者数据表单模型类""" author_name = StringField(label="作者", validators=[DataRequired("作者必填")]) book_name = StringField(label="书籍", validators=[DataRequired("书籍必填")]) submit = SubmitField(label="保存") @app.route("/index", methods=["POST", "GET"]) def index(): form = AuthorBookForm() if form.validate_on_submit(): author_name = form.author_name.data book_name = form.book_name.data author = Author(name=author_name) db.session.add(author) db.session.commit() book = Book(name=book_name, author_id=author.id) # book = Book(name=book_name, author=author) db.session.add(book) db.session.commit() authors = Author.query.all() return render_template("author_book.html", form=form, authors=authors) @app.route("/delete_book") def delete_book(): book_id = request.args.get("book_id") book = Book.query.get(book_id) author_id = book.id author = Author.query.get(author_id) db.session.delete(book) db.session.commit() db.session.delete(author) db.session.commit() return redirect(url_for("index")) if __name__ == '__main__': # db.drop_all() # db.create_all() # au_xi = Author(name='我吃西红柿') # au_qian = Author(name='萧潜') # au_san = Author(name='唐家三少') # db.session.add_all([au_xi, au_qian, au_san]) # db.session.commit() # # bk_xi = Book(name='吞噬星空', author_id=au_xi.id) # bk_xi2 = Book(name='寸芒', author_id=au_qian.id) # bk_qian = Book(name='飘渺之旅', author_id=au_qian.id) # bk_san = Book(name='冰火魔厨', author_id=au_san.id) # db.session.add_all([bk_xi, bk_xi2, bk_qian, bk_san]) db.session.commit() app.run(debug=True)
模板
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>Title</title> <meta http-equiv='Content-type' content='text/htm'> </head> <body> <form method="post"> <p>添加作者和书籍</p> {{ form.csrf_token }} {{ form.author_name.label }} {{ form.author_name }} {{ form.author_name.errors.0 }} <br/> {{ form.book_name.label }} {{ form.book_name }} {{ form.book_name.errors.0 }} <br/> {{ form.submit }} <br/> </form> <hr/> {% for author in authors %} {{ author.name }} <ul> {% for book in author.books %} <a href="/delete_book?book_id={{ book.id }}">删除</a> <li>{{ book.name }}</li> {% endfor %} </ul> {% endfor %} </body> </html>
五 数据库迁移
在Flask中可以使用Flask-Migrate扩展,来实现数据迁移。并且集成到Flask-Script中,所有操作通过命令就能完成。
为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上。
pip3 install flask-migrate
文件: _migration.py
from flask import Flask from flask_sqlalchemy import SQLAlchemy from flask_script import Manager from flask_migrate import Migrate, MigrateCommand app = Flask(__name__) class Config(object): SQLALCHEMY_DATABASE_URI = "mysql://root:123456@127.0.0.1:3306/db_flask" SQLALCHEMY_TRACK_MODIFICATIONS = True app.config.from_object(Config) # 创建sqlalchemy的数据库连接对象 db = SQLAlchemy(app) # 创建flask脚本管理工具对象 manager = Manager(app) # 创建数据库迁移工具对象 Migrate(app, db) # 向manager对象中添加数据库的操作命令 manager.add_command("db", MigrateCommand) # 定义模型Role class Role(db.Model): # 定义表名 __tablename__ = 'roles' # 定义列对象 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) def __repr__(self): return 'Role:'.format(self.name) # 定义用户 class User(db.Model): __tablename__ = 'users' id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) def __repr__(self): return 'User:'.format(self.username) if __name__ == '__main__': manager.run()
创建迁移仓库
#这个命令会创建migrations文件夹,所有迁移文件都放在里面。
python _migrate.py db init
创建迁移脚本
自动创建迁移脚本有两个函数,upgrade()函数把迁移中的改动应用到数据库中。downgrade()函数则将改动删除。自动创建的迁移脚本会根据模型定义和数据库当前状态的差异,生成upgrade()和downgrade()函数的内容。对比不一定完全正确,有可能会遗漏一些细节,需要进行检查
#创建自动迁移脚本 python _migrate.py db migrate -m 'initial migration' # -m 表示备注
更新数据库
python _migrate.py db upgrade
此时数据库里已经存在数据表了,如果需要回到之前的迁移版本,使用回退命令
回退数据库
回退数据库时,需要指定回退版本号,由于版本号是随机字符串,为避免出错,建议先使用python _migrate.py db history命令查看历史版本的具体版本号,然后复制具体版本号执行回退。
python _migrate.py db downgrade 版本号
六 发送邮件
在开发过程中,很多应用程序都需要通过邮件提醒用户,Flask的扩展包Flask-Mail通过包装了Python内置的smtplib包,可以用在Flask程序中发送邮件。
Flask-Mail连接到简单邮件协议(Simple Mail Transfer Protocol,SMTP)服务器,并把邮件交给服务器发送。
如下示例,通过开启QQ邮箱SMTP服务设置,发送邮件。
from flask import Flask from flask_mail import Mail, Message app = Flask(__name__) #配置邮件:服务器/端口/传输层安全协议/邮箱名/密码 app.config.update( DEBUG = True, MAIL_SERVER='smtp.qq.com', MAIL_PROT=465, MAIL_USE_TLS = True, MAIL_USERNAME = 'xxxxxxxx@qq.com', MAIL_PASSWORD = 'xxxxxx', ) mail = Mail(app) @app.route('/') def index(): # sender 发送方,recipients 接收方列表 msg = Message("This is a test ",sender='11111@qq.com', recipients=['aaaaaa@163.com','11111@qq.com']) #邮件内容 msg.body = "Flask test mail" #发送邮件 mail.send(msg) print "Mail sent" return "Sent Succeed" if __name__ == "__main__": app.run()
原文链接:https://www.cnblogs.com/ForT/p/10791439.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
下一篇:20190501-整数翻转
- Flask request接口获取参数 2019-08-13
- 链接 Mysql 创建 数据库和创表,增加数据 2019-08-13
- django修改表数据结构后报错的解决办法 2019-07-24
- 原创:Python编写通讯录,支持模糊查询,利用数据库存储 2019-07-24
- Python--代码1(接口测试:测试用例从数据库读取写到yaml文 2019-07-24
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash