模型就是解决某个问题固定方法和套路
e.g.
生产者和消费者
存在的问题 : 效率不同, 往往双方之间的处理速度不一样, 导致双方需要等待对方
tips:
1 将双方解开耦合, 让不同进程负责不同的任务
2 提供一个共享的容器, 来平衡双方的能力, 之所以用进程队列是因为可以在进程之间共享
- case:
- from multiprocessing import Process,Queue
- import requests
- import re,os,time,random
- # 生产者的任务
- def task(url,q):
- i=0
- for url in urls:
- response =requests.get(url)
- text=response.text
- #将生产完成的数放入队列中
- time.sleep(random.random())
- q.put(text)
- i+=1
- print(os.getpid(),"生产了第 %s 个数据"%i)
- # 消费者的任务
- def customer(q):
- i=0
- while True:
- text=q.get()
- time.sleep(random.random())
- res =re.findall('src=//(.*?)width',text)
- i+=1
- print('第 %s 任务获取到 %s 个 img'%(i,len(res))
- if __name__=='__main__'
- url=[
- "http://www.baidu.com",
- "http://www.baidu.com",
- "http://www.baidu.com",
- "http://www.baidu.com",
- ]
- #创建一个双方能共享的容器
- q=Queue()
- #生产者进程
- p1=Process(target=product,args=(url,q))
- p1.start()
- #消费这进程
- c=Process(target=customer,args=(q,))
- c.start()
遇到的问题:
1 消费不知道何时结束
加了一个模块 joinableQueue --------> 和 继承 Queue 用法一致
增加了 join 和 taskDone(任务完成)
join 这个是阻塞函数, 会阻塞直到 taskdone 的调用次数等于存入的元素个数, 可以用于表示队列任务处理完成
完善上述代码案列 case
- from multiprocessing import Process,joinableQueue
- import requests
- import re,os,time,random
- """热狗为例子"""
- # 生产者任务
- def product(q,name):
- for i in range(5):
- dog ="%s 的热狗 %s"%(name,(i+1))
- time.sleep(random.random())
- print('生产了',dog)
- q.put(dog)
- # 吃热狗
- def customer(q):
- while True:
- dog=q.get()
- time.sleep(random.random())
- print('消费了 %s'%dog)
- q.task_done() #标记这个任务处理完成
- if __name__=="__main__":
- #创建一个双方共享的容器
- q=JoinableQueue()
- #生产者的进程
- p1=Process(target=product,args=(q,"上海分店"))
- p2=Process(target=product,args=(q,"北京分店"))
- p1.start()
- p2.start()
- #消费者进程
- c=Process(target=customer,args=(q,))
- #c.deamon=True 可以将消费者设置为守护进程, 将主进程确认 任务全部完成时 可以随着主进程一起结束
- c.start()
- p1.join()
- p2.join() #代码走到这儿代表生产方已经完成
- q.join() #意味着队列中的任务都处理完成了
- #结束所有任务
- c.terminate() #直接终止消费者进程
- #解决问题的关键
1 确定生成者的任务完成
2 确定生出来的数据已经全部处理完成
来源: http://www.bubuko.com/infodetail-3113315.html