数据库实现命令初始化
1. 实现命令主脚本
- # coding=utf-8
- from functools import wraps
- from getpass import getpass
- import sys
- import os
- # 执行到这里的时候 commands 是为空对象
- commands = {}
- # 工具函数
- def check_input_password(password):
- if len(password.strip()) <6:
- return check_input_password(getpass('密码长度必须大于 6 位, 请重新输入管理员账户密码:'))
- password2 = getpass('再次输入:')
- if password == password2.strip():
- return password
- else:
- return check_input_password(getpass('两次输入密码不一致, 请重新输入管理员账户密码:'))
- def check_input_username(username):
- if username.strip():
- return username
- else:
- return check_input_username(input('请输入管理员账户登录名:'))
- # 注册命令
- def registry_command(cmd_str):
- def decorate(func):
- commands[cmd_str] = func
- @wraps(func)
- def wrapper(*args, **kwargs):
- return func(*args, **kwargs)
- return wrapper
- return decorate
- @registry_command('init_db')
- # 到这里的时候 commands 里面添加一个元素
- #commands["init_db"]=init_db
- def init_db():
- from public import db
- from config import BASE_DIR
- import apps.account.models
- import apps.configuration.models
- import apps.deploy.models
- import apps.assets.models
- import apps.schedule.models
- import apps.setting.models
- user_input = input('是否要初始化数据库, 该操作会清空所有数据 [y|n]?')
- if user_input.strip() == 'y':
- db.drop_all()
- db.create_all()
- with open(os.path.join(BASE_DIR, 'libs', 'sql', 'permissions.sql'), encoding='utf-8') as f:
- line = f.readline()
- while line:
- if line.startswith('INSERT INTO'):
- db.engine.execute(line.strip())
- line = f.readline()
- print('数据库已初始化成功!')
- user_input = input('是否需要创建管理员账户 [y|n]?')
- if user_input.strip() == 'y':
- create_admin()
- @registry_command('create_admin')
- def create_admin():
- from apps.account.models import User
- admin = User.query.filter_by(is_supper=True).first()
- if admin:
- user_input = input('已存在管理员账户 <%s>, 需要重置密码 [y|n]?' % admin.username)
- if user_input.strip() == 'y':
- password = check_input_password(getpass('请输入新的管理员账户密码:'))
- admin.password = password
- admin.token_expired = 0
- admin.save()
- print('重置管理员密码成功!')
- else:
- username = check_input_username(input('请输入管理员账户登录名:'))
- password = check_input_password(getpass('请输入管理员账户密码:'))
- User(username=username, password=password, nickname='管理员', is_supper=True).save()
- print('创建管理员账户成功!')
- @registry_command('enable_admin')
- def enable_admin():
- from apps.account.models import User
- admin = User.query.filter_by(is_supper=True).first()
- admin.update(is_active=True)
- print('管理员账户状态已修改为启用!')
- def print_usage():
- print('''
- usage: %s <command>
- command:
- init_db 初始化数据库
- create_admin 创建管理员账户
- enable_admin 启用管理员账户, 用于登录失败次数过多账户被禁用时使用
- ''' % sys.argv[0])
- if __name__ == '__main__':
- #装饰器的作用是在把此模块加载到内存中的时候先
- #把 registry_command 函数执行了也就是初始化了 commands
- if len(sys.argv) == 1:
- print_usage()
- sys.exit(1)
- cmd = sys.argv.pop(0) #manage.py
- arg1 = sys.argv.pop(0) #init_db
- r_func = commands.get(arg1)
- if callable(r_func):
- r_func(*sys.argv)
- else:
- print('遇到了不可能会出现的错误!')
manage.py
INSERT INTO account_permissions (id, name, `desc`) VALUES (100, 'home_view', 'Dashboard');
-- 用户管理 -> 用户列表
- INSERT INTO account_permissions (id, name, `desc`) VALUES (101, 'account_user_view', '获取用户列表');
- INSERT INTO account_permissions (id, name, `desc`) VALUES (102, 'account_user_add', '添加用户');
- INSERT INTO account_permissions (id, name, `desc`) VALUES (103, 'account_user_edit', '编辑用户');
- INSERT INTO account_permissions (id, name, `desc`) VALUES (104, 'account_user_del', '删除用户');
- INSERT INTO account_permissions (id, name, `desc`) VALUES (105, 'account_user_disable', '禁用用户');
初始表数据
2. 统一配置信息模块
- from pytz import timezone
- import os
- DEBUG = True
- TIME_ZONE = timezone('Asia/Shanghai')
- BASE_DIR = os.path.dirname(os.path.abspath(__file__))
- #SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:[email protected]/testdb'
- SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASE_DIR, 'test.db')
- SQLALCHEMY_TRACK_MODIFICATIONS = False
- SQLALCHEMY_ECHO = False
- DOCKER_REGISTRY_SERVER = 'localhost:5000'
- DOCKER_REGISTRY_AUTH = {
- 'username': 'user', 'password': 'password'
- }
config.py
3. 创建数据库连接实例模块
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
- import config
- App = Flask(__name__)
- App.config.from_object(config)
- db = SQLAlchemy(App)
- View Code
4. 实现效果
Flask 实现 API 接口调用鉴权
1. 定义一个总体装饰器
- def require_permission(str_code):
- def decorate(func):
- @wraps(func)
- def wrapper(*args, **kwargs):
- if not g.user.is_supper:
- or_list = [x.strip() for x in str_code.split('|')]
- # print(or_list)
- #["account_user_view","account_role_view"]
- #接口要求的权限集合
- for or_item in or_list:
- and_set = {x.strip() for x in or_item.split('&')}
- #把列表中单项元素放到一个集合中去
- #or_item.split('&') 这里的作用是防止把单词给打散了
- #split('&') 这里面只需要指定一个单词中不存在的符号就可以了
- #{"account_user_view"}
- and_set = {x.strip() for x in or_item}
- #{'a','c','c','o','u','n','t'...,'w'}
- # print(and_set) #{"account_user_view"}
- if and_set.issubset(g.user.permissions):
- break
- else:
- return json_response(message='Permission denied'), 403
- #当迭代的对象迭代完并为空时, 位于 else 的子句将执行
- #而如果在 for 循环中含有 break 时则直接终止循环, 并不会执行 else 子句
- return func(*args, **kwargs)
- return wrapper
- return decorate
接口装饰器
2. 定义处理对应 url 的函数
- @blueprint.route('/<int:u_id>', methods=['PUT'])
- @require_permission('account_user_edit | account_user_disable')
- def put(u_id):
- form, error = JsonParser('nickname', 'is_active',
- Argument('role_id', type=int, required=False, help='请选择角色'),
- Argument('email', nullable=True),
- Argument('password', nullable=False, required=False),
- Argument('mobile', nullable=True)).parse()
- if error is None:
- u_info = User.query.get_or_404(u_id)
- if form.password:
- u_info.password = form.password
- if not u_info.update(**form) and form.password:
- u_info.save()
- return json_response(u_info)
- return json_response(message=error)
- View Code
3. 定义无需鉴权的 url
- def init_app(App):
- Excel.init_excel(App)
- App.before_request(cross_domain_access_before)
- App.before_request(auth_middleware)
- def auth_middleware():
- if request.path == '/api/account/users/login/' or request.path.startswith('/api/apis/configs/') or request.path.startswith('/api/apis/files/') or "static" in request.path or "favicon.ico" in request.path or "/" == request.path or "/login" == request.path:
- return None
- token = request.headers.get('X-TOKEN')
- if token and len(token) == 32:
- g.user = User.query.filter_by(access_token=token).first()
- if g.user and g.user.is_active and g.user.token_expired>= time.time():
- g.user.token_expired = time.time() + 8 * 60 * 60
- g.user.save()
- return None
- return json_response(message='Auth fail, please login'), 401
- View Code
Flask 蓝图 url 配置
1.App 的 url 配置 无需经过蓝图 直接在 App 对象上进行设置
- from flask import Flask
- from flask_sqlalchemy import SQLAlchemy
- import config
- App = Flask(__name__)
- @App.route("/")
- def index():
- return App.send_static_file('index.html')
- @App.route("/login", methods=['GET'])
- def index_login():
- return App.send_static_file('index.html')
- View Code
2. 通过蓝图配置接口的 url 地址
1. 在每个蓝图 App 目录下的__init__.py 中配置一个 url 访问前缀
- from apps.account import user
- from apps.account import role
- def register_blueprint(App):
- App.register_blueprint(user.blueprint, url_prefix='/api/account/users')
- App.register_blueprint(role.blueprint, url_prefix='/api/account/roles')
- __init__py
2. 在 view 处理函数中指定最后的 url 地址 所以蓝图下的每个处理函数对应的 url 地址由这两个部分拼接组成
- @blueprint.route('/setting/password', methods=['POST'])
- def setting_password():
- form, error = JsonParser(
- Argument('password', help='请输入原密码'),
- Argument('newpassword', help='请输入新密码')
- ).parse()
- if error is None:
- if g.user.verify_password(form.password):
- g.user.password = form.newpassword
- g.user.save()
- else:
- return json_response(message='原密码错误')
- return json_response(message=error)
- views
3. 拼接后的 url 地址为 http://127.0.0.1:3000/api/account/users/setting/info
1./API/account/users 为 url_prefix='/api/account/users'
2./setting/info 为 @blueprint.route('/setting/info')
来源: http://www.bubuko.com/infodetail-3061037.html