本文首发于知乎
python 中的多进程编程方式和多线程非常相似, 几乎可以说只是换了一些函数, 有了之前讲过的多线程基础, 很多地方我就只展示一些代码, 在涉及到差别的地方再着重说明
本文分为如下几个部分
事先说明
最简单的多进程
类的形式
进程池
进程之间内存独立
队列
pipe
value
进程锁
事先说明
有两点在写代码时需要注意
使用多进程时, 最好在文件中编写代码, 用 cmd 来执行, 在 jupyter 经常无法得到想要的结果
创建进程的代码一定要放在
if __name__ == '__main__'
里面
最简单的多进程
- import multiprocessing
- import time
- def myfun(num):
- time.sleep(1)
- print(num + 1)
- if __name__ == '__main__':
- for i in range(5):
- p = multiprocessing.Process(target = myfun, args = (i, ))
- p.start()
另外,
join is_alive daemon name current_process
等也都是一样的
类的形式
- import multiprocessing
- import requests
- from bs4 import BeautifulSoup
- class MyProcess(multiprocessing.Process):
- def __init__(self, i):
- multiprocessing.Process.__init__(self)
- self.i = i
- def run(self):
- url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)
- r = requests.get(url)
- soup = BeautifulSoup(r.content, 'html.parser')
- lis = soup.find('ol', class_='grid_view').find_all('li')
- for li in lis:
- title = li.find('span', class_="title").text
- print(title)
- if __name__ == '__main__':
- for i in range(10):
- p = MyProcess(i)
- p.start()
进程池
- import requests
- from bs4 import BeautifulSoup
- from multiprocessing import Pool, current_process
- def get_title(i):
- print('start', current_process().name)
- title_list = []
- url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
- r = requests.get(url)
- soup = BeautifulSoup(r.content, 'html.parser')
- lis = soup.find('ol', class_='grid_view').find_all('li')
- for li in lis:
- title = li.find('span', class_="title").text
- # return title
- title_list.append(title)
- print(title)
- return(title_list)
- if __name__ == '__main__':
- pool = Pool()
- for i in range(10):
- pool.apply_async(get_title, (i, ))
- pool.close()
- pool.join()
- print('finish')
这里要说明一下
使用 Pool 时, 不指定进程数量, 则默认为 CPU 核心数量
核心数量对应电脑的 (任务管理器 - 性能) 逻辑处理器数量而不是内核数量(我的电脑 2 个内核, 有 4 个逻辑处理器, 所以这里默认使用 4 个进程)
进程数量可以是成百上千, 并不是说最大开启进程数量为 4, 只要用 Pool(10)就可以同时开启 10 个进程进行抓取
不过要注意一点, 无论多线程还是多进程, 数量开启太多都会造成切换费时, 降低效率, 所以慎重创建太多线程与进程
进程之间内存独立
多进程与多线程最大的不同在于, 多进程的每一个进程都有一份变量的拷贝, 进程之间的操作互不影响, 我们先来看看下面的例子
- import multiprocessing
- import time
- zero = 0
- def change_zero():
- global zero
- for i in range(3):
- zero = zero + 1
- print(multiprocessing.current_process().name, zero)
- if __name__ == '__main__':
- p1 = multiprocessing.Process(target = change_zero)
- p2 = multiprocessing.Process(target = change_zero)
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- print(zero)
运行结果如下
- Process-1 1
- Process-1 2
- Process-1 3
- Process-2 1
- Process-2 2
- Process-2 3
- 0
上面结果显示, 新创建的两个进程各自把值增加到了 3, 二者不是一起将其加到了 6 的同时, 主进程的值还是 0 所以说每个进程都是将数据拷贝过去自己做, 并没有将结果与其他进程共享
但是对于写入文件则不同
- import multiprocessing
- import time
- def write_file():
- for i in range(30):
- with open('try.txt', 'a') as f:
- f.write(str(i) + ' ')
- if __name__ == '__main__':
- p1 = multiprocessing.Process(target = write_file)
- p2 = multiprocessing.Process(target = write_file)
- p1.start()
- p2.start()
- p1.join()
- p2.join()
得到的 try.txt 文件内容如下
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 0 15 2 16 17 3 4 18 19 5 20 6 21 22 8 9 23 10 11 25 26 12 13 27 28 14 29 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
可见两个进程都将数据写入了同一份文件中
下面我们要讨论第一种情况, 如果真的要在两个进程之间共享变量需要怎么办
队列
这里介绍进程之间的第一种交流方式队列 multiprocessing 模块中提供了
multiprocessing.Queue
, 它和 Queue.Queue 的区别在于, 它里面封装了进程之间的数据交流, 不同进程可以操作同一个
- multiprocessing.Queue
- from multiprocessing import Process, Queue
- def addone(q):
- q.put(1)
- def addtwo(q):
- q.put(2)
- if __name__ == '__main__':
- q = Queue()
- p1 = Process(target=addone, args = (q, ))
- p2 = Process(target=addtwo, args = (q, ))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- print(q.get())
- print(q.get())
运行结果如下
1
2
这个队列是线程进程安全的, 即对队列的每一次修改中间不会被中断从而造成结果错误
pipe
pipe 的功能和 Queue 类似, 可以理解成简化版的 Queue 我们先来看下面一个例子
- import random
- import time
- from multiprocessing import Process, Pipe, current_process
- def produce(conn):
- while True:
- new = random.randint(0, 100)
- print('{} produce {}'.format(current_process().name, new))
- conn.send(new)
- time.sleep(random.random())
- def consume(conn):
- while True:
- print('{} consume {}'.format(current_process().name, conn.recv()))
- time.sleep(random.random())
- if __name__ == '__main__':
- pipe = Pipe()
- p1 = Process(target=produce, args=(pipe[0],))
- p2 = Process(target=consume, args=(pipe[1],))
- p1.start()
- p2.start()
结果如下
- Process-1 produce 24
- Process-2 consume 24
- Process-1 produce 95
- Process-2 consume 95
- Process-1 produce 100
- Process-2 consume 100
- Process-1 produce 28
- Process-2 consume 28
- Process-1 produce 62
- Process-2 consume 62
- Process-1 produce 92
- Process-2 consume 92
- ....................
上面使用了 pipe 来实现生产消费模式
总结 Queue 与 pipe 之间的差别如下
Queue 使用 put get 来维护队列, pipe 使用 send recv 来维护队列
pipe 只提供两个端点, 而 Queue 没有限制这就表示使用 pipe 时只能同时开启两个进程, 可以像上面一样, 一个生产者一个消费者, 它们分别对这两个端点 (Pipe() 返回的两个值)操作, 两个端点共同维护一个队列如果多个进程对 pipe 的同一个端点同时操作, 就会发生错误 (因为没有上锁, 类似线程不安全) 所以两个端点就相当于只提供两个进程安全的操作位置, 以此限制了进程数量只能是 2
Queue 的封装更好, Queue 只提供一个结果, 它可以被很多进程同时调用; 而 Pipe()返回两个结果, 要分别被两个进程调用
Queue 的实现基于 pipe, 所以 pipe 的运行速度比 Queue 快很多
当只需要两个进程时使用 pipe 更快, 当需要多个进程同时操作队列时, 使用 Queue
value
当我们不是想维护一个队列, 而只是多个进程同时操作一个数字, 就需要提供一个可以在多个进程之间共享的方法, 即 Value
- from multiprocessing import Process, Value
- def f1(n):
- n.value += 1
- def f2(n):
- n.value -= 2
- if __name__ == '__main__':
- num = Value('d', 0.0)
- p1 = Process(target=f1, args=(num, ))
- p2 = Process(target=f2, args=(num, ))
- p1.start()
- p2.start()
- p1.join()
- p2.join()
- print(num.value)
运行结果为
-1.0
其中 Value('d', 0.0)中的 d 表示双精度浮点数, 更多类型可以看这里
除了 Value, 模块还提供了类似的 Array, 感兴趣的读者可以去官网查看用法
进程锁
既然变量在进程之间可以共享了, 那么同时操作一个变量导致的不安全也随之出现同多线程一样, 进程也是通过锁来解决, 而且使用方法都和多线程里相同
- lock = multiprocessing.Lock()
- lock.acquire()
- lock.release()
- with lock:
这些用法和功能都和多线程是一样的
另外,
multiprocessing.Semaphore Condition Event RLock
也和多线程相同
来源: https://juejin.im/post/5aa732b3518825558001e5af