1.Python 多进程模块
Python 中的多进程是通过 multiprocessing 包来实现的, 和多线程的 threading.Thread 差不多, 它可以利用 multiprocessing.Process 对象来创建一个进程对象. 这个进程对象的方法和线程对象的方法差不多也有 start(), run(), join()等方法, 其中有一个方法不同 Thread 线程对象中的守护线程方法是 setDeamon, 而 Process 进程对象的守护进程是通过设置 daemon 属性来完成的.
下面说说 Python 多进程的实现方法, 和多线程类似
2.Python 多进程实现方法一
- from multiprocessing import Process
- def fun1(name):
- print('测试 %s 多进程' %name)
- if __name__ == '__main__':
- process_list = []
- for i in range(5): #开启 5 个子进程执行 fun1 函数
- p = Process(target=fun1,args=('Python',)) #实例化进程对象
- p.start()
- process_list.append(p)
- for i in process_list:
- p.join()
- print('结束测试')
结果
测试 Python 多进程
测试 Python 多进程
测试 Python 多进程
测试 Python 多进程
测试 Python 多进程
结束测试
Process finished with exit code 0
上面的代码开启了 5 个子进程去执行函数, 我们可以观察结果, 是同时打印的, 这里实现了真正的并行操作, 就是多个 CPU 同时执行任务. 我们知道进程是 python 中最小的资源分配单元, 也就是进程中间的数据, 内存是不共享的, 每启动一个进程, 都要独立分配资源和拷贝访问的数据, 所以进程的启动和销毁的代价是比较大了, 所以在实际中使用多进程, 要根据服务器的配置来设定.
3.Python 多进程实现方法二
还记得 python 多线程的第二种实现方法吗? 是通过类继承的方法来实现的, python 多进程的第二种实现方式也是一样的
- from multiprocessing import Process
- class MyProcess(Process): #继承 Process 类
- def __init__(self,name):
- super(MyProcess,self).__init__()
- self.name = name
- def run(self):
- print('测试 %s 多进程' % self.name)
- if __name__ == '__main__':
- process_list = []
- for i in range(5): #开启 5 个子进程执行 fun1 函数
- p = MyProcess('Python') #实例化进程对象
- p.start()
- process_list.append(p)
- for i in process_list:
- p.join()
- print('结束测试')
结果
测试 Python 多进程
测试 Python 多进程
测试 Python 多进程
测试 Python 多进程
测试 Python 多进程
结束测试
Process finished with exit code 0
效果和第一种方式一样.
我们可以看到 Python 多进程的实现方式和多线程的实现方式几乎一样.
Process 类的其他方法
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组
target: 要执行的方法
name: 进程名
args/kwargs: 要传入方法的参数
实例方法:
is_alive(): 返回进程是否在运行, bool 类型.
join([timeout]): 阻塞当前上下文环境的进程程, 直到调用此方法的进程终止或到达指定的 timeout(可选参数).
start(): 进程准备就绪, 等待 CPU 调度
run():strat()调用 run 方法, 如果实例进程时未制定传入 target, 这 star 执行 t 默认 run()方法.
terminate(): 不管任务是否完成, 立即停止工作进程
属性:
daemon: 和线程的 setDeamon 功能一样
name: 进程名字
pid: 进程号
关于 join,daemon 的使用和 python 多线程一样, 这里就不在复述了, 大家可以看看以前的 python 多线程系列文章.
4.Python 多线程的通信
进程是系统独立调度核分配系统资源 (CPU, 内存) 的基本单位, 进程之间是相互独立的, 每启动一个新的进程相当于把数据进行了一次克隆, 子进程里的数据修改无法影响到主进程中的数据, 不同子进程之间的数据也不能共享, 这是多进程在使用中与多线程最明显的区别. 但是难道 Python 多进程中间难道就是孤立的吗? 当然不是, python 也提供了多种方法实现了多进程中间的通信和数据共享(可以修改一份数据)
进程对列 Queue
Queue 在多线程中也说到过, 在生成者消费者模式中使用, 是线程安全的, 是生产者和消费者中间的数据管道, 那在 python 多进程中, 它其实就是进程之间的数据管道, 实现进程通信.
- from multiprocessing import Process,Queue
- def fun1(q,i):
- print('子进程 %s 开始 put 数据' %i)
- q.put('我是 %s 通过 Queue 通信' %i)
- if __name__ == '__main__':
- q = Queue()
- process_list = []
- for i in range(3):
- p = Process(target=fun1,args=(q,i,)) #注意 args 里面要把 q 对象传给我们要执行的方法, 这样子进程才能和主进程用 Queue 来通信
- p.start()
- process_list.append(p)
- for i in process_list:
- p.join()
- print('主进程获取 Queue 数据')
- print(q.get())
- print(q.get())
- print(q.get())
- print('结束测试')
结果
子进程 0 开始 put 数据
子进程 1 开始 put 数据
子进程 2 开始 put 数据
主进程获取 Queue 数据
我是 0 通过 Queue 通信
我是 1 通过 Queue 通信
我是 2 通过 Queue 通信
结束测试
Process finished with exit code 0
上面的代码结果可以看到我们主进程中可以通过 Queue 获取子进程中 put 的数据, 实现进程间的通信.
管道 Pipe
管道 Pipe 和 Queue 的作用大致差不多, 也是实现进程间的通信, 下面之间看怎么使用吧
- from multiprocessing import Process, Pipe
- def fun1(conn):
- print('子进程发送消息:')
- conn.send('你好主进程')
- print('子进程接受消息:')
- print(conn.recv())
- conn.close()
- if __name__ == '__main__':
- conn1, conn2 = Pipe() #关键点, pipe 实例化生成一个双向管
- p = Process(target=fun1, args=(conn2,)) #conn2 传给子进程
- p.start()
- print('主进程接受消息:')
- print(conn1.recv())
- print('主进程发送消息:')
- conn1.send("你好子进程")
- p.join()
- print('结束测试')
结果
主进程接受消息:
子进程发送消息:
子进程接受消息:
你好主进程
主进程发送消息:
你好子进程
结束测试
Process finished with exit code 0
上面可以看到主进程和子进程可以相互发送消息
Managers
Queue 和 Pipe 只是实现了数据交互, 并没实现数据共享, 即一个进程去更改另一个进程的数据. 那么久要用到 Managers
- from multiprocessing import Process, Manager
- def fun1(dic,lis,index):
- dic[index] = 'a'
- dic['2'] = 'b'
- lis.append(index) #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
- #print(l)
- if __name__ == '__main__':
- with Manager() as manager:
- dic = manager.dict()# 注意字典的声明方式, 不能直接通过 {} 来定义
- l = manager.list(range(5))#[0,1,2,3,4]
- process_list = []
- for i in range(10):
- p = Process(target=fun1, args=(dic,l,i))
- p.start()
- process_list.append(p)
- for res in process_list:
- res.join()
- print(dic)
- print(l)
结果:
- {
- 0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a', 5: 'a', 7: 'a', 6: 'a', 8: 'a', 9: 'a'
- }
- [0, 1, 2, 3, 4, 0, 3, 1, 2, 4, 5, 7, 6, 8, 9]
可以看到主进程定义了一个字典和一个列表, 在子进程中, 可以添加和修改字典的内容, 在列表中插入新的数据, 实现进程间的数据共享, 即可以共同修改同一份数据
5. 进程池
进程池内部维护一个进程序列, 当使用时, 则去进程池中获取一个进程, 如果进程池序列中没有可供使用的进进程, 那么程序就会等待, 直到进程池中有可用进程为止. 就是固定有几个进程可以使用.
进程池中有两个方法:
apply: 同步, 一般不使用
apply_async: 异步
- from multiprocessing import Process,Pool
- import os, time, random
- def fun1(name):
- print('Run task %s (%s)...' % (name, os.getpid()))
- start = time.time()
- time.sleep(random.random() * 3)
- end = time.time()
- print('Task %s runs %0.2f seconds.' % (name, (end - start)))
- if __name__=='__main__':
- pool = Pool(5) #创建一个 5 个进程的进程池
- for i in range(10):
- pool.apply_async(func=fun1, args=(i,))
- pool.close()
- pool.join()
- print('结束测试')
结果
- Run task 0 (37476)...
- Run task 1 (4044)...
- Task 0 runs 0.03 seconds.
- Run task 2 (37476)...
- Run task 3 (17252)...
- Run task 4 (16448)...
- Run task 5 (24804)...
- Task 2 runs 0.27 seconds.
- Run task 6 (37476)...
- Task 1 runs 0.58 seconds.
- Run task 7 (4044)...
- Task 3 runs 0.98 seconds.
- Run task 8 (17252)...
- Task 5 runs 1.13 seconds.
- Run task 9 (24804)...
- Task 6 runs 1.46 seconds.
- Task 4 runs 2.73 seconds.
- Task 8 runs 2.18 seconds.
- Task 7 runs 2.93 seconds.
- Task 9 runs 2.93 seconds.
结束测试
对 Pool 对象调用 join()方法会等待所有子进程执行完毕, 调用 join()之前必须先调用 close(), 调用 close()之后就不能继续添加新的 Process 了.
进程池 map 方法
案例来源于网络, 侵权请告知, 谢谢
因为网上看到这个例子觉得不错, 所以这里就不自己写案例, 这个案例比较有说服力
- import os
- import PIL
- from multiprocessing import Pool
- from PIL import Image
- SIZE = (75,75)
- SAVE_DIRECTORY = \'thumbs\'
- def get_image_paths(folder):
- return (os.path.join(folder, f)
- for f in os.listdir(folder)
- if \'jpeg\' in f)
- def create_thumbnail(filename):
- im = Image.open(filename)
- im.thumbnail(SIZE, Image.ANTIALIAS)
- base, fname = os.path.split(filename)
- save_path = os.path.join(base, SAVE_DIRECTORY, fname)
- im.save(save_path)
- if __name__ == \'__main__\':
- folder = os.path.abspath(
- \'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
- os.mkdir(os.path.join(folder, SAVE_DIRECTORY))
- images = get_image_paths(folder)
- pool = Pool()
- pool.map(creat_thumbnail, images) #关键点, images 是一个可迭代对象
- pool.close()
- pool.join()
上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件, 一一生成缩略图, 并将这些缩略图保存到特定文件夹中. 这我的机器上, 用这一程序处理 6000 张图片需要花费 27.9 秒. map 函数并不支持手动线程管理, 反而使得相关的 debug 工作也变得异常简单.
map 在爬虫的领域里也可以使用, 比如多个 URL 的内容爬取, 可以把 URL 放入元祖里, 然后传给执行函数.
来源: https://www.cnblogs.com/pypypy/p/11954269.html