大家好, 并发编程 进入第六篇.
在第四章, 讲消息通信时, 我们学到了 Queue 消息队列的一些基本使用. 昨天我在准备如何创建线程池这一章节的时候, 发现对 Queue 消息队列的讲解有一些遗漏的知识点, 而这些知识点, 也并不是无关紧要的, 所以在今天的章节里, 我要先对 Queue 先做一些补充以防大家对消息队列有一些知识盲区.
再次提醒:
本系列所有的代码均在 Python3 下编写, 也建议大家尽快投入到 Python3 的怀抱中来.
本文目录
消息队列的先进先出
创建多线程的两种方式
. 消息队列的先进先出
首先, 要告诉大家的事, 消息队列可不是只有 queue.Queue 这一个类, 除它之外, 还有 queue.LifoQueue 和 queue.PriorityQueue 这两个类.
从名字上, 对于他们之间的区别, 你大概也能猜到一二吧.
queue.Queue: 先进先出队列
queue.LifoQueue: 后进先出队列
queue.PriorityQueue: 优先级队列
先来看看, 我们的老朋友, queue.Queue.
所谓的先进先出 (FIFO,First in First Out), 就是先进入队列的消息, 将优先被消费.
这和我们日常排队买菜是一样的, 先排队的人肯定是先买到菜.
用代码来说明一下
import queueq = queue.Queue()for i in range(5): q.put(i)while not q.empty(): print q.get()
看看输出, 符合我们先进先出的预期. 存入队列的顺序是 01234, 被消费的顺序也是 01234.
01234
再来看看 Queue.LifoQueue, 后进先出, 就是后进入消息队列的, 将优先被消费.
这和我们羽毛球筒是一样的, 最后放进羽毛球筒的球, 会被第一个取出使用.
用代码来看下
import queueq = queue.LifoQueue()for i in range(5): q.put(i)while not q.empty(): print q.get()
来看看输出, 符合我们后进后出的预期. 存入队列的顺序是 01234, 被消费的顺序也是 43210.
43210
最后来看看 Queue.PriorityQueue, 优先级队列.
这和我们日常生活中的会员机制有些类似, 办了金卡的人比银卡的服务优先, 办了银卡的人比不办卡的人服务优先.
来用代码看一下
from queue import PriorityQueue# 重新定义一个类, 继承自 PriorityQueueclass MyPriorityQueue(PriorityQueue): def __init__(self): PriorityQueue.__init__(self) self.counter = 0 def put(self, item, priority): PriorityQueue.put(self, (priority, self.counter, item)) self.counter += 1 def get(self, *args, **kwargs): _, _, item = PriorityQueue.get(self, *args, **kwargs) return itemqueue = MyPriorityQueue()queue.put('item2', 2)queue.put('item5', 5)queue.put('item3', 3)queue.put('item4', 4)queue.put('item1', 1)while True: print(queue.get())
来看看输出, 符合我们的预期. 我们存入入队列的顺序是 25341, 对应的优先级也是 25341, 可是被消费的顺序丝毫不受传入顺序的影响, 而是根据指定的优先级来消费.
item1item2item3item4item5
. 创建多线程的两种方式
在使用多线程处理任务时也不是线程越多越好, 由于在切换线程的时候, 需要切换上下文环境, 依然会造成 CPU 的大量开销. 为解决这个问题, 线程池的概念被提出来了. 预先创建好一个较为优化的数量的线程, 让过来的任务立刻能够使用, 就形成了线程池.
在 Python3 中, 创建线程池是通过 concurrent.futures 函数库中的 ThreadPoolExecutor 类来实现的.
import timeimport threadingfrom concurrent.futures import ThreadPoolExecutordef target(): for i in range(5): print('running thread-{}:{}'.format(threading.get_ident(), i)) time.sleep(1)#: 生成线程池最大线程为 5 个 pool = ThreadPoolExecutor(5) for i in range(100): pool.submit(target) # 往线程中提交, 并运行
从结果来看, 前面设置线程池最大线程数 5 个, 有生效.
running thread-11308:0running thread-12504:0running thread-5656:0running thread-12640:0running thread-7948:0running thread-11308:1running thread-5656:1running thread-7948:1running thread-12640:1running thread-12504:1......
除了使用上述第三方模块的方法之外, 我们还可以自己结合前面所学的消息队列来自定义线程池.
这里我们就使用 queue 来实现一个上面同样效果的例子, 大家感受一下.
import timeimport threadingfrom queue import Queuedef target(q): while True: msg = q.get() for i in range(5): print('running thread-{}:{}'.format(threading.get_ident(), i)) time.sleep(1)def pool(workers,queue): for n in range(workers): t = threading.Thread(target=target, args=(queue,)) t.daemon = True t.start()queue = Queue()# 创建一个线程池: 并设置线程数为 5pool(5, queue)for i in range(100): queue.put("start")# 消息都被消费才能结束 queue.join()
输出是和上面是完全一样的效果
running thread-11308:0running thread-12504:0running thread-5656:0running thread-12640:0running thread-7948:0running thread-11308:1running thread-5656:1running thread-7948:1running thread-12640:1running thread-12504:1......
构建线程池的方法, 是可以很灵活的, 大家有举可以自己多研究. 但是建议只要掌握一种自己熟悉的, 能快速上手的就好了.
来源: https://juejin.im/entry/5bcedcb7f265da0a9e534afa