python 多线程适合 IO 密集型场景, 而在 CPU 密集型场景, 并不能充分利用多核 CPU, 而协程本质基于线程, 同样不能充分发挥多核的优势.
针对计算密集型场景需要使用多进程, python 的 multiprocessing 与 threading 模块非常相似, 支持用进程池的方式批量创建子进程.
创建单个 Process 进程 (使用 func)
只需要实例化 Process 类, 传递函数给 target 参数, 这点和 threading 模块非常的类似, args 为函数的参数
- import os
- from multiprocessing import Process
- # 子进程要执行的代码
- def task(name):
- print('run child process %s (%s)...' % (name, os.getpid()))
- if __name__ == '__main__':
- print('parent process %s.' % os.getpid())
- p = Process(target=task, args=('test',))
- p.start()
- p.join()
- print('process end.')
创建单个 Process 进程 (使用 class)
继承 Process 类, 重写 run 方法创建进程, 这点和 threading 模块基本一样
- import multiprocessing
- import os
- from multiprocessing import current_process
- class Worker(multiprocessing.Process):
- def run(self):
- name = current_process().name # 获取当前进程的名称
- print('run child process <%s> (%s)' % (name, os.getpid()))
- print('In %s' % self.name)
- return
- if __name__ == '__main__':
- print('parent process %s.' % os.getpid())
- p = Worker()
- p.start()
- p.join()
- print('process end.')
* 停止进程
terminate() 结束子进程, 但是会导致子进程的资源无法释放掉, 是不推荐的做法, 因为结束的时候不清楚子线程的运行状况, 有很大可能性导致子线程在不恰当的时刻被结束.
- import multiprocessing
- import time
- def worker():
- print('starting worker')
- time.sleep(0.1)
- print('finished worker')
- if __name__ == '__main__':
- p = multiprocessing.Process(target=worker)
- print('执行前:', p.is_alive())
- p.start()
- print('执行中:', p.is_alive())
- p.terminate() # 发送停止号
- print('停止:', p.is_alive())
- p.join()
- print('等待完成:', p.is_alive())
直接创建多个 Process 进程
- import multiprocessing
- def worker(num):
- print(f'Worker:%s %s', num)
- return
- if __name__ == '__main__':
- jobs = []
- for i in range(5):
- p = multiprocessing.Process(target=worker, args=(i,))
- jobs.append(p)
- p.start()
使用进程池创建多个进程
在利用 Python 进行系统管理的时候, 特别是同时操作多个文件目录, 或者远程控制多台主机, 并行操作可以节约大量的时间. 当被操作对象数目不大时, 可以直接利用 multiprocessing 中的 Process 动态成生多个进程, 十几个还好, 但如果是上百个, 上千个目标, 手动的去限制进程数量却又太过繁琐, 此时可以发挥进程池的功效.
Pool 可以提供指定数量的进程供用户调用, 当有新的请求提交到 pool 中时, 如果池还没有满, 那么就会创建一个新的进程用来执行该请求; 但如果池中的进程数已经达到规定最大值, 那么该请求就会等待, 直到池中有进程结束, 才会创建新的进程来它.
- import os
- import random
- import time
- from multiprocessing import Pool
- from time import ctime
- def task(name):
- print('start task %s (%s)...' % (name, os.getpid()))
- start = time.time()
- time.sleep(random.random() * 3)
- print('end task %s runs %0.2f seconds.' % (name, (time.time() - start)))
- if __name__ == '__main__':
- print('parent process %s.' % os.getpid())
- p = Pool() # 初始化进程池
- for i in range(5):
- p.apply_async(task, args=(i,)) # 追加任务 apply_async 是异步非阻塞的, 就是不用等待当前进程执行完毕, 随时根据系统调度来进行进程切换.
- p.close()
- p.join() # 等待所有结果执行完毕, 会等待所有子进程执行完毕, 调用 join() 之前必须先调用 close()
- print(f'all done at: {ctime()}')
如果关心每个进程的执行结果, 可以使用返回结果的 get 方法获取, 代码如下
- import os
- import random
- import time
- from multiprocessing import Pool, current_process
- from time import ctime
- def task(name):
- print('start task %s (%s)...' % (name, os.getpid()))
- start = time.time()
- time.sleep(random.random() * 3)
- print('end task %s runs %0.2f seconds.' % (name, (time.time() - start)))
- return current_process().name + 'done'
- if __name__ == '__main__':
- print('parent process %s.' % os.getpid())
- result = []
- p = Pool() # 初始化进程池
- for i in range(5):
- result.append(p.apply_async(task, args=(i,))) # 追加任务 apply_async 是异步非阻塞的, 就是不用等待当前进程执行完毕, 随时根据系统调度来进行进程切换.
- p.close()
- p.join() # 等待所有结果执行完毕
- for res in result:
- print(res.get()) # get() 函数得出每个返回结果的值
- print(f'all done at: {ctime()}')
来源: https://www.cnblogs.com/chenqionghe/p/9674596.html