这篇文章主要介绍了 Python 进程间通信 Queue 实例解析, 分享了相关代码示例, 小编觉得还是挺不错的, 具有一定借鉴价值, 需要的朋友可以参考下
本文研究的主要是 Python 进程间通信 Queue 的相关实例, 具体如下
1.Queue 使用方法:
Queue.qsize(): 返回当前队列包含的消息数量;
Queue.empty(): 如果队列为空, 返回 True, 反之 False ;
Queue.full(): 如果队列满了, 返回 True, 反之 False;
Queue.get(): 获取队列中的一条消息, 然后将其从列队中移除, 可传参超时时长
Queue.get_nowait(): 相当 Queue.get(False), 取不到值时触发异常: Empty;
Queue.put(): 将一个值添加进数列, 可传参超时时长
Queue.put_nowait(): 相当于 Queue.get(False), 当队列满了时报错: Full
2.Queue 使用实例:
来, 上代码:
- #!/usr/bin/env python3
- import time
- from multiprocessing import Process,Queue
- q = Queue() #创建列队, 不传数字表示列队不限数量
- for i in range(11):
- q.put(i)
- def A():
- while 1:
- try:
- num = q.get_nowait()
- print('我是进程 A, 取出数字:%d'%num)
- time.sleep(1)
- except :
- break
- def B():
- while 1:
- try:
- num = q.get_nowait()
- print('我是进程 B, 取出数字:%d'%num)
- time.sleep(1)
- except :
- break
- p1 = Process(target = A)
- p2 = Process(target = B)
- p1.start()
- p2.start()
此程序是在队列中加入 10 个数字, 然后用 2 个进程来取出
运行结果:
我是进程 A, 取出数字: 0
我是进程 B, 取出数字: 1
我是进程 A, 取出数字: 2
我是进程 B, 取出数字: 3
我是进程 A, 取出数字: 4
我是进程 B, 取出数字: 5
我是进程 B, 取出数字: 6
我是进程 A, 取出数字: 7
我是进程 B, 取出数字: 8
我是进程 A, 取出数字: 9
我是进程 B, 取出数字: 10
3. 使用进程池 Pool 时, Queue 会出错, 需要使用 Manager.Queue:
上代码
- #!/usr/bin/env python3
- import time
- from multiprocessing import Pool,Manager,Queue
- q = Manager().Queue()
- for i in range(11):
- q.put(i)
- def A(i):
- num = q.get_nowait()
- print('我是进程 %d, 取出数字:%d'%(i,num))
- time.sleep(1)
- pool = Pool(3)
- for i in range(10):
- pool.apply_async(A,(i,))
- pool.close()
- pool.join()
运行结果:
我是进程 1, 取出数字: 0
我是进程 0, 取出数字: 1
我是进程 2, 取出数字: 2
我是进程 4, 取出数字: 3
我是进程 3, 取出数字: 4
我是进程 5, 取出数字: 5
我是进程 6, 取出数字: 6
我是进程 7, 取出数字: 7
我是进程 8, 取出数字: 8
我是进程 9, 取出数字: 9
当把 Manager().Queue() 直接换成 Queue(), 可能会出现资源混乱, 缺少进程
4. 主进程定义了一个 Queue 类型的变量, 并作为 Process 的 args 参数传给子进程 processA 和 processB, 两个进程一个向队列中写数据, 一个读数据
- import time
- from multiprocessing import Process,Queue
- MSG_QUEUE = Queue(5)
- def startA(msgQueue):
- while True:
- if msgQueue.empty() > 0:
- print 'queue is empty %d' % (msgQueue.qsize())
- else:
- msg = msgQueue.get()
- print 'get msg %s' % (msg,)
- time.sleep(1)
- def startB(msgQueue):
- while True:
- msgQueue.put('hello world')
- print 'put hello world queue size is %d' % (msgQueue.qsize(),)
- time.sleep(3)
- if __name__ == '__main__':
- processA = Process(target=startA,args=(MSG_QUEUE,))
- processB = Process(target=startB,args=(MSG_QUEUE,))
- processA.start()
- print 'processA start..'
- processB.start()
- print 'processB start..'
其打印的结果如下:
- C:\Python27\python.exe E:/outofmemory/test/queuetest/queuetest.py
- processA start..
- processB start..
- queue is empty 0
- put hello world queue size is 1
- get msg hello world
- queue is empty 0
- queue is empty 0
- put hello world queue size is 1
- get msg hello world
- queue is empty 0
- queue is empty 0
- put hello world queue size is 1
来源: http://www.phperz.com/article/18/0207/363241.html