一, 简介
进程是计算机系统中资源分配的最小单位, 也是操作系统可以控制的最小单位, 在数据科学中很多涉及大量计算, CPU 密集型的任务都可以通过多进程并行运算的方式大幅度提升运算效率从而节省时间开销, 而在 Python 中实现多进程有多种方式, 本文就将针对其中较为易用的几种方式进行介绍.
二, 利用 multiprocessing 实现多进程
multiprocessing 是 Python 自带的用于管理进程的模块, 通过合理地利用 multiprocessing, 我们可以充分榨干所使用机器的 CPU 运算性能, 在 multiprocessing 中实现多进程也有几种方式.
2.1 Process
Process 是 multiprocessing 中最基础的类, 用于创建进程, 先来看看下面的示例:
single_process.py
- import multiprocessing
- import datetime
- import numpy as np
- import os
- def job():
- print(f'进程 {os.getpid()} 开始计算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
- for j in range(100):
- _ = np.sum(np.random.rand(10000000))
- print(f'进程 {os.getpid()} 结束运算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
- if __name__ == '__main__':
- process = multiprocessing.Process(target=job)
- process.start()
图 1 single_process.py 运行结果
在上面的例子中, 我们首先定义了函数 job(), 其连续执行一项运算任务 100 次, 并在开始和结束的时刻打印该进程对应的 pid, 用来唯一识别一个独立的进程, 接着利用 Process()将一个进程实例化, 其主要参数如下:
target: 需要执行的运算函数 args: target 函数对应的传入参数, 元组形式传入
在 process 创建完成之后, 我们对其调用. start()方法执行运算, 这样我们就实现了单个进程的创建与使用, 在此基础上, 我们将上述例子多线程化:
multi_processes.py
- import multiprocessing
- import datetime
- import numpy as np
- import os
- def job():
- print(f'进程 {os.getpid()} 开始计算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
- for j in range(100):
- _ = np.sum(np.random.rand(10000000))
- print(f'进程 {os.getpid()} 结束运算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
- if __name__ == '__main__':
- process_list = []
- for i in range(multiprocessing.cpu_count() - 1):
- process = multiprocessing.Process(target=job)
- process_list.append(process)
- for process in process_list:
- process.start()
- for process in process_list:
- process.join()
图 2 multi_processes.py 运行结果
在上面的例子中, 我们首先初始化用于存放多个线程的列表 process_list, 接着用循环的方式创建了 CPU 核心数 - 1 个进程并添加到 process_list 中, 再接着用循环的方式将所有进程逐个激活, 最后使用到. join()方法, 这个方法用于控制进程之间的并行, 如下例:
join_demo.py
- import multiprocessing
- import os
- import datetime
- import time
- def job():
- print(f'进程 {os.getpid()} 开始:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
- time.sleep(5)
- print(f'进程 {os.getpid()} 结束:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
- if __name__ == '__main__':
- process1 = multiprocessing.Process(target=job)
- process2 = multiprocessing.Process(target=job)
- process1.start()
- process1.join()
- process2.start()
- process2.join()
- print('='*200)
- process3 = multiprocessing.Process(target=job)
- process4 = multiprocessing.Process(target=job)
- process3.start()
- process4.start()
- process3.join()
- process4.join()
图 2 multi_processes.py 运行结果
观察对应进程执行的开始结束时间信息可以发现, 一个进程对象在. start()之后, 若在其他的进程对象. start()之前调用. join()方法, 则必须等到先前的进程对象运行结束才会接着执行. join()之后的非. join()的内容, 即前面的进程阻塞了后续的进程, 这种情况下并不能实现并行的多进程, 要想实现真正的并行, 需要现行对多个进程执行. start(), 接着再对这些进程对象执行. join(), 才能使得各个进程之间相互独立, 了解了这些我们就可以利用 Process 来实现多进程运算;
2.2 Pool
除了上述的 Process, 在 multiprocessing 中还可以使用 Pool 来快捷地实现多进程, 先来看下面的例子:
Pool_demo.py
- from multiprocessing import Pool
- import numpy as np
- from pprint import pprint
- def job(n):
- return np.mean(np.random.rand(n)), np.std(np.random.rand(n))
- if __name__ == '__main__':
- with Pool(5) as p:
- pprint(p.map(job, [i**10 for i in range(1, 6)]))
图 3 Pool_demo.py 运行结果
在上面的例子中, 我们使用 Pool 这个类, 将自编函数 job 利用. map()方法作用到后面传入序列每一个位置上, 与 Python 自带的 map()函数相似, 不同的是 map()函数将传入的函数以串行的方式作用到传入的序列每一个元素之上, 而 Pool()中的. map()方法则根据前面传入的并行数量 5, 以多进程并行的方式执行, 大大提升了运算效率.
三, 利用 joblib 实现多进程
与 multiprocessing 需要将执行运算的语句放置于含有 if __name__ == '__main__': 的脚本文件中下不同, joblib 将多进程的实现方式大大简化, 使得我们可以在 IPython 交互式环境下中灵活地使用它, 先看下面这个例子:
- from joblib import Parallel, delayed
- import numpy as np
- import time
- import datetime
- def job(i):
- start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- time.sleep(5)
- end = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- return start, end
- result = Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5))
- result
图 4 joblib 并行示例
在上面的例子中, 我们从 joblib 中导入 Parallel 和 delayed, 仅用 Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5))一句就实现了并行运算的功能, 其中 n_jobs 控制并行进程的数量, verbose 参数控制是否打印进程运算过程, 如果你熟悉 scikit-learn, 相信这两个参数你一定不会陌生, 因为 scikit-learn 中 RandomForestClassifier 等可以并行运算的算法都是通过 joblib 来实现的.
来源: https://zhuanlan.zhihu.com/p/264795288