一, 使用 asyncio 总结
最近在公司的一些项目中开始慢慢使用 python 的 asyncio, 使用的过程中也是各种踩坑, 遇到的问题也不少, 其中有一次是内存的问题, 自己也整理了遇到的问题以及解决方法详细内容看: https://www.syncd.cn/article/memory_trouble
在前面整理的三篇 asyncio 文章中, 也都是使用 asyncio 的一些方法, 但是在实际项目中使用还是避免不了碰到问题, 在这周的工作中遇到之前碰见过的问题, 一个初学 asyncio 写代码中经常会碰到的问题, 我的业务代码在运行一段时间后提示如下错误提示:
Task was destroyed but it is pending!task: <Task pending coro=<HandleMsg.get_msg() done, defined at ex10.py:17> wait_for=<Future cancelled>>
这个错误我在前面几篇关于 asyncio 的系列文章中也反复说过这个问题, 我也认为自己不会在出现这种问题, 但是意外的是, 我的程序还是出现了这个错误.
我将我的业务代码通过一个 demo 代码进行模拟复现以及解决这个问题, 下面整理的就是这个过程
二,"Task was destroyed but it is pending!"
我通过下面这张图先描述一下 demo 程序的逻辑:
- import asyncio
- from asyncio import Queue
- import uuid
- from asyncio import Lock
- from asyncio import CancelledError
- queue = Queue()
- class HandleMsg(object):
- def __init__(self, unid, coroutine_queue, handle_manager):
- self.unid = unid
- self.coroutine_queue = coroutine_queue
- self.handle_manager = handle_manager
- async def get_msg(self):
- while True:
- coroutine_msg = await self.coroutine_queue.get()
- msg_type = coroutine_msg.get("msg")
- if msg_type == "start":
- print("recv unid [%s] is start" % self.unid)
- else:
- print("recv unid [%s] is end" % self.unid)
- # 每个当一个 unid 收到 end 消息为结束
- await self.handle_manager.del_unid(self.unid)
- class HandleManager(object):
- """
- 用于 unid 和 queue 的关系的处理
- """
- def __init__(self):
- self.loop = asyncio.get_event_loop()
- self.lock = Lock(loop=self.loop)
- self.handle_dict = dict()
- async def unid_bind(self, unid, coroutine_queue):
- async with self.lock:
- self.handle_dict[unid] = coroutine_queue
- async def get_queue(self, unid):
- async with self.lock:
- if unid in self.handle_dict:
- return self.handle_dict[unid]
- async def del_unid(self, unid):
- async with self.lock:
- if unid in self.handle_dict:
- self.handle_dict.pop(unid)
- def make_uniqueid():
- """
- 生成 unid
- """
- uniqueid = str(uuid.uuid1())
- uniqueid = uniqueid.split("-")
- uniqueid.reverse()
- uniqueid = "".join(uniqueid)
- return uniqueid
- async def product_msg():
- """
- 生产者
- """
- while True:
- unid = make_uniqueid()
- msg_start = {"unid": unid, "msg": "start"}
- await queue.put(msg_start)
- msg_end = {"unid": unid, "msg": "end"}
- await queue.put(msg_end)
- loop = asyncio.get_event_loop()
- await asyncio.sleep(0.2, loop=loop)
- async def consumer_from_queue(handle_manager):
- """
- 消费者
- """
- while True:
- msg = await queue.get()
- print("consumer recv %s" % msg)
- msg_type = msg.get("msg")
- unid = msg.get("unid")
- if msg_type == "start":
- coroutine_queue = Queue() # 用于和 handle_msg 协程进行数据传递
- handle_msg = HandleMsg(unid, coroutine_queue, handle_manager)
- await handle_manager.unid_bind(unid, coroutine_queue)
- await coroutine_queue.put(msg)
- loop = asyncio.get_event_loop()
- # 每次的 start 消息创建一个 task 去处理消息
- loop.create_task(handle_msg.get_msg())
- else:
- coroutine_queue = await handle_manager.get_queue(unid)
- await coroutine_queue.put(msg)
- if __name__ == "__main__":
- loop = asyncio.get_event_loop()
- handle_manager = HandleManager()
- # 在最开始创建了两个 task 分别是生产者和消费者
- loop.create_task(product_msg())
- loop.create_task(consumer_from_queue(handle_manager))
- loop.run_forever()
上面的代码表面上看没啥问题, 我们先看看运行效果:
- consumer recv {
- 'unid': '784f436cfaf388f611e94ca974e1ffbe', 'msg': 'start'
- }
- consumer recv {
- 'unid': '784f436cfaf388f611e94ca974e1ffbe', 'msg': 'end'
- }
- Task was destroyed but it is pending!
- task: <Task pending coro=<HandleMsg.get_msg() done, defined at demo.py:17> wait_for=<Future cancelled>>
- Task was destroyed but it is pending!
- task: <Task pending coro=<HandleMsg.get_msg() done, defined at demo.py:17> wait_for=<Future cancelled>>
- Task was destroyed but it is pending!
- task: <Task pending coro=<HandleMsg.get_msg() done, defined at demo.py:17> wait_for=<Future cancelled>>
- Task was destroyed but it is pending!
- task: <Task pending coro=<HandleMsg.get_msg() done, defined at demo.py:17> wait_for=<Future cancelled>>
- Task was destroyed but it is pending!
- task: <Task pending coro=<HandleMsg.get_msg() done, defined at demo.py:17> wait_for=<Future cancelled>>
- Task was destroyed but it is pending!
- task: <Task pending coro=<HandleMsg.get_msg() done, defined at demo.py:17> wait_for=<Future cancelled>>
- ..........
程序没运行一段时间都会出现上面显示的错误提示, 我先看看错误提示的信息:
- Task was destroyed but it is pending!
- task: <Task pending coro=<HandleMsg.get_msg() done, defined at demo.py:17> wait_for=<Future cancelled>>
上面提示的其实就是我的 task 是在 pendding 状态的时候被 destroyed 了, 代码行数以及调用方法都告诉我们了是在: HandleMsg.get_msg() done, defined at demo.py:17
其实问题也比较好找, 我们为每个 unid 创建了一个 task 来处理消息, 但是当我们收到每个 unid 消息的 end 消息之后其实这个 task 任务对于我们来说就已经完成了, 同时我们删除了我的 unid 和 queue 的绑定, 但是我们并没有手动去取消这个 task.
注意: 这里我其实也有一个不理解的地方: 关于这个 task 为什么会会 destroyed, 这个协程里是一个死循环一直在收消息, 当 queue 里面没有消息协程也应该一直在 await 地方在等待才对, 但是如果我们把收到 end 消息的那个地方的删除 unid 和 queue 的绑定关系不删除, 那么这个任务是不会被 descroyed. 所以没有完全明白这里的机制, 如果明白的同学欢迎留言讨论
但是即使上面的机制我们有点不是特别明白, 我们其实也应该把这个 task 手动进行 cancel 的, 我们们将上面的代码稍微进行改动如下:
- async def get_msg(self):
- try:
- while True:
- coroutine_msg = await self.coroutine_queue.get()
- msg_type = coroutine_msg.get("msg")
- if msg_type == "start":
- print("recv unid [%s] is start" % self.unid)
- else:
- print("recv unid [%s] is end" % self.unid)
- # 每个当一个 unid 收到 end 消息为结束
- await self.handle_manager.del_unid(self.unid)
- current_task = asyncio.Task.current_task()
- current_task.cancel() # 手动 cancel 当前的当前的 task
- except CancelledError as e:
- print("unid [%s] cancelled success" %self.unid)
这里有个问题需要注意就是当我们对 task 进行 cancel 的时候会抛出 cancelledError 异常, 我们需要对异常进行处理. 官网也对此进行专门说明:
内容如下:
- cancel()
- Request that this task cancel itself.
- This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop. The coroutine then has a chance to clean up or even deny the request using try/except/finally.
- Unlike Future.cancel(), this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.
- Immediately after this method is called, cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).
三, 小结
虽然还有一些地方不太明白, 但是随着用的越多, 碰到的问题越多, 一个一个解决, 可能现在对某些知识还有点模糊, 但是至少比刚开始使用 asyncio 的时候清晰了好多, 之前整理的三篇文章的连接如下:
- https://www.syncd.cn/article/asyncio_article_01
- https://www.syncd.cn/article/asyncio_article_02
- https://www.syncd.cn/article/asyncio_article_03
来源: https://www.cnblogs.com/zhaof/p/10581972.html