1 开启进程的两种方式
方式一
- def task(name):
- print("%s start..." % name)
- if __name__ == __main__:
- p = Process(target=task, args=("sb",))
- p.start()
方式二
- class Piao(Process):
- def __init__(self, name):
- super().__init__()
- self.name = name
- def run(self):
- print(%s start.. % self.name)
- if __name__ == __main__:
- p1 = Piao("sb")
- p1.start()
terminate 和 is_alive
- from multiprocessing import Process
- import time
- def task(name):
- time.sleep(1)
- print("%s done.." % name)
- if __name__ == __main__:
- p1 = Process(target=task, args=("sb",))
- p1.start()
- p1.terminate() # 发送关闭进程命令
- print(p1.is_alive()) # 查看进程是否活动
- # True
- print("主")
- time.sleep(1)
name 与 pid
- from multiprocessing import Process
- import os
- def task(name):
- print("%s start..." % name)
- if __name__ == __main__:
- p = Process(target=task, args=("sb",), name="子进程 1") # 可以用关键参数来指定进程名
- p.start()
- print(p.name, p.pid, os.getppid()) # p.ppid 报错
- print(os.getpid()) # p.pid==os.getpid()
守护进程
一守护进程在主进程执行结束终止
二守护进程内无法开启子进程
- from multiprocessing import Process
- import time
- def foo(): # 不执行
- print(123)
- time.sleep(1)
- print("end123")
- def bar():
- print(456)
- time.sleep(3)
- print("end456")
- if __name__ == __main__:
- p1 = Process(target=foo)
- p2 = Process(target=bar)
- p1.daemon = True
- p1.start()
- p2.start()
- print("main-------")
互斥锁
前戏: 进程之间数据不共享, 但是共享同一套文件系统, 所以访问同一个文件, 或者打开同一个打印终端, 共享带来竞争
互斥锁: 互斥锁的意思就是互相排斥保证了数据安全不错乱
- from multiprocessing import Process
- from multiprocessing import Lock
- import time
- def task(name, mutex):
- mutex.acquire()
- print("%s 1" % name)
- time.sleep(1)
- print("%s 2" % name)
- mutex.release()
- if __name__ == __main__:
- mutex = Lock()
- for i in range(2):
- p = Process(target=task, args=("进程 %s" % i, mutex))
- p.start()
互斥锁和 join 的区别
互斥锁是让加锁部分变成串性, join 是让整段代码都变成串行
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # __author__:JasonLIN
- from multiprocessing import Process, Lock
- import time
- import json
- def search(name):
- time.sleep(1)
- data = json.load(open("data.txt", "r"))
- print("<%s > 剩余票数 [%s]" % (name, data["count"]))
- def buy(name):
- data = json.load(open("data.txt", "r"))
- if int(data["count"]) > 0:
- data["count"] -= 1
- time.sleep(2)
- json.dump(data, open("data.txt", "w"))
- print("%s 购票成功" % name)
- def task(name, mutex):
- search(name)
- mutex.acquire()
- buy(name)
- mutex.release()
- if __name__ == __main__:
- mutex = Lock()
- for i in range(10):
- p = Process(target=task, args=("路人 %s" % i, mutex))
- p.start()
join 方法, search 和 buy 都变成串行, 效率更低
- from multiprocessing import Process
- import time
- import json
- def search(name):
- data = json.load(open("data.txt", "r"))
- time.sleep(1)
- print("<%s > 剩余票数 [%s]" % (name, data["count"]))
- def buy(name):
- data = json.load(open("data.txt", "r"))
- if int(data["count"]) > 0:
- data["count"] -= 1
- time.sleep(2)
- json.dump(data, open("data.txt", "w"))
- print("%s 购票成功" % name)
- def task(name):
- search(name)
- buy(name)
- if __name__ == __main__:
- for i in range(10):
- p = Process(target=task, args=("路人 %s" % i,))
- p.start()
- p.join()
队列
进程彼此之间互相隔离, 要实现进程间通信 (IPC),multiprocessing 模块支持两种形式: 队列和管道, 这两种方式都是使用消息传递的
创建队列的类 (底层就是以管道和锁定的方式实现)
Queue([maxsize]): 创建共享的进程队列, Queue 是多进程安全的队列, 可以使用 Queue 实现多进程之间的数据传递
Queue([maxsize]): 是队列中允许最大项数, 省略则无大小限制
q.put 方法用以插入数据到队列中
q.get 方法可以从队列读取并且删除一个元素
代码实例
- from multiprocessing import Process,Queue
- q=Queue(3)
- #put ,get ,put_nowait,get_nowait,full,empty
- q.put(1)
- q.put(2)
- q.put(3)
- print(q.full()) #满了
- # q.put(4) #再放就阻塞住了
- print(q.get())
- print(q.get())
- print(q.get())
- print(q.empty()) #空了
- # print(q.get()) #再取就阻塞住了
生产者消费者模型
为什么要使用生产者消费者模型?
生产者指的是生产数据的任务, 消费者指的是处理数据的任务, 在并发编程中, 如果生产者处理速度很快, 而消费者处理速度很慢, 那么生产者就必须等待消费者处理完, 才能继续生产数据同样的道理, 如果消费者的处理能力大于生产者, 那么消费者就必须等待生产者为了解决这个问题于是引入了生产者和消费者模式
什么是生产者和消费者模式?
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题生产者和消费者彼此之间不直接通讯, 而通过阻塞队列来进行通讯
这个阻塞队列就是用来给生产者和消费者解耦的
转自:
- https://www.luffycity.com/python-book/di-7-zhang-bing-fa-bian-cheng/72-bing-fa-bian-cheng-zhi-duo-jin-cheng/727-sheng-chan-zhe-xiao-fei-zhe-mo-xing.html
- from multiprocessing import Process, Queue
- import time
- import random
- def producer(q):
- for i in range(3):
- res = "包子 %s" % i
- time.sleep(0.5)
- print("生产了 %s" % res)
- q.put(res)
- def consume(q):
- while True:
- res = q.get()
- if not res:
- break
- time.sleep(random.randint(1, 3))
- print("吃了 %s" % res)
- if __name__ == __main__:
- q = Queue()
- p1 = Process(target=producer, args=(q,))
- p2 = Process(target=producer, args=(q,))
- p3 = Process(target=producer, args=(q,))
- c1 = Process(target=consume, args=(q,))
- c2 = Process(target=consume, args=(q,))
- p1.start()
- p2.start()
- p3.start()
- c1.start()
- c2.start()
- p1.join()
- p2.join()
- p3.join()
- q.put(None)
- q.put(None)
- print("zhu")from multiprocessing import Process, Queue
- import time
- import random
- def producer(q):
- for i in range(3):
- res = "包子 %s" % i
- time.sleep(0.5)
- print("生产了 %s" % res)
- q.put(res)
- def consume(q):
- while True:
- res = q.get()
- if not res:
- break
- time.sleep(random.randint(1, 3))
- print("吃了 %s" % res)
- if __name__ == __main__:
- q = Queue()
- p1 = Process(target=producer, args=(q,))
- p2 = Process(target=producer, args=(q,))
- p3 = Process(target=producer, args=(q,))
- c1 = Process(target=consume, args=(q,))
- c2 = Process(target=consume, args=(q,))
- p1.start()
- p2.start()
- p3.start()
- c1.start()
- c2.start()
- p1.join()
- p2.join()
- p3.join()
- q.put(None)
- q.put(None)
- print("zhu")
- JoinableQueue([maxsize])
- from multiprocessing import Process, JoinableQueue
- import time
- import random
- def producer(q):
- for i in range(3):
- res = "包子 %s" % i
- time.sleep(0.5)
- print("生产了 %s" % res)
- q.put(res)
- q.join() # 等消费者把所有数据取走之后, 生产者才结束
- def consume(q):
- while True:
- res = q.get()
- if not res:
- break
- time.sleep(random.randint(1, 3))
- print("吃了 %s" % res)
- q.task_done() # 发送信号给 q.join(), 说明已经从队列中取走一个数据并处理完毕
- if __name__ == __main__:
- q = JoinableQueue()
- p1 = Process(target=producer, args=(q,))
- p2 = Process(target=producer, args=(q,))
- p3 = Process(target=producer, args=(q,))
- c1 = Process(target=consume, args=(q,))
- c2 = Process(target=consume, args=(q,))
- c1.daemon = True
- c2.daemon = True
- p1.start()
- p2.start()
- p3.start()
- c1.start()
- c2.start()
- p1.join()
- p2.join()
- p3.join()
- print("zhu")
来源: http://www.bubuko.com/infodetail-2504907.html