此专栏我会不定期分享一些 Django 最前沿的文章, 内容偏重技巧, 经验的归纳总结, 来源暂时有:
Medium
Twitter 知名博主
如果大家感兴趣, 请一定点个关注, 给我一些动力, 毕竟翻译整理是需要时间的, 谢谢大家
--
对开发人员来说, ORM 确实非常实用, 但是将数据库的访问抽象出来本身是有成本的, 那些愿意在数据库中探索的开发人员, 经常会发现修改 ORM 的默认行为可以带来性能的提升.
在本文中, 我将分享在 Django 中使用数据库的 9 个技巧
1. 过滤器聚合(Aggregation with Filter)
在 Django 2.0 之前, 如果我们想要得到诸如用户总数和活跃用户总数之类的东西, 我们不得不求助于 条件表达式 :
from django.contrib.auth.models import User
from django.db.models import (
Count,
Sum,
Case,
When,
Value,
IntegerField,
)
User.objects.aggregate(total_users=Count('id'),
total_active_users=Sum(Case(
When(is_active=True, then=Value(1)),
default=Value(0),
output_field=IntegerField(),
)),
)
在 Django 2.0 中, 添加了 聚合函数的过滤器参数 , 使其更容易:
from django.contrib.auth.models import User
from django.db.models import Count, F
User.objects.aggregate(
total_users=Count('id'),
total_active_users=Count('id', filter=F('is_active')),
)
很棒, 又短又可口
如果你正在使用 PostgreSQL, 这两个查询将如下所示:
SELECT
COUNT(id) AS total_users,
SUM(CASE WHEN is_active THEN 1 ELSE 0 END) AS total_active_users
FROM
auth_users;
SELECT
COUNT(id) AS total_users,
COUNT(id) FILTER (WHERE is_active) AS total_active_users
FROM
auth_users;
第二个查询使用了 WHERE 过滤子句.
2. 查询集的结果变为具名元组(QuerySet results as namedtuples)
我是一个 namedtuples 的粉丝, 同时也是 Django 2.0 的 ORM 的粉丝.
在 Django 2.0 中, values_list 方法的参数中添加了一个叫做 named 的属性. 将 named 设置为 True 会将 QuerySet 作为 namedtuples 列表返回:
> user.objects.values_list(
'first_name',
'last_name',
)[0]
('Haki', 'Benita')
> user_names = User.objects.values_list(
'first_name',
'last_name',
named=True,
)
> user_names[0]
Row(first_name='Haki', last_name='Benita')
> user_names[0].first_name
'Haki'
> user_names[0].last_name
'Benita'
3. 自定义函数(Custom functions)
Django 2.0 的 ORM 功能非常强大, 而且特性丰富, 但还是不能与所有数据库的特性同步. 不过幸运的是, ORM 让我们用自定义函数来扩展它.
假设我们有一个记录报告的持续时间字段, 我们希望找到所有报告的平均持续时间:
from django.db.models import Avg
Report.objects.aggregate(avg_duration=Avg('duration'))
> {'avg_duration': datetime.timedelta(0, 0, 55432)}
那很棒, 但是如果只有均值, 信息量有点少. 我们再算出标准偏差吧:
from django.db.models import Avg, StdDev
Report.objects.aggregate(
avg_duration=Avg('duration'),
std_duration=StdDev('duration'),
)
ProgrammingError: function stddev_pop(interval) does not exist
LINE 1: SELECT STDDEV_POP("report"."duration") AS "std_dura...
^
HINT: No function matches the given name and argument types.
You might need to add explicit type casts.
呃... PostgreSQL 不支持间隔类型字段的求标准偏差操作, 我们需要将时间间隔转换为数字, 然后才能对它应用 STDDEV_POP 操作.
一个选择是从时间间隔中提取:
SELECT
AVG(duration),
STDDEV_POP(EXTRACT(EPOCH FROM duration))
FROM
report;
avg | stddev_pop
----------------+------------------
00:00:00.55432 | 1.06310113695549
(1 row)
那么我们如何在 Django 中实现呢? 你猜到了 -- 一个自定义函数:
# common/db.py
from django.db.models import Func
class Epoch(Func):
function = 'EXTRACT'
template = "%(function)s('epoch'from %(expressions)s)"
我们的新函数这样使用:
from django.db.models import Avg, StdDev, F
from common.db import Epoch
Report.objects.aggregate(
avg_duration=Avg('duration'),
std_duration=StdDev(Epoch(F('duration'))),
)
{'avg_duration': datetime.timedelta(0, 0, 55432),
'std_duration': 1.06310113695549}
* 注意在 Epoch 调用中使用 F 表达式.
4. 声明超时(Statement Timeout)
这可能是我给的最简单的也是最重要的提示. 我们是人类, 我们都会犯错. 我们不可能考虑到每一个边缘情况, 所以我们必须设定边界.
与其他非阻塞应用程序服务器 (如 Tornado,asyncio 甚至 Node) 不同, Django 通常使用同步工作进程. 这意味着, 当用户执行长时间运行的操作时, 工作进程会被阻塞, 完成之前, 其他人无法使用它.
应该没有人真正在生产中只用一个工作进程来运行 Django, 但是我们仍然希望确保一个查询不会浪费太多资源太久.
在大多数 Django 应用程序中, 大部分时间都花在等待数据库查询上了. 所以, 在 SQL 查询上设置超时是一个很好的开始.
我喜欢像这样在我的 wsgi.py 文件中设置一个全局超时:
# wsgi.py
from django.db.backends.signals import connection_created
from django.dispatch import receiver
@receiver(connection_created)
def setup_postgres(connection, **kwargs):
if connection.vendor != 'postgresql':
return
# Timeout statements after 30 seconds.
with connection.cursor() as cursor:
cursor.execute("""SET statement_timeout TO 30000;""")
为什么是 wsgi.py? 因为这样它只会影响工作进程, 不会影响进程外的分析查询, cron 任务等.
希望您使用的是持久的数据库连接, 这样每次请求都不会再有连接开销.
超时也可以配置到用户粒度:
postgresql=#> alter user app_user set statement_timeout TO 30000;
ALTER ROLE
题外话: 我们花了很多时间在其他常见的地方, 比如网络. 因此, 请确保在调用远程服务时始终设置超时时间:
import requests
response = requests.get(
'https://api.slow-as-hell.com',
timeout=3000,
)
5. 限制(Limit)
这与设置边界的最后一点有些相关. 有时我们的客户的一些行为是不可预知的
比如, 同一用户打开另一个选项卡并在第一次尝试卡住时再试一次并不罕见.
这就是为什么要限制
我们限制某一个查询的返回不超过 100 行数据:
# bad example
data = list(Sale.objects.all())[:100]
这很糟糕, 因为虽然只返回 100 行数据, 但是其实你已经把所有的行都取出来放进了内存.
我们再试试:
data = Sale.objects.all()[:100]
这个好多了, Django 会在 SQL 中使用 limit 子句来获取 100 行数据.
我们增加了限制, 但我们仍然有一个问题 -- 用户想要所有的数据, 但我们只给了他们 100 个, 用户现在认为只有 100 个数据了.
并非盲目的返回前 100 行, 我们先确认一下, 如果超过 100 行(通常是过滤以后), 我们会抛出一个异常:
LIMIT = 100
if Sales.objects.count() > LIMIT:
raise ExceededLimit(LIMIT)
return Sale.objects.all()[:LIMIT]
挺有用, 但是我们增加了一个新的查询
能不能做的更好呢? 我们可以这样:
LIMIT = 100
data = Sale.objects.all()[:(LIMIT + 1)]
if len(data) > LIMIT:
raise ExceededLimit(LIMIT)
return data
我们不取 100 行, 我们取 100 + 1 = 101 行, 如果 101 行存在, 那么我们知道超过了 100 行:
记住 LIMIT + 1 窍门, 有时候它会非常方便
6. 事务与锁的控制
这个比较难.
由于数据库中的锁机制, 我们开始在半夜发现事务超时错误.
(看来这个作者之前经常被半夜叫醒
来源: https://juejin.im/entry/5a72a49c6fb9a01cb74eb394