- import time
- import random
- from multiprocessing import Process,JoinableQueue,Queue
- # 平衡生产者与消费者之间的速度差
- # 程序解开耦合
- '''
- 在消费者这一端
- 每次获得一个数据
- 处理一个数据
- 发送一个记号 标志一个数据被处理成功 q.task_done() 发送给 q.join()
- 在生产者这一端:
- 每次生产一个数据
- 且每一次生产的数据都放在队列中
- 在队列中刻上一个记号
- 当生产者全部生产完毕后
- join 信号 已经停止生产数据了
- 切要等待之前被刻上记号都消费完了结束阻塞
- '''
- def conmmu(q,name):
- while True:
- food=q.get()
- if food=='None':
- print('%s 取到一个空')
- break
- print('%s 吃到了 %s'%(name,food))
- time.sleep(random.randint(1,3))
- print('xiaofeizhe1')
- q.task_done() #count-1
- print('xiaofeizhe2')
- def prodect(q,name,food):
- for i in range(4):
- time.sleep(random.randint(1,3))
- foods='%s 做的第 %s 个 %s'%(name,i,food)
- q.put(foods)
- print('shengchanzhe1')
- q.join() #阻塞 知道一个队列中的所有数据被执行完毕
- print('shengchanzhe2')
- if __name__ == '__main__':
- q=JoinableQueue()
- p1=Process(target=prodect,args=(q,'yuan','baozi',))
- p2=Process(target=prodect,args=(q,'xu','hulatang',))
- '''
- 其实这里也可以将消费者者设定为守护进程
- ''' c1=Process(target=conmmu,args=(q,'wang',))
- c2=Process(target=conmmu,args=(q,'dan'))
- # c1.daemon=True
- # c2.daemon = True
- p1.start()
- p2.start()
- print('zhujincheng c1')
- c1.start()
- c2.start()
- print('zhujincheng c2')
- p1.join()
- p2.join()
- # time.sleep(19)
- # print(p1.is_alive())
- # print(p2.is_alive())
- print('zhu')
- '''
prodect 函数中的 q.join() 只是感知队列中的数据是否被执行完毕,
此时是处于阻塞状态的, 若被执行完毕则完成代码, 这里陷入死循环
是因为 conmmu 函数中有 while 循环, 所以需要配合守护进程,
p1.join() 确保了 prodect 函数中的 q.join() 确实已经被执行完毕,
如果不在主进程中加 p1.join() 和 p2.join 则两个生产者子进程和主进程就是异步的关系
随着主进程的结束, 那么守护进程也会结束 但是消费者子进程可能还没有将生产者子进程
生产的数据执行完成就被关闭了 造成了生产者子进程的 q.join() 一直在哪里阻塞
conmmu 函数中的 q.task_done() 每执行一次就回向 q.join() 发送一次标记
来源: http://www.bubuko.com/infodetail-3386591.html