一 前言
本文算是一次队列的学习笔记, Queue 模块实现了三种类型的队列, 它们的区别仅仅是队列中元素被取回的顺序. 在 FIFO 队列中, 先添加的任务先取回. 在 LIFO 队列中, 最近被添加的元素先取回(操作类似一个堆栈). 优先级队列中, 元素将保持排序( 使用 heapq 模块 ) 并且最小值的条目第一个返回.
值得注意的是 Python 2.X 版本中调用队列需要引用 import Queue 而在 Python 3.X 版本中则需要 import queue
二 队列特性
2.1 Queue 的常用函数
Queue 常用的方法:
qsize() 获取队列的元素个数.
put(item [,block[, timeout]]): 往 queue 中放一个 item
get(item [,block[, timeout]]): 从 queue 中取出一个 item, 并在队列中删除的这个 item
需要特别说明的是:
如果 block 为 True , timeout 为 None(也是默认的选项), 那么 get()/put()可能会阻塞, 直到队列中出现可用的数据 / 位置. 如果 timeout 是正整数, 那么函数会阻塞直到超时 N 秒, 然后抛出一个异常.
如果 block 为 False , 如果队列无数据, 调用 get()或者有无空余位置时调用 put(), 就立即抛出异常(timeout 将会被忽略).
task_done(): 表示前面排队的任务已经被完成. 被队列的消费者线程使用. 每个 get() 被用于获取一个任务, 后续调用 task_done() 告诉队列, 该任务的处理已经完成.
join(): 队列中所有的元素都被接收和处理完毕之前程序一直阻塞.
在应用程序中, 如果主程序调用了 join()则当前程序发生阻塞, 当队列中所有的元素都被处理后, 将解除阻塞 (意味着每个 put() 进队列的条目的 task_done() 都被收到). 如果 task_done()被调用的次数多于放入队列中的项目数量, 将引发 ValueError 异常 .
我们通过程序向队列添加元素的时候, 未完成任务的计数就会增加. 每当消费者线程调用 task_done() 时表示这个元素已经被回收, 涉及到该元素的业务逻辑已经完成, 未完成计数就会减少. 当未完成计数降到零的时候, 程序便会解除 join()阻塞.
2.2 实践
我们用一个比较经典的案例 生产者和消费者模型, 生产者生产馒头放到队列, 消费者去队列里面获取馒头.
- # encoding: utf-8
- """
- author: yangyi@youzan.com
- time: 2019/8/14 11:20 PM
- func:
- """
- from multiprocessing import Process, JoinableQueue, Lock
- import time
- import random
- thread_lock = Lock()
- def lock_print(msg):
- with thread_lock:
- print (msg)
- def consumer(q):
- while True:
- res = q.get(block=True, timeout=3) # 如果为空 则等待 3 秒超时则报错退出
- print('消费者拿到了 %s' % res)
- q.task_done()
- def producer(q):
- for item in range(4):
- time.sleep(random.randrange(1, 2))
- q.put('馒头{0}'.format(item))
- print('生产者做好了 %s' %'馒头{0}'.format(item))
- q.join()
- lock_print("生产结束")
- if __name__ == '__main__':
- print('主进程开始')
- q = JoinableQueue()
- pd = Process(target=producer, args=(q,))
- cp = Process(target=consumer, args=(q,))
- cp.daemon = True ##
- pd.start()
- cp.start()
- pd.join()
- print('主进程结束')
说明
这里生产者生产馒头并将馒头通过 put()放到全局的队列中, 消费者从使用 get()队列中获取馒头然后调用 task_done() 通知队列中的馒头已经被消费者获取.
设置 cp.daemon = True 表示消费者进程会随主进程一起结束而结束. 还有一种写法是
- if __name__ == '__main__':
- print('主进程开始')
- q = JoinableQueue()
- pd = Process(target=producer, args=(q,))
- cp = Process(target=consumer, args=(q,))
- pd.start()
- cp.start()
- pd.join()
- cp.join()
- print('主进程结束')
cp.join() 会让消费者进程一直等待生产者往队列放数据直到设置的超时时间. 具体的逻辑需要结合自己程序的实际需求来定, 是需要一直等待生产者生产数据还是随着主进程结束而结束.
三 总结
本文结合前面文章中介绍的多进程中的 守护进程和 join()方法, 学习如何使用队列中的两个函数 task_done 和 join. 其实还有其他比较多的函数用法, 需要深入的学习探索, 感兴趣的朋友可以动手实践一下.
- https://docs.python.org/zh-cn/3/library/queue.html
- https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/12_Thread_communication_using_a_queue.html
- -The End-
来源: https://www.cnblogs.com/yangyi402/p/11413674.html