8.2.1 进程的创建
开启多进程 scoketserver:server,client
进程的开启: python 中的多线程, 一定是有一个主进程, 由主进程创建几个子进程,
Linux 与 Windows
相同点: 都是由主进程创建子进程, 主进程和子进程原则上都有相互隔离的独立空间, 互不影响
不同点: Linux 子进程空间的初始数据完全是从主进程中 copy 来的; Windows 子进程空间数据也是从主进程 copy 但与主进程有所不同
进程的三种状态
阻塞状态: bolcked 进程等待的某种条件满足之前无法继续执行
运行状态: running 进程正在占用 CPU 资源
就绪状态: ready 进程以获得除处理器之外的所需资源, 等待分配处理器资源
python 多进程
Windows 下使用 Process 模块开启多进程, 一定要放在__main__下运行
第一种方式
- from multiprocessing import Process
- import time
- def task(name):
- print(f"{name} is runing")
- time.sleep(3)
- print(f"{name} is over")
- if __name__ == '__main__': # Windows 环境下, 开启多进程一定放在这个下面
- p = Process(target=task,args=('盖伦',)) # args 一定是一个元组的形式.
- p.start() # 通知操作系统, 你给我在内存中开辟一个空间, 将 p 这个进程放进去, 然后让 CPU 执行, 中间需要一定的时间开辟空间
- time.sleep(1)
- print('hello')
第二种方法
- from multiprocessing import Process
- import time
- class MyProcess(Process):
- def __init__(self,name):
- super().__init__() # 必须要继承父类的__init__
- self.name = name
- def run(self): # 必须定义 run 名字.
- print(f"{self.name} is runing")
- time.sleep(3)
- print(f"{self.name} is over")
- if __name__ == '__main__': # Windows 环境下, 开启多进程一定放在这个下面
- p = MyProcess('盖伦')
- p.start()
- print('main process')
8.2.2 进的 id
当内存中存在多个进程时, 如何识别进程的身份?
Windows 终端中可以使用 tasklist 查看所有进程 pid, 使用 tasklist | findstr 进程名可以查看单个具体的进程 pid.
python 中获取 pid 的方法: 子进程 pid--os.getpid(); 父进程 pid--os.getppid()
- import os
- print(f'子进程:{os.getpid()}')
- print(f'父进程:{os.getppid()}')
8.2.3 进程之间的数据隔离
原则上, 个进程之间的数据是隔离的, 因为创建子进程之后, 会复制一份父进程的数据给子进程, 下面进行验证:
- from multiprocessing import Process
- import time
- import os
- x = 1000
- def f():
- global x
- x = 2
- print(f"子进程:{os.getpid()}")
- if __name__ == '__main__':
- p = Process(target=f)
- p.start()
- time.sleep(2) # 让子进程先执行
- print(x) # 输出 x = 1000, 是数据隔离的
- print(f"父进程:{os.getpid()}")
- # 注意: 可以使用 id(x) 产看 x 在内存中的位置 - 5 到 255 的 id(x) 值相等, 其他的都不相等
- 8.2.4 join**
主进程等待子进程结束之后再往下执行
用法: 子进程. join()
两个实例感受一下
- from multiprocessing import Process
- import time
- def f(name,x):
- print(f"{name} is runing")
- time.sleep(x)
- print(f"{name} is finishing")
- if __name__ == '__main__':
- p1 = Process(target=f,args=('gailun',2))
- p2 = Process(target=f, args=('zhaoxin', 1))
- p3 = Process(target=f, args=('jiawen', 3))
- start_time = time.time()
- p1.start()
- p2.start()
- p3.start()
- p1.join()
- p2.join()
- p3.join()
- print(f"主进程运行时间 {time.time()-start_time}")
- # 最后输出在 3. 多秒
- from multiprocessing import Process
- import time
- def f(name,x):
- print(f"{name} is runing")
- time.sleep(x)
- print(f"{name} is finishing")
- if __name__ == '__main__':
- p1 = Process(target=f,args=('gailun',2))
- p2 = Process(target=f, args=('zhaoxin', 1))
- p3 = Process(target=f, args=('jiawen', 3))
- start_time = time.time()
- p1.start()
- p1.join()
- p2.start()
- p2.join()
- p3.start()
- p3.join()
- print(f"主进程运行时间 {time.time()-start_time}")
- # 输出在 6. 多秒
8.2.5 进程对象的其他属性 **
terminate() 结束子进程与 start 一样只是告诉系统要终止这个进程; 内存中不管是终止或是开启子进程, 都是耗费时间的, 命名执行后系统不一定会立马运行
is_alive() 判断子进程是否存活
8.2.6 僵尸进程与孤儿进程 **
只有 Linux(Mac) 环境下才强调的两个概念, Windows 下没有
僵尸进程
? 一般来说, 主进程是子进程的发起者, 父子进程的运行是异步的, 主进程往往会调用 wait 或者 waitpid 获取子进程的状态信息; 如果子进程结束了而父进程没有获取子进程的状态信息, 那么子进程的进程描述就会一直保留在系统中, 这时子进程就成为僵尸进程.
? 子进程没有可执行代码后将变成僵尸进程, 如果父进程一直运行, 又没有处理僵尸进程的代码, 僵尸进程也将一直存在, 消耗资源. 僵尸进程无法通过 kill 命令杀掉, 因为僵尸进程是已经停止的, 所以使用杀死进程的方法来杀僵尸进程是无效的. 僵尸进程不使用 CPU 或硬盘等系统资源, 而只使用极少量的内存用于存储退出状态和资源使用信息.
[危害] 僵尸进程在系统中保存的信息一直不释放的话, 会一直占用内存和进程号, 如果大量的进程号被占用之后, 系统就不能产生新的进程, 因为系统分配的进程号时有限的.
孤儿进程
如果主进程由于各种原因, 提前退出, 但是他的子进程还在运行, 这时把它所有的子进程称为孤儿进程, 一段时间之后, init 进程会对孤儿进程进行回收.
孤儿进程是无害的, init 会来回收他
8.2.7 守护进程
子进程对父进程进行守护
语法: 位置放在 start 之前
进程名. daemon = True
进程名. start()
守护进程会在主进程代码执行结束后就终止
守护进程内不能再开启新的子进程, 否则抛出异常 AssertionError
- from multiprocessing import Process
- import time
- def f(name):
- print(f"{name} is piaoing" )
- time.sleep(3)
- print(f"{name} is piao end")
- if __name__ == '__main__':
- p = Process(target=f,args=('gailun',))
- p.daemon = True
- p.start()
- time.sleep(2)
- print('this is main_process')
- # 输出
- gailun is piaoing
- this is main_process
- from multiprocessing import Process
- import time
- def f1(name):
- print(f"{name} is runing")
- time.sleep(2)
- print(f"{name} is end")
- def f(name):
- print(f"{name} is piaoing" )
- p2 = Process(target=f1, args=('zhaoxin',))
- p2.start()
- time.sleep(3)
- print(f"{name} is piao end")
- # 报错: AssertionError: daemonic processes are not allowed to have children
8.3 互斥锁
? 因为进程之间数据是不共享的, 但却公用同一个操作系统, 某些情况下多个进程会公用某一个资源, 例如打印机, 下面来进行说明
? [情况一]
- from multiprocessing import Process
- import time
- import random
- def task1():
- print('task1 start print')
- time.sleep(random.randint(1,3))
- print('task1 done')
- def task2():
- print('task2 start print')
- time.sleep(random.randint(1, 3))
- print('task2 done')
- def task3():
- print('task3 start print')
- time.sleep(random.randint(1, 3))
- print('task3 done')
- if __name__ == '__main__':
- p1 = Process(target=task1)
- p2 = Process(target=task2)
- p3 = Process(target=task3)
- p1.start()
- p2.start()
- p3.start()
这种情况的虽然有效率, 但是打印的顺数是乱的, 我们希望的是, 一个进程打印完了, 下一个进程才能打印
? [情况二]
- p1.start()
- p1.join()
- p2.start()
- p2.join()
- p3.start()
- p3.join()
这种情况虽然实现了依次打印问题, 但是顺序是我们设定好的, 没有遵循公平竞争资源的原则
? [情况三]
- from multiprocessing import Process,Lock
- import time
- import random
- def task1(lock):
- print('---task1---') # 验证 CPU 遇到 io 是否切换
- lock.acquire()
- print('task1 start print')
- time.sleep(random.randint(1,3))
- print('task1 done')
- lock.release()
- def task2(lock):
- print('---task2---') # 验证 CPU 遇到 io 是否切换
- lock.acquire()
- print('task2 start print')
- time.sleep(random.randint(1, 3))
- print('task2 done')
- lock.release()
- def task3():
- print('---task3---') # 验证 CPU 遇到 io 是否切换
- # lock.acquire()
- print('task3 start print')
- time.sleep(random.randint(1, 3))
- print('task3 done')
- # lock.release()
- if __name__ == '__main__':
- lock = Lock()
- p1 = Process(target=task1,args=(lock,)) # 以参数的形式传进进程中
- p2 = Process(target=task2,args=(lock,))
- p3 = Process(target=task3)
- p1.start()
- p2.start()
- p3.start()
[总结]
使用 Lock 类的 acquire() 和 release() 函数, 加锁可以保证多个进程竞争同一个资源时, 同一时间只能由一个进程可以进行修改, 虽然把多个进程变成串行, 但是保证了数据的安全性.
一个 acquire 必须跟一个 release, 两个连续的 acquire 会造成死锁, 死锁的常常发生在带锁的进程未解锁之前又生成了带锁的进程
从第三个实列来看, CPU 遇到 IO 时还是切换了进程, 只不过碰见带锁的代码时会跳过, 去执行不带锁的代码
8.4 进程间通信
进程与进程间是互相隔离的. 如果要实现两个进程间的通信, 可以借助硬盘里的同一文件, 但是该方法着实有些效率低, 所以我们选择使用队列或者管道, 但是管道又因为 bug 无法保证数据安全性和稳定性等等被诟病, 往往选择队列.
[使用文件模拟抢票]
- # 需求: 1. 买票之前, 先经过查票阶段, 查票是并行发生
- # 2. 买票, 从服务端获取剩余票数, 买票, 票数减一, 中间有网络延迟
- # 数据隔离, 只是内存层面的隔离, 不代表硬盘上的数据也隔离
- from multiprocessing import Process,Lock
- import time
- import random
- import os
- import JSON
- def serch():
- with open('db.json',mode='r',encoding='utf-8') as f1 :
- dic = JSON.load(f1)
- print(f'剩余票数:{dic["count"]}')
- def get():
- with open('db.json',mode='r',encoding='utf-8') as f1:
- dic = JSON.load(f1)
- time.sleep(random.random()) # 模拟网络延迟
- if dic["count"]> 0 :
- dic["count"] -= 1
- with open('db.json', mode='w', encoding='utf-8') as f2:
- JSON.dump(dic,f2)
- print(f"{os.getpid()} 购票成功")
- else:
- print('该班列车票已售完')
- def task(l):
- serch()
- l.acquire()
- get()
- l.release()
- if __name__ == '__main__':
- with open('db.json', mode='w', encoding='utf-8') as f:
- JSON.dump({'count':3},f)
- lock = Lock()
- for i in range(5):
- p = Process(target=task,args=(lock,))
- p.start()
8.4.1 队列 Queue
队列是存在于内存中的容器
队列的特点: 先进先出 (FIFO) 原则
[常用方法]
maxsize() q = Queue(3) 数据量不易过大. 精简的重要的数据
.put(对象) -- 往队列中插入数据, 其中有两个可选参数 blocked 和 timeout.blocked 默认为 True, 如果 put 的对象数量超过 maxsize, 则会阻塞进程, blocked 默认为 False 时, 直接报 Queue.Full 异常; timeout 参数设定之后, 是在 timeout 时间之后如果队列还无法插入数据载报 Queue.Full 异常
.get(对象) -- 从队列中拿取一个元素, get 函数也有两个参数 blocked 和 timeout.blocked 默认为 True, 如果队列为空时, 继续 get 就会阻塞进程, 这是如果尤其的进程王该队列中添加了数据, 阻塞的进程还是可以取到数据, blocked 为 false 且队列为空时, 直接报 Queue.Empty 异常, timeout 参数设定之后, 是在 timeout 时间之后如果队列还无法插入数据载报 Queue.Empty 异常
q.get_nowait(): 同 q.get(False)
q.put_nowait(): 同 q.put(False)
q.empty(): 调用此方法的此时此刻 q 为空则返回 True, 如果队列中又加入了项目, 它不会在意, 所以该函数返回的结果是不可靠的
q.full(): 调用此方法的此时此刻 q 已满则返回 True, 该结果也是不可靠的, 比如在返回 True 的过程中, 队列中的项目被取走
q.qsize(): 返回队列中此时此刻项目的正确数量, 结果也不可靠, 理由同 q.empty() 和 q.full() 一样
- from multiprocessing import Queue
- from multiprocessing import Process
- import os
- def task(q):
- try :
- q.put(f"{os.getpid()}",block=False)
- except Exception:
- return
- if __name__ == '__main__':
- q = Queue(10)
- for i in range(12):
- p = Process(target=task,args=(q,))
- p.start()
- for i in range(1,11):
- print(f"排名第 {i} 的用户:{q.get()}")
8.4.2 生产者消费者模型
[形成的基本条件条件] 两个进程 (生产者, 消费者) + 缓冲区 (队列);
生产者负责产生数据, 消费者负责处理数据, 队列缓冲区负责连接两个进程间通信的临界资源
[问题的核心] 生产者在队列满后要暂时停止存入数据, 消费者在队列空时, 暂时不要取数据
[用于何处] 多用于解决并发问题, 处理生产数据与处理数据的进程间的强耦合问题, 平衡二者的工作速率, 从而提高整体的效率
- # 生产者: 生成数据
- # 消费者: 处理数据
- # 缓冲区
- # 用于并发
- from multiprocessing import Process,Queue
- import random
- import time
- import os
- def producer(q):
- for i in range(1,10):
- time.sleep(random.randint(1,2))
- res = f"{i} 号武器"
- q.put(res)
- print(f"\033[0;32m 邦德锻造了 {res} \033[0m")
- def consumer(q):
- while 1:
- try: # 当生产者不再生产, 队列为空时要停止消费者继续取值
- time.sleep(random.randint(1,2))
- ret = q.get(timeout=4)
- print(f"\033[0;33m 士兵拿走了 {ret} \033[0m")
- except Exception:
- return
- if __name__ == '__main__':
- q = Queue()
- p = Process(target=producer,args=(q,))
- c = Process(target=consumer,args=(q,))
- p.start()
- c.start()
对于临界, 除了以上 try 的处理方式外, 还可以在生产者进程内 put 一个停止取值的信号, 当消费者在队列中取到该值时, 就会停止继续取值; 还可以在主程序下, 配合 join 向队列中 put 停止信号, 这种方式相对 low 一些, 因为有多少个生产者就要写多少个停止信号
来源: http://www.bubuko.com/infodetail-3133595.html