1. 管道
加锁可以保证多个进程修改同一块数据时, 同一时间只能有一个任务可以进行任务修改, 即串行修改, 速度慢了, 但牺牲了速度却保证了数据安全.
文件共享数据实现进程间的通信, 但问题是:
1. 效率低(共享数据基于文件, 而文件是硬盘上的数据)
2. 需要自己加锁处理
而使用 multiprocess 模块为我们提供的基于消息 IPC 通信机制: 通信和管道 可以帮我们解决这两个问题.
队列和管道都是将数据存放于内存内, 而队列又是基于 (管道 + 锁) 实现的, 可以让我们从复杂的锁问题中解脱出来来, 因而队列才是进程间通信的最佳选择
我们应该尽量避免使用共享数据, 尽可能的使用消息传递和队列, 避免初拉力复杂的同步和锁问题, 而且在进程数目增多时, 往往可以获得更好的可扩展性.
格式:
- conn1,conn2 = Pipe()
- conn1.recv()
- conn1.send()
数据接收一次就没有了
示例代码:
- from multiprocessing import Process
- def f1(conn):
- from_zhujincheng = conn.recv()
- print('我是子进程')
- print('我是来自主进程的消息>>>',from_zhujincheng)
- if __name__ == '__main__':
- conn1,conn2 = Pipe()
- p1 = Process(target=f1, args=(conn2,))
- p1.start()
- conn1.send('你好')
- print('我是主进程')
结果:
我是主进程
我是子进程
我是来自主进程的消息>>> 你好
案例:
- from multiprocessing import Process,Pipe
- def f1(conn):
- from_zhujincheng = conn.recv()
- print('我是子进程')
- print('来自主进程的消息:',from_zhujincheng)
- if __name__ == '__main__':
- conn1,conn2 = Pipe() #创建一个管道对象, 全双工, 返回管道的两端, 但是一端发送的消息, 只能另外一端接收, 自己这一端是不能接收的
- #可以将一端或者两端发送给其他的进程, 那么多个进程之间就可以通过这一个管道进行通信了
- p1 = Process(target=f1,args=(conn2,))
- p1.start()
- conn1.send('你在哪')
- print('我是主进程')
2. 事件 Event
线程的一个关键特性是每个线程都是独立运行且状态不可预测. 如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作, 这是线程同步问题就会变得非常棘手. 为了解决这些问题, 我们需要使用 Event 对象. 对象包含一个可由线程设置的信号标志, 它允许线程或进程等待某些事件的发生. 在初始情况下, Event 对象中的信号标志被设置为假(False).
e = Event() #初识状态是 false
e.wait() 当事件对象 e 的状态为 false 的时候, 在 wait 的地方会阻塞程序, 当对象状态为 true 的时候, 直接在这个 wait 地方继续往下执行
e.set() 将事件对象的状态改为 true,
e.is_set() 查看状态
e.clear() 将事件对象的状态改为 false
基于事件的进程间通信:
- import time
- from multiprocessing import Process,Event
- def f1(e):
- time.sleep(2)
- n = 100
- print('子进程计算结果为',n)
- e.set()
- if __name__ == '__main__':
- e = Event()
- p = Process(target=f1,args=(e,))
- p.start()
- print('主进程等待...')
- e.wait()
- print('结果已经写入文件了, 可以拿到这值')
示例:
- from multiprocessing import Process,Event
- e = Event() #创建事件对象, 这个对象的初识状态为 False
- print('e 的状态是:',e.is_set())
- print('进程运行到这里了')
- e.set() #将 e 的状态改为 True
- print('e 的状态是:',e.is_set())
- e.clear() #将 e 的状态改为 False
- e.wait() #e 这个事件对象如果值为 False, 就在我加 wait 的地方等待
- print('进程过了 wait')
3. 信号量(Semaphore)
信号量也是一把锁, 可以指定信号量为 5, 对比互斥锁同一时间只能有一个任务抢到锁去执行, 信号量同一时间可以有 5 个任务拿到锁去执行, 如果说互斥锁是合租房屋的人去抢一个厕所, 那么信号量就相当于一群路人去争抢公共厕所, 公共厕所有多个坑位, 这意味着同一时间可以有多个人上公共厕所, 但公共厕所容纳的人数是一定的, 这便是信号量的大小.
Semaphore 管理一个内置的计数器
每当调用 acquire 时内置计数器 - 1
调用 release()时内置计数器 + 1
计数器不能小于 0, 当计数器为 0 时, acquire()将阻塞线程直到其他线程调用 release()
格式:
- s = Semphore(4) # 内部维护了一个计数器, acquire-1,release+1, 为 0 的时候, 其他的进程都要在 acquire 之前等待
- s.acquire()
需要锁住的代码
s.release()
示例:
- import time
- import random
- from multiprocessing import Process,Semaphore
- def f1(i,s):
- s.acquire()
- print('%s 男嘉宾到了'%i)
- time.sleep(random.randint(1,3))
- s.release()
- if __name__ == '__main__':
- s = Semaphore(4) #计数器 4,acquire 一次减一, 为 0 , 其他人等待, release 加 1,
- for i in range(10):
- p = Process(target=f1,args=(i,s))
- p.start()
4. 进程池
进程的创建和销毁是很有消耗的, 影响代码执行效率
进程池:
方法:
map: 异步提交任务, 并且传参需要可迭代类型的数据, 自带 close 和 join 功能
- res = Apply(f1,args=(i,)) #同步执行任务, 必须等任务执行结束才能给进程池提交下一个任务, 可以直接拿到返回结果 res
- res_obj = Apply_async(f1,args=(i,)) #异步提交任务, 可以直接拿到结果对象, 从结果对象里面拿结果, 要用 get 方法, get 方法会阻塞程序, 没有拿到结果会一直等待
close : 锁住进程池, 防止有其他的新的任务在提交给进程池
Join : 等待着进程池将自己里面的任务都执行完
map 方法使用:
- def f1(n):
- for i in range(5):
- n = n + i
- if __name__ == '__main__':
- #统计进程池执行 100 个任务的时间
- s_time = time.time()
- pool = Pool(4) #里面这个参数是指定进程池中有多少个进程用的, 4 表示 4 个进程, 如果不传参数, 默认开启的进程数一般是 CPU 的个数
- # pool.map(f1,[1,2]) #参数数据必须是可迭代的
- pool.map(f1,range(100)) #参数数据必须是可迭代的, 异步提交任务, 自带 join 功能
进程池的异步方法和同步方法时间比较(异步效率更高)
进程池同步方法:
- import time
- from multiprocessing import Process,Pool
- def f1(n):
- time.sleep(1)
- # print(n)
- return n*n
- if __name__ == '__main__':
- pool = Pool(4)
- for i in range(10):
- print('xxxxxx')
- res = pool.apply(f1,args=(i,))
- print(res)
进程池异步方法:
- import time
- from multiprocessing import Process,Pool
- def f1(n):
- time.sleep(0.5)
- # print(n)
- return n*n
- if __name__ == '__main__':
- pool = Pool(4)
- res_list = []
- for i in range(10):
- print('xxxx')
- #异步给进程池提交任务
- res = pool.apply_async(f1,args=(i,))
- res_list.append(res)
- # print('等待所有任务执行完')
- # pool.close() #锁住进程池, 意思就是不让其他的程序再往这个进程池里面提交任务了
- # pool.join()
- #打印结果, 如果异步提交之后的结果对象
- for i in res_list:
- print(i.get())
- # time.sleep(10)
进程池同步方法与异步方法的时间比较
- import time
- from multiprocessing import Process,Pool
- def f1(n):
- time.sleep(0.5)
- # print(n)
- return n*n
- if __name__ == '__main__':
- pool = Pool(4)
- res_list = []
- for i in range(10):
- print('xxxx')
- #异步给进程池提交任务
- res = pool.apply_async(f1,args=(i,))
- res_list.append(res)
- # print('等待所有任务执行完')
- # pool.close() #锁住进程池, 意思就是不让其他的程序再往这个进程池里面提交任务了
- # pool.join()
- #打印结果, 如果异步提交之后的结果对象
- for i in res_list:
- print(i.get())
- # time.sleep(10)
结果
进程池用的时间 0.5779643058776855
主进程
>>>多进程的执行时间 1547113644.185883
5. 回调函数
可以为进程池内的每个进程绑定一个函数, 该函数在进程或线程的任务执行完毕后自动触发, 并接收任务的返回值当作参数, 该函数称为回调函数
apply_async(f1,args=(i,),callback=function) #将前面 f1 这个任务的返回结果作为参数传给 callback 指定的那个 function 函数
示例:
- import os
- from multiprocessing import Pool,Process
- def f1(n):
- print('进程池里面的进程 id',os.getpid())
- print('>>>>',n)
- return n*n
- def call_back_func(asdf):
- print('>>>>>>>>>>>>>',os.getpid())
- print('回调函数中的结果:',asdf)
- # print('回调函数中的结果:',s.get())
- if __name__ == '__main__':
- pool = Pool(4)
- res = pool.apply_async(f1,args=(5,),callback=call_back_func)
- pool.close()
- pool.join()
- # print(res.get())
- print('主进程的进程 id',os.getpid())
来源: http://www.bubuko.com/infodetail-2915428.html