流畅的 Python笔记.
本篇主要讨论 concurrent.futures 模块, 并用它实现一个简单的并发操作.
1. 前言
我们都知道, 如果有大量数据要处理, 或者要处理大量链接, 异步操作会比顺序操作快很多. Python 中, concurrent 和 asyncio 则是标准库中进行了高度封装的两个异步操作包. 它们在底层使用了 Python 提供的更基础的两个模块, 分别是 multiprocessing 和 threading.
future(全小写)并不具体指某个类的实例, 而且笔者查了老多资料也没看到哪个类叫做 future, 它泛指用于异步操作的对象. concurrent.futures 和 asyncio 这两个模块中有两个名为 Future 的类:
concurrent.futures.Future
和 asyncio.Future. 这两个类的作用相同, 都表示可能已经完成或尚未完成的延迟计算. 这两个 Future 的实例并不应该由我们手动创建, 而应交由并发框架 (也就是前面那两个模块) 来实例化.
本篇主要介绍 concurrent.futures 模块的简单使用, 并会将其和顺序计算进行对比, 其中还会涉及 GIL 和阻塞型 I/O 的概念. asyncio 将在下一篇进行介绍.
2. 顺序执行
首先实现一个下载各国国旗的程序, 随后再将它与并发版本进行对比. 以下是顺序执行的版本, 它下载人口前 20 的国家的国旗, 并保存到本地:
- # 代码 2.1,flags.py
- import os, time, sys # 这么引用只是为了节省篇幅, 并不提倡
- import requests # 第三方库
- POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
- # 如果想测试自己的并发程序, 为了避免被误认为是 DOS 攻击, 请自建 http 服务
- BASE_URL = "http://flupy.org/data/flags"
- DEST_DIR = "downloads/"
- def save_flag(img, filename): # 保存图片到本地
- path = os.path.join(DEST_DIR, filename)
- with open(path, "wb") as fp:
- fp.write(img)
- def get_flag(cc): # 请求图片
- url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
- resp = requests.get(url)
- return resp.content
- def show(text): # 每获取一张图片就给出一个提示
- print(text, end=" ")
- sys.stdout.flush()
- def download_one(cc): # 下载一张图片
- image = get_flag(cc)
- show(cc)
- save_flag(image, cc.lower() + ".gif")
- return cc # 这个 return 主要是给后面的并发程序用的, 此处不要这行代码也可以
- def download_many(cc_list): # 下载多张图片
- for cc in sorted(cc_list):
- download_one(cc)
- return len(cc_list)
- def main(download_many): # 主程序, 接收一个函数为参数
- t0 = time.time() # 开始时间
- count = download_many(POP20_CC)
- elapsed = time.time() - t0 # 结束时间
- msg = "\n{} flags downloaded in {:.2f}s"
- print(msg.format(count, elapsed))
- if __name__ == "__main__":
- main(download_many)
- # 结果
- BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
- 20 flags downloaded in 14.83s # 耗时, 只做了一次
复制代码
3. concurrent.futures
现在我们用 concurrent.futures 模块将上述代码改写为线程版本, 使其异步执行, 其中有大部分函数延用上述代码.
3.1 futures.as_completed
首先实现一个更具有细节的版本, 我们手动提交线程, 然后再运行. 这个版本只是为了讲述细节, 所以并没有全部下载, 最大线程数也没有设置得很高:
- # 代码 3.1,flags_threadpool.py
- from concurrent import futures
- from flags import save_flag, get_flag, download_one, show, main
- def download_many_ac(cc_list):
- cc_list = cc_list[:5] # 只下载前五个用于测试
- with futures.ThreadPoolExecutor(len(cc_list) / 2) as executor:
- to_do = {} # 有意写出字典, 其实也可以是列表或集合, 但这是个惯用方法
- for cc in sorted(cc_list):
- future = executor.submit(download_one, cc)
- to_do[future] = cc
- msg = "Scheduled for {}: {}"
- print(msg.format(cc, future))
- results = []
- for future in futures.as_completed(to_do):
- res = future.result()
- msg = "{} result: {!r}"
- print(msg.format(future, res))
- results.append(res)
- return len(results)
- if __name__ == "__main__":
- main(download_many_ac)
- # 结果:
- Scheduled for BR: <Future at 0x1cbca5ab0f0 state=running>
- Scheduled for CN: <Future at 0x1cbcb339b00 state=running>
- Scheduled for ID: <Future at 0x1cbcb3490f0 state=running>
- Scheduled for IN: <Future at 0x1cbcb349748 state=pending>
- Scheduled for US: <Future at 0x1cbcb3497f0 state=pending>
- CN <Future at 0x1cbcb339b00 state=finished returned str> result: 'CN'
- BR <Future at 0x1cbca5ab0f0 state=finished returned str> result: 'BR'
- IN <Future at 0x1cbcb349748 state=finished returned str> result: 'IN'
- US <Future at 0x1cbcb3497f0 state=finished returned str> result: 'US'
- ID <Future at 0x1cbcb3490f0 state=finished returned str> result: 'ID'
- 5 flags downloaded in 2.39s # 20 个一起下载只需要 1.6s 左右
复制代码
解释:
在 concurrent.futures 中有一个名为 Executor 的抽象基类, 由它定义执行异步操作的接口. 在这个模块中有它的两个具体类: 的 ThreadPoolExecutor 和
ProcessPoolExecutor
, 前者是线程, 后者是进程. Executor 的第一个参数指定最大运行线程数.
Executor.submit(func, *args, **kwargs)
方法会在线程中执行
func(*args, **kwargs)
, 它将这个方法封装成 Future 对象并返回(假设这个实例叫做 future).submit 方法会对 future 进行排期, 如果运行的线程数没达到最大线程数, 则 future 会被立即运行, 并将其状态置为 running; 否则就等待, 并将其状态置为 pending. 这同时也表明, 线程在 submit 方法中启动.
futures.as_completed
函数的第一个参数是一个 future 序列, 在内部会被转换成 set. 它返回一个迭代器, 在 future 运行结束后产出 future. 在使用这个函数时还有一个惯用方法: 将 future 放到一个字典中. 因为 as_completed 返回的 future 的顺序不一定是传入时的顺序, 使用字典可以很轻松的做一些后续处理.
上述代码中, 从第 31-35 行的最开始两个字母是由 show 函数输出的. 光看上述结果, 会让人觉得线程是在 as_completed 中启动的, 而之所以结果输出得这么整齐, 是因为 for 循环里只是 "提交", 实际运行是在线程中. 如果在每次循环最后都执行 sleep(2), 你将会看到这样的结果:
- # 代码 3.2
- Scheduled for BR: <Future at 0x13e6b30b2b0 state=running>
- BR Scheduled for CN: <Future at 0x13e6b5820b8 state=running>
- CN Scheduled for ID: <Future at 0x13e6c099278 state=running>
- -- snip --
复制代码
concurrent.futures.Future
有一个 **result 方法, 它返回 future 中可调用对象运行完成后的结果, 或者重新抛出可调用对象运行时的异常 **. 如果 future 还未运行完成, 调用 future.result()将阻塞调用方所在的线程, 直到有结果可返回; 它还可以接受一个 timeout 参数用于指定运行时间, 如果在 timeout 时间内 future 没有运行完毕, 将抛出 TimeoutError 异常.
3.2 Executor.map
在代码 3.1 中, 我们自行提交线程, 其实, 上述可改为更简洁的版本: 使用 Executor.map 批量提交, 只需要新建一个 download_many 函数, 其余不变:
- # 代码 3.3
- def download_many(cc_list):
- with futures.ThreadPoolExecutor(len(cc_list)) as executor:
- res = executor.map(download_one, sorted(cc_list))
- return len(list(res))
- # 结果:
- JP RUBR EG CN VN BD TR FR ID NG DE IN PK ET PH IR US CD MX
- 20 flags downloaded in 1.69s
复制代码
Executor.map()方法和内置的 map 函数类似, 它将第一个参数 (可调用对象) 映射到第二个参数 (可迭代对象) 的每一个元素上以创建 Future 列表. Executor.map()方法内部也是通过调用 Future.submit 来创建 Future 对象.
3.3 比较
从上面代码可以看出, 虽然使用 Executor.map()的代码量比较少, 但 Executor.submit()和
futures.as_completed()
的组合更灵活.
Executor.map()更适合于需要批量处理的情况, 比如同一函数 (或者可调用对象) 不同参数. 而 Executor.submit()则更适合于零散的情况, 比如不同函数同一参数, 不同函数不同参数, 甚至两个线程毫无关联.
4. 补充
本文主体部分已经结束, 下面是一些补充.
4.1 I/O 密集型和 GIL
CPython 本身并不是线程安全的, 因此有全局解释器锁(Global Interpreter Lock, GIL), 一次只允许使用一个线程执行 Python 字节码.
以这个为基础, 按理说上述所有代码将都不能并行下载, 因为一次只能运行一个线程, 并且线程版本的运行时间应该比顺序版本的还要多才对(线程切换耗时). 但结果也表明, 两个线程版本的耗时都大大降低了.
这是因为, Python 标准库中所有执行阻塞型 I/O 操作的函数, 在等待操作系统返回结果时都会释放 GIL. 这就意味着, GIL 几乎对 I/O 密集型处理并没有什么影响, 依然可以使用多线程.
4.2 CPU 密集型
concurrent.futures 中还有一个
ProcessPoolExecutor
类, 它实现的是真正的并行计算. 它和 ThreadPoolExecutro 一样, 继承自 Executor, 两者实现了共同的接口, 因此使用 concurrent.futures 编写的代码可以轻松地在线程版本与进程版本之间转换, 比如要讲上述代码改为进程版本, 只需更改 download_many()中的一行代码:
- # 代码 3.4
- with futures.ThreadPoolExecutor(len(cc_list)) as executor:
- # 改为:
- with futures.ProcessPoolExecutor() as executor:
复制代码
也可以指定进程数, 但默认是 os.cpu_count()的返回值, 即电脑的 CPU 核心数.
这个类非常适合于 CPU 密集型作业上. 使用这个类实现的上述代码虽然比线程版本慢一些, 但依然比顺序版本快很多.
4.3 进度条
如果你用最新版 pip 下载过第三方库, 你会发现在下载时会有一个文字进度条. 在 Python 中想要实现这种效果可以使用第三方库 tqdm, 以下是它的一个简单用法:
- # 代码 3.5
- import tqdm
- from time import sleep
- for i in tqdm.tqdm(range(1000)):
- sleep(0.01)
- # 结果:
- 40%| | 400/1000 [00:10<00:00, 98.11it/s]
复制代码
来源: https://juejin.im/post/5b24e19d6fb9a00e325e6d07