大家好, 并发编程 进入第十一章.
前面两节, 我们讲了协程中的单任务和多任务
这节我们将通过一个小实战, 来对这些内容进行巩固.
在实战中, 将会用到以下知识点:
多线程的基本使用
Queue 消息队列的使用
Redis 的基本使用
asyncio 的使用
. 动态添加协程
在实战之前, 我们要先了解下在 asyncio 中如何将协程态添加到事件循环中的. 这是前提.
如何实现呢, 有两种方法:
主线程是同步的
- import time
- import asyncio
- from queue import Queue
- from threading import Thread
- def start_loop(loop):
- # 一个在后台永远运行的事件循环
- asyncio.set_event_loop(loop)
- loop.run_forever()
- def do_sleep(x, queue, msg=""):
- time.sleep(x)
- queue.put(msg)
- queue = Queue()
- new_loop = asyncio.new_event_loop()
- # 定义一个线程, 并传入一个事件循环对象
- t = Thread(target=start_loop, args=(new_loop,))
- t.start()
- print(time.ctime())
- # 动态添加两个协程
- # 这种方法, 在主线程是同步的
- new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一个")
- new_loop.call_soon_threadsafe(do_sleep, 3, queue, "第二个")
- while True:
- msg = queue.get()
- print("{} 协程运行完..".format(msg))
- print(time.ctime())
由于是同步的, 所以总共耗时 6+3=
9
秒.
输出结果
Thu May 31 22:11:16 2018
第一个 协程运行完..
Thu May 31 22:11:22 2018
第二个 协程运行完..
Thu May 31 22:11:25 2018
主线程是异步的, 这是重点, 一定要掌握..
- import time
- import asyncio
- from queue import Queue
- from threading import Thread
- def start_loop(loop):
- # 一个在后台永远运行的事件循环
- asyncio.set_event_loop(loop)
- loop.run_forever()
- async def do_sleep(x, queue, msg=""):
- await asyncio.sleep(x)
- queue.put(msg)
- queue = Queue()
- new_loop = asyncio.new_event_loop()
- # 定义一个线程, 并传入一个事件循环对象
- t = Thread(target=start_loop, args=(new_loop,))
- t.start()
- print(time.ctime())
- # 动态添加两个协程
- # 这种方法, 在主线程是异步的
- asyncio.run_coroutine_threadsafe(do_sleep(6, queue, "第一个"), new_loop)
- asyncio.run_coroutine_threadsafe(do_sleep(3, queue, "第二个"), new_loop)
- while True:
- msg = queue.get()
- print("{} 协程运行完..".format(msg))
- print(time.ctime())
输出结果
由于是同步的, 所以总共耗时 max(6, 3)=
6
秒
Thu May 31 22:23:35 2018
第二个 协程运行完..
Thu May 31 22:23:38 2018
第一个 协程运行完..
Thu May 31 22:23:41 2018
. 实战: 利用 redis 实现动态添加任务
对于并发任务, 通常是用生成消费模型, 对队列的处理可以使用类似 master-worker 的方式, master 主要用户获取队列的 msg,worker 用户处理消息.
为了简单起见, 并且协程更适合单线程的方式, 我们的主线程用来监听队列, 子线程用于处理队列. 这里使用 redis 的队列. 主线程中有一个是无限循环, 用户消费队列.
先安装 Redis
到 https://github.com/MicrosoftArchive/redis/releases 下载
解压到你的路径.
然后, 在当前路径运行 cmd, 运行 redis 的服务端.
服务开启后, 我们就可以运行我们的客户端了.
并依次输入 key=queue,value=5,3,1 的消息.
一切准备就绪之后, 我们就可以运行我们的代码了.
- import time
- import redis
- import asyncio
- from queue import Queue
- from threading import Thread
- def start_loop(loop):
- # 一个在后台永远运行的事件循环
- asyncio.set_event_loop(loop)
- loop.run_forever()
- async def do_sleep(x, queue):
- await asyncio.sleep(x)
- queue.put("ok")
- def get_redis():
- connection_pool = redis.ConnectionPool(host='127.0.0.1', db=0)
- return redis.Redis(connection_pool=connection_pool)
- def consumer():
- while True:
- task = rcon.rpop("queue")
- if not task:
- time.sleep(1)
- continue
- asyncio.run_coroutine_threadsafe(do_sleep(int(task), queue), new_loop)
- if __name__ == '__main__':
- print(time.ctime())
- new_loop = asyncio.new_event_loop()
- # 定义一个线程, 运行一个事件循环对象, 用于实时接收新任务
- loop_thread = Thread(target=start_loop, args=(new_loop,))
- loop_thread.setDaemon(True)
- loop_thread.start()
- # 创建 redis 连接
- rcon = get_redis()
- queue = Queue()
- # 子线程: 用于消费队列消息, 并实时往事件对象容器中添加新任务
- consumer_thread = Thread(target=consumer)
- consumer_thread.setDaemon(True)
- consumer_thread.start()
- while True:
- msg = queue.get()
- print("协程运行完..")
- print("当前时间:", time.ctime())
稍微讲下代码
loop_thread: 单独的线程, 运行着一个事件对象容器, 用于实时接收新任务.
consumer_thread: 单独的线程, 实时接收来自 Redis 的消息队列, 并实时往事件对象容器中添加新任务.
输出结果
Thu May 31 23:42:48 2018
协程运行完..
当前时间: Thu May 31 23:42:49 2018
协程运行完..
当前时间: Thu May 31 23:42:51 2018
协程运行完..
当前时间: Thu May 31 23:42:53 2018
我们在 Redis, 分别发起了 5s,3s,1s 的任务.
从结果来看, 这三个任务, 确实是并发执行的, 1s 的任务最先结束, 三个任务完成总耗时 5s
运行后, 程序是一直运行在后台的, 我们每一次在 Redis 中输入新值, 都会触发新任务的执行..
好了, 经过这个实战内容, 你应该对 asyncio 的实际应用有了一个更深刻的认识了, 至此, 你已经可以使用 asyncio 来实现任务的并发. 快去体验一下. 如果有什么疑问, 请在后台加我微信与我联系..
快关注一下, 成为 Python 高手
来源: https://www.cnblogs.com/wongbingming/p/9124142.html