使用 python 做 web 开发面临的一个最大的问题就是性能,在解决 C10K 问题上显的有点吃力。有些异步框架 Tornado、Twisted、Gevent 等就是为了解决性能问题。这些框架在性能上有些提升,但是也出现了各种古怪的问题难以解决。
在 python3.6 中,官方的异步协程库 asyncio 正式成为标准。在保留便捷性的同时对性能有了很大的提升, 已经出现许多的异步框架使用 asyncio。
使用较早的异步框架是 aiohttp,它提供了 server 端和 client 端,对 asyncio 做了很好的封装。但是开发方式和最流行的微框架 flask 不同,flask 开发简单,轻量,高效。
微服务是最近最火开发模式,它解决了复杂性问题,提高开发效率,便于部署等优点。
正是结合这些优点, 以 Sanic 为基础,集成多个流行的库来搭建微服务。 Sanic 框架是和 Flask 相似的异步协程框架,简单轻量,并且性能很高。
本项目就是以 Sanic 为基础搭建的微服务框架。
项目地址: sanic-ms
使用 sanic 异步框架,有较高的性能,但是使用不当会造成 blocking, 对于有 IO 请求的都要选用异步库。添加库要慎重。 sanic 使用 uvloop 异步驱动,uvloop 基于 libuv 使用 Cython 编写,性能比 nodejs 还要高。
功能说明:
- @app.listener('before_server_start')
- async def before_srver_start(app, loop):
- queue = asyncio.Queue()
- app.queue = queue
- loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
- reporter = AioReporter(queue=queue)
- tracer = BasicTracer(recorder=reporter)
- tracer.register_required_propagators()
- opentracing.tracer = tracer
- app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
- @app.middleware('request')
- async def cros(request):
- if request.method == 'POST' or request.method == 'PUT':
- request['data'] = request.json
- span = before_request(request)
- request['span'] = span
- @app.middleware('response')
- async def cors_res(request, response):
- span = request['span'] if 'span' in request else None
- if response is None:
- return response
- result = {'code': 0}
- if not isinstance(response, HTTPResponse):
- if isinstance(response, tuple) and len(response) == 2:
- result.update({
- 'data': response[0],
- 'pagination': response[1]
- })
- else:
- result.update({'data': response})
- response = json(result)
- if span:
- span.set_tag('http.status_code', "200")
- if span:
- span.set_tag('component', request.app.name)
- span.finish()
- return response
对抛出的异常进行处理,返回统一格式
创建 task 消费 queue 中对 span,用于日志追踪
由于使用的是异步框架,可以将一些 IO 请求并行处理
Example:
- async def async_request(datas):
- # async handler request
- results = await asyncio.gather(*[data[2] for data in datas])
- for index, obj in enumerate(results):
- data = datas[index]
- data[0][data[1]] = results[index]
- @user_bp.get('/<id:int>')
- @doc.summary("get user info")
- @doc.description("get user info by id")
- @doc.produces(Users)
- async def get_users_list(request, id):
- async with request.app.db.acquire(request) as cur:
- record = await cur.fetch(
- """ SELECT * FROM users WHERE id = $1 """, id)
- datas = [
- [record, 'city_id', get_city_by_id(request, record['city_id'])]
- [record, 'role_id', get_role_by_id(request, record['role_id'])]
- ]
- await async_request(datas)
- return record
get_city_by_id, get_role_by_id 是并行处理。
Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use。
ORM 使用 peewee, 只是用来做模型设计和 migration, 数据库操作使用 asyncpg。
Example:
- # models.py
- class Users(Model):
- id = PrimaryKeyField()
- create_time = DateTimeField(verbose_name='create time',
- default=datetime.datetime.utcnow)
- name = CharField(max_length=128, verbose_name="user's name")
- age = IntegerField(null=False, verbose_name="user's age")
- sex = CharField(max_length=32, verbose_name="user's sex")
- city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
- role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)
- class Meta:
- db_table = 'users'
- # migrations.py
- from sanic_ms.migrations import MigrationModel, info, db
- class UserMigration(MigrationModel):
- _model = Users
- # @info(version="v1")
- # def migrate_v1(self):
- # migrate(self.add_column('sex'))
- def migrations():
- try:
- um = UserMigration()
- with db.transaction():
- um.auto_migrate()
- print("Success Migration")
- except Exception as e:
- raise e
- if __name__ == '__main__':
- migrations()
asyncpg is the fastest driver among common Python, NodeJS and Go implementations
使用 asyncpg 为数据库驱动, 对数据库连接进行封装, 执行数据库操作。
不使用 ORM 做数据库操作,一个原因是性能,ORM 会有性能的损耗,并且无法使用 asyncpg 高性能库。另一个是单个微服务是很简单的,表结构不会很复杂,简单的 SQL 语句就可以处理来,没必要引入 ORM。使用 peewee 只是做模型设计
Example:
- sql = "SELECT * FROM users WHERE name=$1"
- name = "test"
- async with request.app.db.acquire(request) as cur:
- data = await cur.fetchrow(sql, name)
- async with request.app.db.transaction(request) as cur:
- data = await cur.fetchrow(sql, name)
使用 aiohttp 中的 client,对客户端进行了简单的封装,用于微服务之间访问。
Don't create a session per request. Most likely you need a session per application which performs all requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.
Example:
- @app.listener('before_server_start')
- async def before_srver_start(app, loop):
- app.client = Client(loop, url='http://host:port')
- async def get_role_by_id(request, id):
- cli = request.app.client.cli(request)
- async with cli.get('/cities/{}'.format(id)) as res:
- return await res.json()
- @app.listener('before_server_stop')
- async def before_server_stop(app, loop):
- app.client.close()
对于访问不同的微服务可以创建多个不同的 client,这样每个 client 都会 keep-alives
使用官方 logging, 配置文件为 logging.yml, sanic 版本要 0.6.0 及以上。JsonFormatter 将日志转成 json 格式,用于输入到 ES
Enter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.
- @logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
- async def get_city_by_id(request, id):
- cli = request.app.client.cli(request)
api 文档使用 swagger 标准。
Example:
- from sanic_ms import doc
- @user_bp.post('/')
- @doc.summary('create user')
- @doc.description('create user info')
- @doc.consumes(Users)
- @doc.produces({'id': int})
- async def create_user(request):
- data = request['data']
- async with request.app.db.transaction(request) as cur:
- record = await cur.fetchrow(
- """ INSERT INTO users(name, age, city_id, role_id)
- VALUES($1, $2, $3, $4, $5)
- RETURNING id
- """, data['name'], data['age'], data['city_id'], data['role_id']
- )
- return {'id': record['id']}
在返回时,不要返回 sanic 的 response,直接返回原始数据,会在 Middleware 中对返回的数据进行处理,返回统一的格式,具体的格式可以 [查看]
单元测试使用 unittest。 mock 是自己创建了 MockClient,因为 unittest 还没有 asyncio 的 mock,并且 sanic 的测试接口也是发送 request 请求,所以比较麻烦. 后期可以使用 pytest。
Example:
- from sanic_ms.tests import APITestCase
- from server import app
- class TestCase(APITestCase):
- _app = app
- _blueprint = 'visit'
- def setUp(self):
- super(TestCase, self).setUp()
- self._mock.get('/cities/1',
- payload={'id': 1, 'name': 'shanghai'})
- self._mock.get('/roles/1',
- payload={'id': 1, 'name': 'shanghai'})
- def test_create_user(self):
- data = {
- 'name': 'test',
- 'age': 2,
- 'city_id': 1,
- 'role_id': 1,
- }
- res = self.client.create_user(data=data)
- body = ujson.loads(res.text)
- self.assertEqual(res.status, 200)
- coverage erase
- coverage run --source . -m sanic_ms tests
- coverage xml -o reports/coverage.xml
- coverage2clover -i reports/coverage.xml -o reports/clover.xml
- coverage html -d reports
使用 app.error_handler = CustomHander() 对抛出的异常进行处理
Example:
- from sanic_ms.exception import ServerError
- @visit_bp.delete('/users/<id:int>')
- async def del_user(request, id):
- raise ServerError(error='内部错误',code=10500, message="msg")
来源: https://juejin.im/post/5a40a83af265da431e16fe61