process 类介绍
multiprocessing 模块官方说明文档 https://docs.python.org/3.5/library/multiprocessing.html
Process 类用来描述一个进程对象. 创建子进程的时候, 只需要传入一个执行函数和函数的参数即可完成 Process 示例的创建.
python 中的多线程无法利用多核优势, 如果想要充分地使用多核 CPU 的资源 (os.cpu_count() 查看), 在 python 中大部分情况需要使用多进程. Python 提供了 multiprocessing.
multiprocessing 模块用来开启子进程, 并在子进程中执行我们定制的任务(比如函数), 该模块与多线程模块 threading 的编程接口类似.
star() 方法启动进程.
join() 方法实现进程间的同步,
等待所有进程退出.
close() 用来阻止多余的进程涌入进程池 Pool 造成进程阻塞.
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
target 是函数名字, 需要调用的函数
args 函数需要的参数,
以 tuple 的形式传入
创建子进程方式一:
- rom multiprocessing import Process
- import time
- def f(name):
- time.sleep(2)
- print('hello', name)
- if __name__ == '__main__':
- p = Process(target=f, args=('bob',))
- p.start()
- p.join()
创建子进程方式二:
- from multiprocessing import Process
- import time
- class MyProcess(Process):
- def __init__(self,name):
- super().__init__()
- self.name=name
- def run(self):
- print('task <%s> is runing' % self.name)
- time.sleep(2)
- print('task <%s> is done' % self.name)
- if __name__ == '__main__':
- p=MyProcess('egon')
- p.start()
- print('主')
注意: run 方法是必须去重写的.
查看进程父子进程的进程号, 示例:
- from multiprocessing import Process
- import os
- def info(title):
- print(title)
- print('module name:', __name__)
- print('parent process:', os.getppid())
- print('process id:', os.getpid())
- print("\n\n")
- def f(name):
- info('\033[31;1mfunction f\033[0m')
- print('hello', name)
- if __name__ == '__main__':
- info('\033[32;1mmain process line\033[0m')
- p = Process(target=f, args=('bob',))
- p.start()
- p.join()
进程间通信
先要声明一点, 这里所说的进程间通信指的是具有父子关系的进程间通信机制, 如果两个进程间没有任何关系, 这里的机制是无法实现的.
Queues
使用方法跟 threading 里的 queue 差不多
- from multiprocessing import Process, Queue
- def f(q):
- q.put([42, None, 'hello'])
- if __name__ == '__main__':
- q = Queue()
- p = Process(target=f, args=(q,))
- p.start()
- print(q.get()) # prints "[42, None,'hello']"
- p.join()
- Pipes
常用来在两个进程间通信, 两个进程分别位于管道的两端.
multiprocessing.Pipe([duplex])
示例一:
- rom multiprocessing import Process, Pipe
- def send(pipe):
- pipe.send(['spam'] + [42, 'egg']) # send 传输一个列表
- pipe.close()
- if __name__ == '__main__':
- (con1, con2) = Pipe() # 创建两个 Pipe 实例
- sender = Process(target=send, args=(con1, )) # 函数的参数, args 一定是实例化之后的 Pip 变量, 不能直接写 args=(Pip(),)
- sender.start() # Process 类启动进程
- print("con2 got: %s" % con2.recv()) # 管道的另一端 con2 从 send 收到消息
- con2.close() # 关闭管道
结果:
con2 got: ['spam', 42, 'egg']
示例二:
- from multiprocessing import Process, Pipe
- def talk(pipe):
- pipe.send(dict(name='Bob', spam=42)) # 传输一个字典
- reply = pipe.recv() # 接收传输的数据
- print('talker got:', reply)
- if __name__ == '__main__':
- (parentEnd, childEnd) = Pipe() # 创建两个 Pipe() 实例, 也可以改成 conf1, conf2
- child = Process(target=talk, args=(childEnd,)) # 创建一个 Process 进程, 名称为 child
- child.start() # 启动进程
- print('parent got:', parentEnd.recv()) # parentEnd 是一个 Pip() 管道, 可以接收 child Process 进程传输的数据
- parentEnd.send({x * 2 for x in 'spam'}) # parentEnd 是一个 Pip() 管道, 可以使用 send 方法来传输数据
- child.join() # 传输的数据被 talk 函数内的 pip 管道接收, 并赋值给 reply
- print('parent exit')
结果:
parent got: {'name': 'Bob', 'spam': 42}
talker got: {'ss', 'aa', 'pp', 'mm'}
- parent exit
- Managers
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types https://docs.python.org/3.5/library/stdtypes.html#list , https://docs.python.org/3.5/library/stdtypes.html#dict , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.managers.Namespace , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Lock , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.RLock , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Semaphore , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.BoundedSemaphore , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Condition , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Event , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Barrier , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Queue , https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Value and https://docs.python.org/3.5/library/multiprocessing.html#multiprocessing.Array . For example:
- from multiprocessing import Process, Manager
- def f(d, l):
- d[1] = '1'
- d['2'] = 2
- d[0.25] = None
- l.append(1)
- print(l)
- if __name__ == '__main__':
- with Manager() as manager:
- d = manager.dict()
- l = manager.list(range(5))
- p_list = []
- for i in range(10):
- p = Process(target=f, args=(d, l))
- p.start()
- p_list.append(p)
- for res in p_list:
- res.join()
- print(d)
- print(l)
进程池
在利用 Python 进行系统管理的时候, 特别是同时操作多个文件目录, 或者远程控制多台主机, 并行操作可以节约大量的时间. 当被操作对象数目不大时, 可以直接利用 multiprocessing 中的 Process 动态成生多个进程, 10 几个还好, 但如果是上百个, 上千个目标, 手动的去限制进程数量却又太过繁琐, 这时候进程池 Pool 发挥作用的时候就到了.
Pool 可以提供指定数量的进程, 供用户调用, 当有新的请求提交到 pool 中时, 如果池还没有满, 那么就会创建一个新的进程用来执行该请求; 但如果池中的进程数已经达到规定最大值, 那么该请求就会等待, 直到池中有进程结束, 才会创建新的进程来它. 这里有一个简单的例子:
- #!/usr/bin/env python
- # _*_ coding utf-8 _*_
- #Author: aaron
- from multiprocessing import Process, Pool
- import time, os
- def Foo(i):
- time.sleep(5)
- print('in process[Foo]', os.getpid())
- return i + 100
- def Bar(arg): # 父进程去执行, 而不是子进程调用
- print('-->exec done:', arg)
- print('in process[Bar]', os.getpid())
- if __name__ == '__main__':
- pool = Pool(5) # 允许进程池里同时放入 5 个进程 其他多余的进程处于挂起状态
- for i in range(10):
- pool.apply_async(func=Foo, args=(i,), callback=Bar)
- # pool.apply(func=Foo, args=(i,))
- print('end:', os.getpid())
- pool.close() # close() 必须在 join()前被调用
- pool.join() # 进程池中进程执行完毕后再关闭, 如果注释, 那么程序直接关闭.
pool.apply_async()用来向进程池提交目标请求.
pool.join()是用来等待进程池中的 worker 进程执行完毕, 防止主进程在 worker 进程结束前结束. 但 pool.join()必须使用在 pool.close()或者 pool.terminate()之后.
close()跟 terminate()的区别在于 close()会等待池中的 worker 进程执行结束再关闭 pool, 而 terminate()则是直接关闭.
result.successful()表示整个调用执行的状态, 如果还有 worker 没有执行完, 则会抛出 AssertionError 异常.
利用 multiprocessing 下的 Pool 可以很方便的同时自动处理几百或者上千个并行操作, 脚本的复杂性也大大降低.
来源: https://www.cnblogs.com/cjaaron/p/9175828.html