Flask 默认并没有提供任何数据库操作的 API.
Flask 中可以自己的选择数据, 用原生语句实现功能, 也可以选择 ORM(SQLAlchemy,MongoEngine)
原生 SQL 缺点
代码利用率低, 条件复杂代码语句越长, 有很多相似语句
一些 SQL 是在业务逻辑中拼出来的, 修改需要了解业务逻辑
直接写 SQL 容易忽视 SQL 问题.
一, ORM
将对对象的操作转换为原生 SQL
1, 优点
易用性, 可以有效减少重复 SQL, 性能损耗少设计灵活, 可以轻松实现复杂查询, 移植性好
Python 中的 ORM 是 SQLAlchemy
针对于 Flask 的支持
pip install flask-sqlalchemy
2, 连接数据库
dialect+driver://username:[email protected]:port/database
dialect 数据库实现
driver 数据库的驱动
- username
- password
- host
- port
- database
连接数据库需要指定配置
- App.config['SQLALCHEMY_DATABASE_URI'] = DB_URI
- App.config['SQLALCHEMY_TRAKE_MODIFICATIONS']=False
3, 创建模型
- class User(db.Model):
- __tablename__ = "UserModel" # 指定表名, 默认是类名
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- u_name = db.Column(db.String(16), unique=True)
- u_des = db.Column(db.String(128), nullable=True)
(1), 字段类型
- Integer
- SmallInteger
- BigInteger
- Float
- Numeric
- String
- Text
- Unicode
- Unicode Text
- Boolean
- Date
- Time
- DateTime
- Interval
- LargeBinary
(2), 常见约束
- primary_key
- autoincrement
- unique
- index
- nullable
- default
- ForeignKey()
(3), 数据操作
db.create_all() 创建数据库
db.drop_all() 删除数据库
1, 数据插入
数据插入是在事务中处理
- db.session.add(object)
- db.session.add_all(list[object])
- db.session.commit()
- @API.route('/adduser/')
- def adduser():
- users = []
- for i in range(5):
- user = User()
- user.u_name = "小花 %d" % random.randrange(10000)
- users.append(user)
- db.session.add_all(users)
- db.session.commit()
- return 'Add success'
2, 数据删除
- db.session.delete(object)
- db.session.commit()
修改和删除基于查询.
(4), 模型继承
默认继承并不会报错, 它会将多个模型的数据映射到一张表中, 导致数据混乱, 不能满足基本使用
抽象的模型是不会在数据库中产生映射的
- class Animal(db.Model):
- __abstract__ = True
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- a_name = db.Column(db.String(16))
- class Dog(Animal):
- d_legs = db.Column(db.Integer, default=4)
- class Cat(Animal):
- c_eat = db.Column(db.String(32), default='fish')
(5), 模型迁移
- python manager.py db init #初次迁移, 生成 migration 包
- python manager.py db migrate # 创建迁移
- python manager.py db upgrade # 更新
(6), 数据查询
1, 查询单个对象
- first
- get
- get_or_404
- @API.route('/getuser/<int:id>/')
- def get_user(id):
- user = User.query.get(id)
- print(user)
- return 'GET success'
2, 查询结果集
all: 比较特殊, 返回列表
- @API.route('/getusers/')
- def get_users():
- users = User.query.all()
- for user in users:
- print(user.u_name)
- return 'get success'
filter:BaseQuery 对象
运算符:
- contains
- startswith
- endswith
- in_
- like
- __gt__
- __ge__
- __lt__
- __le__
条件:
- 类名. 属性名. 魔术方法 (临界值)
- @API.route('/getdog/')
- def getdog():
- dogs = Dog.query.filter(Dog.id.__le__(5))
- for dog in dogs:
- print(dog.id, dog.a_name)
- return 'GET SUCCESS'
- 类名. 属性名 操作符运算符 临界值
- @API.route('/getdog/')
- def getdog():
- dogs = Dog.query.filter(Dog.id> 5)
- for dog in dogs:
- print(dog.id, dog.a_name)
- return 'GET SUCCESS'
- @API.route('/getdog/')
- def getdog():
- dogs = Dog.query.filter(Dog.a_name.contains("2"))
- for dog in dogs:
- print(dog.id, dog.a_name)
- return 'GET SUCCESS'
offset 和 limit 不区分顺序, 都是先执行 offset
- @API.route('/getdog/')
- def getdog():
- dogs = Dog.query.offset(5).limit(4)
- for dog in dogs:
- print(dog.id, dog.a_name)
- return 'GET SUCCESS'
- order_by 调用必须在 offset 和 limit 之前
使用 offset 以及 limit 实现分页
- @API.route('/getdogs/')
- def get_dogs():
- page = request.args.get("page", 1, type=int)
- per_page = request.args.get('per_page', 4, type=int)
- dogs = Dog.query.offset(per_page * (page - 1)).limit(per_page)
- return render_template('Dogs.html', dogs=dogs)
paginate 实现分页
- @API.route('/getdogs/')
- def get_dogs_with_page():
- # dogs = Dog.query.paginate().items
- pagination = Dog.query.paginate()
- per_page = request.args.get('per_page', 4, type=int)
- return render_template('Dogs.html', pagination=pagination, per_page=per_page)
- <div class=pagination>
- {% for page in pagination.iter_pages(left_edge=5,left_current=5,right_current=5,right_edge=5) %}
- {% if page %}
- {% if page != pagination.page %}
- <a href="{{ url_for('api.get_dogs_with_page') }}?page={{ page }}&per_page={{ per_page }}">{{ page }}</a>
- {% else %}
- <strong>{{ page }}</strong>
- {% endif %}
- {% else %}
- <span class=ellipsis>...</span>
- {% endif %}
- {% endfor %}
- </div>
filter_by
用在级联数据上, 条件语法精准, 字段 = 值
- @blue.route('/getcatsfilterby/')
- def get_cats_filter_by():
- cats = Cat.query.filter_by(id = 5)
- return render_template('Cats.html', cats=cats)
(7) 级联数据
- class Customer(db.Model):
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- c_name = db.Column(db.String(16))
- addresses = db.relationship('Address', backref='customer', lazy=True)
- class Address(db.Model):
- id = db.Column(db.Integer, primary_key=True, autoincrement=True)
- a_position = db.Column(db.String(128))
- a_customer_id = db.Column(db.Integer, db.ForeignKey(Customer.id))
1添加数据
- @API.route('/getcustomer/')
- def get():
- customer = Customer.query.order_by(desc('id')).first()
- return str(customer.id)
- @API.route('/addaddress/')
- def add_address():
- address = Address()
- address.a_position = '秀水街 %s' % random.randrange(10000)
- address.a_customer_id = Customer.query.order_by(desc('id')).first().id # 注意此处使用 id 的倒叙, 不能直接用'-id'
- db.session.add(address)
- db.session.commit()
- return 'Address Add Success %s' % address.a_position
2查询数据
根据地址找到对应的人
- @API.route('/getcustomer/')
- def get():
- a_id = request.args.get('a_id', type=int)
- address = Address.query.get(a_id)
- customer = Customer.query.get(address.a_customer_id) #维护关系表中的外键存的是不维护关系表中的主键,
- return customer.c_name
根据人找到对应的地址
- @API.route('/getaddress/')
- def get_address():
- c_id = request.args.get('c_id')
- customer = Customer.query.get(c_id)
- # addresses = Address.query.filter_by(a_customer_id=customer.id)
- addresses = customer.addresses
- return render_template('address.html', addresses=addresses)
- <ul>
- {% for address in addresses %}
- <li>{{ address.a_position }}</li>
- {% endfor %}
- </ul>
3逻辑运算
filter 多个条件
- @API.route('/getaddress/')
- def get_address():
- addresses = Address.query.filter(Address.a_customer_id.__eq__(1)).filter(Address.a_position.endswith('4'))
- return render_template('address.html', addresses=addresses)
与 and
filter(and_(条件), 条件...)
- @API.route('/getaddress/')
- def get_address():
- addresses = Address.query.filter(and_(Address.a_customer_id.__eq__(1),Address.a_position.endswith('4')))
- return render_template('address.html', addresses=addresses)
Django 中可以将字段直接写在 filter 中, 无需使用 and_
或
or_
filter(or_(条件), 条件...)
非
not_
filter(not_(条件), 条件...)
- @API.route('/getaddress/')
- def get_address():
- addresses = Address.query.filter(not_(or_(Address.a_customer_id.__eq__(1), Address.a_position.endswith('4'))))
- return render_template('address.html', addresses=addresses)
二, 缓存
pip install Flask-Caching
在 ext.py 中进行配置
- from flask_caching import Cache
- from flask_migrate import Migrate
- from flask_session import Session
- from flask_sqlalchemy import SQLAlchemy
- db = SQLAlchemy()
- migrate = Migrate()
- cache = Cache(config={
- "CACHE_TYPE": "redis" # 默认是连接本地, 可以设置远程.
- })
- # cache=Cache() # 配置可以写在 settings 中的 Config 类中
- def init_ext(App):
- db.init_app(App)
- migrate.init_app(App, db)
- Session(App)
- cache.init_app(App)
在视图中使用
- @API.route('/getaddress/')
- @cache.cached(timeout=60)
- def get_address():
- addresses = Address.query.filter(not_(or_(Address.a_customer_id.__eq__(1), Address.a_position.endswith('4'))))
- print('数据库中获取')
- return render_template('address.html', addresses=addresses)
来源: http://www.bubuko.com/infodetail-3412789.html