python 的 multiprocessing 模块是用来创建多进程的, 下面对 multiprocessing 总结一下使用记录.
multiprocessing 创建多进程在 windows 和 linux 系统下的对比
- fork()
- import os
- pid = os.fork() # 创建一个子进程
- if pid == 0:
- print('这是子进程')
- print(os.getpid(),os.getppid())
- else:
- print('这是父进程')
- print(os.getpid())
- os.wait() # 等待子进程结束释放资源
fork 函数被调用后会返回两次, pid 为 0 的代表子进程, 其他返回子进程的 id 号表示父进程.
getpid 和 getppid 函数可以获取本进程和父进程的 id 号;
fork 方式的缺点:
兼容性差, 只能在类 linux 系统下使用, windows 系统不可使用;
扩展性差, 当需要多条进程的时候, 进程管理变得很复杂;
会产生 "孤儿" 进程和 "僵尸" 进程, 需要手动回收资源.
优点:
是系统自带的接近低层的创建方式, 运行效率高.
Process 创建进程
创建方式一:
- from multiprocessing import Queue, Process
- import os
- def test():
- time.sleep(2)
- print('this is process {}'.format(os.getpid()))
- if __name__ == '__main__':
- p = Process(target=test)
- p.start() # 子进程 开始执行
- p.join() # 等待子进程结束
- print('ths peocess is ended')
创建方式二:
- from multiprocessing import Queue, Process
- import os
- class MyProcess(Process):
- def run(self):
- time.sleep(2)
- print('this is process {}'.format(os.getpid()))
- def __del__(self):
- print('del the process {}'.format(os.getpid()))
- if __name__ == '__main__':
- p = MyProcess()
- p.start()
- print('ths process is ended')
- # 结果:
- ths process is ended
- this is process 7600
- del the process 7600
- del the process 12304
说明:
Process 对象可以创建进程, 但 Process 对象不是进程, 其删除与否与系统资源是否被回收没有直接的关系.
上例看到 del 方法被调用了两次, Process 进程创建时, 子进程会将主进程的 Process 对象完全复制一份, 这样在主进程和子进程各有一个 Process 对象, 但是 p1.start() 启动的是子进程, 主进程中的 Process 对象作为一个静态对象存在.
主进程执行完毕后会默认等待子进程结束后回收资源, 不需要手动回收资源;
join() 函数用来控制子进程结束的顺序, 主进程会阻塞等待子进程结束, 其内部也有一个清除僵尸进程的函数, 可以回收资源;
当子进程执行完毕后, 会产生一个僵尸进程, 其会被 join 函数回收, 或者再有一条进程开启, start 函数也会回收僵尸进程, 所以不一定需要写 join 函数.
windows 系统在子进程结束后会立即自动清除子进程的 Process 对象, 而 linux 系统子进程的 Process 对象如果没有 join 函数和 start 函数的话会在主进程结束后统一清除.
Process 对象分析
- class Process(object):
- def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
- pass
- # Process 对象是 python 用来创建进程的类
group: 扩展保留字段;
target: 目标代码, 一般是我们需要创建进程执行的目标函数.
name: 进程的名字, 如果不指定会自动分配一个;
args: 目标函数的普通参数;
kwargs: 目标函数的键值对参数;
# 方法
start(): 创建一个子进程并执行, 该方法一个 Process 实例只能执行一次, 其会创建一个进程执行该类的 run 方法.
run(): 子进程需要执行的代码;
join(): 主进程阻塞等待子进程直到子进程结束才继续执行, 可以设置等待超时时间 timeout.
terminate(): 使活着的进程终止;
is_alive(): 判断子进程是否还活着.
进程池 Pool
如果需要创建大量的进程, 就需要使用 Pool 了.
- from multiprocessing import Queue, Process, Pool
- import os
- def test():
- time.sleep(2)
- print('this is process {}'.format(os.getpid()))
- def get_pool(n=5):
- p = Pool(n) # 设置进程池的大小
- for i in range(10):
- p.apply_async(test)
- p.close() # 关闭进程池
- p.join()
- if __name__ == '__main__':
- get_pool()
- print('ths process is ended')
分析:
如上, 进程池 Pool 被创建出来后, 即使实际需要创建的进程数远远大于进程池的最大上限, p1.apply_async(test) 代码依旧会不停的执行, 并不会停下等待; 相当于向进程池提交了 10 个请求, 会被放到一个队列中;
当执行完 p1 = Pool(5) 这条代码后, 5 条进程已经被创建出来了, 只是还没有为他们各自分配任务, 也就是说, 无论有多少任务, 实际的进程数只有 5 条, 计算机每次最多 5 条进程并行.
当 Pool 中有进程任务执行完毕后, 这条进程资源会被释放, pool 会按先进先出的原则取出一个新的请求给空闲的进程继续执行;
当 Pool 所有的进程任务完成后, 会产生 5 个僵尸进程, 如果主线程不结束, 系统不会自动回收资源, 需要调用 join 函数去回收.
join 函数是主进程等待子进程结束回收系统资源的, 如果没有 join, 主程序退出后不管子进程有没有结束都会被强制杀死;
创建 Pool 池时, 如果不指定进程最大数量, 默认创建的进程数为系统的内核数量.
Pool 对象分析
- class Pool(object):
- def __init__(self, processes=None, initializer=None, initargs=(),
- maxtasksperchild=None, context=None):
- pass
- # 初始化参数
processes: 进程池的大小, 默认 cpu 内核的数量
initializer: 创建进程执行的目标函数, 其会按照进程池的大小创建相应个数的进程;
initargs: 目标函数的参数
context: 代码的上下文
# 方法
apply(): 使用阻塞方式调用 func;
apply_async(): 使用非阻塞方式条用 func;
close(): 关闭 Pool, 使其不再接受新的任务;
terminate(): 不管任务是否完成, 立即终止;
join(): 主进程阻塞, 等待子进程的退出, 必须在 close() 后面使用;
map(self, func, iterable, chunksize=None): 多进程执行一个函数, 传入不同的参数;
starmap(self, func, iterable, chunksize=None): 和 map 类似, 但 iterable 参数可解压缩;
starmap_async(self, func, iterable, chunksize=None, callback=None,error_callback=None): 使用异步的方式的 starmap,callback 为返回后的处理函数
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None): 异步方式的 map
实例
- from multiprocessing import Pool
- import os
- def test(n):
- time.sleep(1)
- print('this is process {}'.format(os.getpid()))
- return n
- def test1(n, m):
- print(n, m)
- print('this is process {}'.format(os.getpid()))
- def back_func(values): # 多进程执行完毕会返回所有的结果的列表
- print(values)
- def back_func_err(values): # 多进程执行完毕会返回所有错误的列表
- print(values)
- def get_pool(n=5):
- p = Pool(n)
- # p.map(test, (i for i in range(10))) # 阻塞式多进程执行
- # p.starmap(test1, zip([1,2,3],[3,4,5])) # 阻塞式多进程执行多参数函数
- # 异步多进程执行函数
- p.map_async(test, (i for i in range(5)), callback=back_func, error_callback=back_func_err)
- # 异步多进程执行多参数函数
- p.starmap_async(test1, zip([1,2,3],[3,4,5]), callback=back_func, error_callback=back_func_err)
- print('-----')
- p.close()
- p.join()
- if __name__ == '__main__':
- get_pool()
- print('ths process is ended')
进程锁
进程虽然不像线程那样共享内存的数据, 而是每个进程有单独的内存, 但多进程也是共享文件系统的, 即硬盘系统; 当多进程同时写入文件操作时, 可能造成数据的破坏, 因此进程也存在同步锁.
- from multiprocessing import Pool, Lock
- muex = Lock()
- def test():
- if muex.acquire():
- f = open('./test_pro.txt', 'r+', encoding='utf-8')
- x = f.read()
- if not x:
- f.write('0')
- else:
- f.seek(0)
- f.write(str(int(x)+1))
- f.close()
- muex.release()
- if __name__ == '__main__':
- p = Pool(5)
- for i in range(10):
- p.apply_async(test)
- p.close()
- p.join()
- with open('./test_pro.txt', 'r+', encoding='utf-8') as f:
- print(f.read())
进程锁可以保证文件系统的安全, 但是它使得并行变成了串行, 效率下降了, 也可能造成死锁问题, 一般避免用锁机制.
来源: https://www.cnblogs.com/cwp-bg/p/9564065.html