目录
请求上下文源码分析, g 对象, 第三方插件 (flask_session,flask_script,wtforms), 信号
一, 请求上下文源码分析
二, g 对象
1, 什么是 g 对象
2,g 对象和 session 的区别
三, flask-session
1, 作用
2, 安装
3, 使用
四, flask-script
1, 什么是 flask_script
2, 安装
3, 使用
4, 自定义命令
五, 数据库连接池
1,pymsql 链接数据库
2, 数据库连接池版
六, wtforms
1, 安装
2, 使用
七, 信号
1, 什么是信号
2, 安装
3, 内置信号
4, 使用信号
5, 一个流程中的信号触发点
6, 自定义信号
八, 多 App 应用 (使用蓝图即可实现)
请求上下文源码分析, g 对象, 第三方插件 (flask_session,flask_script,wtforms), 信号
一, 请求上下文源码分析
第一阶段: 将 ctx(request,session) 放到 Local 对象上
第二阶段: 视图函数导入: request/session
request.method
-LocalProxy 对象. method, 执行 getattr 方法, getattr(self._get_current_object(), name)
-self._get_current_object() 返回 return self.__local(),self.__local(), 在 LocakProxy 实例化的时候, object.__setattr__(self, '_LocalProxy__local', local), 此处 local 就是: partial(_lookup_req_object, 'request')
- -def _lookup_req_object(name):
- top = _request_ctx_stack.top #_request_ctx_stack 就是 LocalStack() 对象, top 方法把 ctx 取出来
- if top is None:
- raise RuntimeError(_request_ctx_err_msg)
- return getattr(top, name)# 获取 ctx 中的 request 或 session 对象
第三阶段: 请求处理完毕
- 获取 session 并保存到 cookie
- 将 ctx 删除
程序运行, 两个 LocalStack() 对象, 一个里面放 request 和 session, 另一个放 g 和 current_app
二, g 对象
1, 什么是 g 对象
专门用来存储用户信息的 g 对象, g 的全称的为 global
g 对象在一次请求中的所有的代码的地方, 都是可以使用的
2,g 对象和 session 的区别
session 对象是可以跨 request 的, 只要 session 还未失效, 不同的 request 的请求会获取到同一个 session
但是 g 对象不是, g 对象不需要管过期时间, 请求一次就 g 对象就改变了一次, 或者重新赋值了一次
三, flask-session
1, 作用
将默认保存的签名 cookie 中的值, 保存到 Redis/Memcached/file/MongoDB/SQLAlchemy
2, 安装
pip3 install flask-session
3, 使用
(1) 使用 Redis 保存 session
- from flask import Flask,session
- from flask_session import RedisSessionInterface
- import Redis
- App = Flask(__name__)
- conn=Redis.Redis(host='127.0.0.1',port=6379)
- #use_signer 是否对 key 签名
- App.session_interface=RedisSessionInterface(conn,key_prefix='lqz')
- @App.route('/')
- def hello_world():
- session['name']='lqz'
- return 'Hello World!'
- if __name__ == '__main__':
- App.run()
(2) 通过修改配置实现可插拔式保存 session
- from Redis import Redis
- from flask.ext.session import Session
- App.config['SESSION_TYPE'] = 'redis' # 指定 session 保存的方式为保存在 Redis 中
- App.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
- Session(App)
问题: 设置 cookie 时, 如何设定关闭浏览器则 cookie 失效.
- response.set_cookie('k','v',exipre=None) # 这样设置即可
- # 在 session 中设置
- App.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
- # 一般不用, 我们一般都设置超时时间, 多长时间后失效
问题: cookie 默认超时时间是多少? 如何设置超时时间
- # 源码 expires = self.get_expiration_time(App, session)
- 'PERMANENT_SESSION_LIFETIME': timedelta(days=31), # 这个配置文件控制
四, flask-script
1, 什么是 flask_script
用于实现类似于 django 中 python3 manage.py runserver ... 类似的命令
2, 安装
pip3 install flask-script
3, 使用
- from flask_script import Manager
- App = Flask(__name__)
- manager=Manager(App)
- ...
- if __name__ == '__main__':
- manager.run()
- # 以后在执行, 直接: python3 manage.py runserver
- #python3 manage.py runserver --help
4, 自定义命令
- @manager.command
- def custom(arg):
- """
- 自定义命令
- python manage.py custom 123
- :param arg:
- :return:
- """
- print(arg)
- @manager.option('-n', '--name', dest='name')
- #@manager.option('-u', '--url', dest='url')
- def cmd(name, url):
- """
- 自定义命令 (-n 也可以写成 --name)
- 执行: python manage.py cmd -n lqz -u http://www.oldboyedu.com
- 执行: python manage.py cmd --name lqz --url http://www.oldboyedu.com
- :param name:
- :param url:
- :return:
- """
- print(name, url)
- # 作用;
- # 把 Excel 的数据导入数据库, 定制个命令, 去执行
五, 数据库连接池
1,pymsql 链接数据库
- import pymysql
- conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db')
- cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
- # cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',])
- cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{
- 'user':'lqz','pwd':'123'
- })
- obj = cursor.fetchone()
- conn.commit()
- cursor.close()
- conn.close()
- print(obj)
2, 数据库连接池版
setting.py
- from datetime import timedelta
- from Redis import Redis
- import pymysql
- from DBUtils.PooledDB import PooledDB, SharedDBConnection
- class Config(object):
- DEBUG = True
- SECRET_KEY = "umsuldfsdflskjdf"
- PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
- SESSION_REFRESH_EACH_REQUEST= True
- SESSION_TYPE = "redis"
- PYMYSQL_POOL = PooledDB(
- creator=pymysql, # 使用链接数据库的模块
- maxconnections=6, # 连接池允许的最大连接数, 0 和 None 表示不限制连接数
- mincached=2, # 初始化时, 链接池中至少创建的空闲的链接, 0 表示不创建
- maxcached=5, # 链接池中最多闲置的链接, 0 和 None 不限制
- maxshared=3,
- # 链接池中最多共享的链接数量, 0 和 None 表示全部共享. PS: 无用, 因为 pymysql 和 MySQLdb 等模块的 threadsafety 都为 1, 所有值无论设置为多少,_maxcached 永远为 0, 所以永远是所有链接都共享.
- blocking=True, # 连接池中如果没有可用连接后, 是否阻塞等待. True, 等待; False, 不等待然后报错
- maxusage=None, # 一个链接最多被重复使用的次数, None 表示无限制
- setsession=[], # 开始会话前执行的命令列表. 如:["set datestyle to ...", "set time zone ..."]
- ping=0,
- # ping MySQL 服务端, 检查是否服务可用.# 如: 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
- host='127.0.0.1',
- port=3306,
- user='root',
- password='123456',
- database='s8day127db',
- charset='utf8'
- )
- class ProductionConfig(Config):
- SESSION_REDIS = Redis(host='192.168.0.94', port='6379')
- class DevelopmentConfig(Config):
- SESSION_REDIS = Redis(host='127.0.0.1', port='6379')
- class TestingConfig(Config):
- pass
utils/sql.py
- import pymysql
- from settings import Config
- class SQLHelper(object):
- @staticmethod
- def open(cursor):
- POOL = Config.PYMYSQL_POOL
- conn = POOL.connection()
- cursor = conn.cursor(cursor=cursor)
- return conn,cursor
- @staticmethod
- def close(conn,cursor):
- conn.commit()
- cursor.close()
- conn.close()
- @classmethod
- def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
- conn,cursor = cls.open(cursor)
- cursor.execute(sql, args)
- obj = cursor.fetchone()
- cls.close(conn,cursor)
- return obj
- @classmethod
- def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
- conn, cursor = cls.open(cursor)
- cursor.execute(sql, args)
- obj = cursor.fetchall()
- cls.close(conn, cursor)
- return obj
使用:
obj = SQLHelper.fetch_one("select id,name from users where name=%(user)s and pwd=%(pwd)s", form.data)
六, wtforms
1, 安装
pip3 install wtforms
2, 使用
(1) 使用一
- from flask import Flask, render_template, request, redirect
- from wtforms import Form
- from wtforms.fields import simple
- from wtforms import validators
- from wtforms import widgets
- App = Flask(__name__, template_folder='templates')
- App.debug = True
- class LoginForm(Form):
- # 字段 (内部包含正则表达式)
- name = simple.StringField(
- label='用户名',
- validators=[
- validators.DataRequired(message='用户名不能为空.'),
- validators.Length(min=6, max=18, message='用户名长度必须大于 %(min)d 且小于 %(max)d')
- ],
- widget=widgets.TextInput(), # 页面上显示的插件
- render_kw={'class': 'form-control'}
- )
- # 字段 (内部包含正则表达式)
- pwd = simple.PasswordField(
- label='密码',
- validators=[
- validators.DataRequired(message='密码不能为空.'),
- validators.Length(min=8, message='用户名长度必须大于 %(min)d'),
- validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
- message='密码至少 8 个字符, 至少 1 个大写字母, 1 个小写字母, 1 个数字和 1 个特殊字符')
- ],
- widget=widgets.PasswordInput(),
- render_kw={'class': 'form-control'}
- )
- @App.route('/login', methods=['GET', 'POST'])
- def login():
- if request.method == 'GET':
- form = LoginForm()
- return render_template('login.html', form=form)
- else:
- form = LoginForm(formdata=request.form)
- if form.validate():
- print('用户提交数据通过格式验证, 提交的值为:', form.data)
- else:
- print(form.errors)
- return render_template('login.html', form=form)
- if __name__ == '__main__':
- App.run()
login.HTML
- <!DOCTYPE HTML>
- <HTML lang="en">
- <head>
- <meta charset="UTF-8">
- <title>
- Title
- </title>
- </head>
- <body>
- <h1>
- 登录
- </h1>
- <form method="post">
- <p>
- {{form.name.label}} {{form.name}} {{form.name.errors[0] }}
- </p>
- <p>
- {{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}
- </p>
- <input type="submit" value="提交">
- </form>
- </body>
- </HTML>
(2) 使用 2
- from flask import Flask, render_template, request, redirect
- from wtforms import Form
- from wtforms.fields import core
- from wtforms.fields import html5
- from wtforms.fields import simple
- from wtforms import validators
- from wtforms import widgets
- App = Flask(__name__, template_folder='templates')
- App.debug = True
- class RegisterForm(Form):
- name = simple.StringField(
- label='用户名',
- validators=[
- validators.DataRequired()
- ],
- widget=widgets.TextInput(),
- render_kw={'class': 'form-control'},
- default='alex'
- )
- pwd = simple.PasswordField(
- label='密码',
- validators=[
- validators.DataRequired(message='密码不能为空.')
- ],
- widget=widgets.PasswordInput(),
- render_kw={'class': 'form-control'}
- )
- pwd_confirm = simple.PasswordField(
- label='重复密码',
- validators=[
- validators.DataRequired(message='重复密码不能为空.'),
- validators.EqualTo('pwd', message="两次密码输入不一致")
- ],
- widget=widgets.PasswordInput(),
- render_kw={'class': 'form-control'}
- )
- email = html5.EmailField(
- label='邮箱',
- validators=[
- validators.DataRequired(message='邮箱不能为空.'),
- validators.Email(message='邮箱格式错误')
- ],
- widget=widgets.TextInput(input_type='email'),
- render_kw={'class': 'form-control'}
- )
- gender = core.RadioField(
- label='性别',
- choices=(
- (1, '男'),
- (2, '女'),
- ),
- coerce=int # "1" "2"
- )
- city = core.SelectField(
- label='城市',
- choices=(
- ('bj', '北京'),
- ('sh', '上海'),
- )
- )
- hobby = core.SelectMultipleField(
- label='爱好',
- choices=(
- (1, '篮球'),
- (2, '足球'),
- ),
- coerce=int
- )
- favor = core.SelectMultipleField(
- label='喜好',
- choices=(
- (1, '篮球'),
- (2, '足球'),
- ),
- widget=widgets.ListWidget(prefix_label=False),
- option_widget=widgets.CheckboxInput(),
- coerce=int,
- default=[1, 2]
- )
- def __init__(self, *args, **kwargs):
- super(RegisterForm, self).__init__(*args, **kwargs)
- self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
- def validate_pwd_confirm(self, field):
- """
- 自定义 pwd_confirm 字段规则, 例: 与 pwd 字段是否一致
- :param field:
- :return:
- """
- # 最开始初始化时, self.data 中已经有所有的值
- if field.data != self.data['pwd']:
- # raise validators.ValidationError("密码不一致") # 继续后续验证
- raise validators.StopValidation("密码不一致") # 不再继续后续验证
- @App.route('/register', methods=['GET', 'POST'])
- def register():
- if request.method == 'GET':
- form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
- return render_template('register.html', form=form)
- else:
- form = RegisterForm(formdata=request.form)
- if form.validate():
- print('用户提交数据通过格式验证, 提交的值为:', form.data)
- else:
- print(form.errors)
- return render_template('register.html', form=form)
- if __name__ == '__main__':
- App.run()
- <!DOCTYPE HTML>
- <HTML lang="en">
- <head>
- <meta charset="UTF-8">
- <title>
- Title
- </title>
- </head>
- <body>
- <h1>
- 用户注册
- </h1>
- <form method="post" novalidate style="padding:0 50px">
- {% for field in form %}
- <p>
- {{field.label}}: {{field}} {{field.errors[0] }}
- </p>
- {% endfor %}
- <input type="submit" value="提交">
- </form>
- </body>
- </HTML>
七, 信号
1, 什么是信号
Flask 框架中的信号基于 blinker, 其主要就是让开发者可是在 flask 请求过程中定制一些用户行为
2, 安装
pip3 install blinker
3, 内置信号
- request_started = _signals.signal('request-started') # 请求到来前执行
- request_finished = _signals.signal('request-finished') # 请求结束后执行
- before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
- template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
- got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
- request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行 (无论成功与否)
- appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行 (无论成功与否)
- appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文 push 时执行
- appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文 pop 时执行
- message_flashed = _signals.signal('message-flashed') # 调用 flask 在其中添加数据时, 自动触发
4, 使用信号
- from flask import Flask,signals,render_template
- App = Flask(__name__)
- # 往信号中注册函数
- def func(*args,**kwargs):
- print('触发型号',args,kwargs)
- signals.request_started.connect(func)
- # 触发信号: signals.request_started.send()
- @App.before_first_request
- def before_first1(*args,**kwargs):
- pass
- @App.before_first_request
- def before_first2(*args,**kwargs):
- pass
- @App.before_request
- def before_first3(*args,**kwargs):
- pass
- @App.route('/',methods=['GET',"POST"])
- def index():
- print('视图')
- return render_template('index.html')
- if __name__ == '__main__':
- App.wsgi_app
- App.run()
5, 一个流程中的信号触发点
a. before_first_request
b. 触发 request_started 信号
c. before_request
d. 模板渲染
渲染前的信号 before_render_template.send(App, template=template, context=context)
rv = template.render(context) # 模板渲染
渲染后的信号 template_rendered.send(App, template=template, context=context)
- e. after_request
- f. session.save_session()
g. 触发 request_finished 信号
如果上述过程出错:
触发错误处理信号 got_request_exception.send(self, exception=e)
h. 触发信号 request_tearing_down
6, 自定义信号
- from flask import Flask, current_app, flash, render_template
- from flask.signals import _signals
- App = Flask(import_name=__name__)
- # 自定义信号
- xxxxx = _signals.signal('xxxxx')
- def func(sender, *args, **kwargs):
- print(sender)
- # 自定义信号中注册函数
- xxxxx.connect(func)
- @App.route("/x")
- def index():
- # 触发信号
- xxxxx.send('123123', k1='v1')
- return 'Index'
- if __name__ == '__main__':
- App.run()
八, 多 App 应用 (使用蓝图即可实现)
from werkzeug.wsgi import DispatcherMiddleware from werkzeug.serving import run_simple from flask import Flask, current_app app1 = Flask('app01') app2 = Flask('app02') @app1.route('/index') def index(): return "app01" @app2.route('/index2') def index2(): return "app2" # http://www.oldboyedu.com/index # http://www.oldboyedu.com/sec/index2 dm = DispatcherMiddleware(app1, { '/sec': app2, }) if __name__ == "__main__": run_simple('localhost', 5000, dm)
来源: http://www.bubuko.com/infodetail-2960653.html