一, 必备的理论基础
二, 操作系统发展史
三, 进程理论
四, 线程理论
五, 协程
一, 必备的理论基础
操作系统理论:
操作系统是一个协调 \ 管理 \ 控制计算机硬件资源与应用软件资源的控制程序
操作系统的两大功能:
将复杂的硬件操作封装成简单的接口给应用程序或者用户去使用
将多个进程对硬件的竞争变得有序
二, 操作系统发展史
2.1 穿孔卡片(第一代)
特点:
没有操作系统的概念
所有的程序设计都是直接操控硬件
工作过程:
程序员在墙上的机时表预约一段时间, 然后程序员拿着他的插件版到机房里, 将自己的插件板接到计算机里, 这几个小时内他独享整个计算机资源, 后面的一批人都得等着
后来出现了穿孔卡片, 可以将程序写在卡片上, 然后读入机而不用插件板
优点: 程序员在申请的时间段内独享整个资源, 可以即时地调试自己的程序(有 bug 可以立刻处理)
缺点:
1)读取数据速度特别慢
2)CPU 利用率极低
3)单用户使用
2.2 批处理系统(第二代)
批处理系统: 加载在计算机上的一个系统软件, 在它的控制下, 计算机能够自动地, 成批地处理一个或多个用户的作业(这作业包括程序, 数据和命令).
第二代如何解决第一代的问题 / 缺点:
1. 把一堆人的输入攒成一大波输入,
2. 然后顺序计算(这是有问题的, 但是第二代计算也没有解决)
3. 把一堆人的输出攒成一大波输出
优点: 批处理, 节省了机时
缺点:
1)读取数据速度特别慢
2)CPU 利用率极低
3)联机 (多份代码) 使用
4)效率依然低下
5)计算的过程仍然是顺序计算 -》串行
2.3 脱机批处理 (现代操作系统的设计原理) 第三代(重要)
第三代计算机的操作系统广泛应用了第二代计算机的操作系统没有的关键技术: 多道技术(基于单核背景下产生)
三, 进程理论
3.1 什么是进程?
进程指的是一个正在进行 / 运行的程序, 进程是用来描述程序执行过程的虚拟概念.
进程的概念起源于操作系统, 进程是操作系统最核心的概念, 操作系统其它所有的概念都是围绕进程的.
进程 vs 程序:
进程: 程序执行的过程
程序: 一堆代码
3.2 多道技术
单道: 一条道走道黑 ---->串行
多道技术: 多道技术中的多道指的是多个程序, 多道技术是为了解决多个程序竞争或者说共享同一个资源 (比如 CPU) 的有序调度问题, 解决方式即多路复用, 多路复用分为时间上的复用和空间上的复用.
1. 空间上的复用: 内存中同时有多道 (个) 程序
2. 时间上的复用: 多道程序复用(共用)CPU 的时间, 切换 + 保存状态
当执行程序遇到 IO 时, 操作系统会将 CPU 的执行权限剥夺.
优点: CPU 的执行效率提高
当执行程序执行时间过长时, 操作系统会将 CPU 的执行权限剥夺.
缺点: 程序的执行效率低
3.3 并发与并行:
串行: 一个任务完完整整地运行完成, 才能运行下一个任务
并发: 在单核 (一个 CPU) 情况下, 当执行两个 A,B 程序时, A 先执行, 当 A 遇到 IO 时, B 开始争抢 CPU 的执行权限, 再让 B 执行, 他们看起像同时运行.
并行: 在多核 (多个 CPU) 的情况下, 当执行两个 A,B 程序时, A 与 B 同时执行. 他们是真正意义上的同时运行.
3.4 进程调度:
要想多个进程交替运行, 操作系统必须对这些进程进行调度, 这个调度也不是随即进行的, 而是需要遵循一定的法则, 由此就有了进程的调度算法.
先来先服务调度算法(了解)
- 比如程序 a,b, 若 a 先来, 则让 a 先服务, 待 a 服务完毕后, b 再服务.
- 缺点: 执行效率低.
短作业优先调度算法(了解)
- 执行时间越短, 则先调度.
- 缺点: 导致执行时间长的程序, 需要等待所有时间短的程序执行完毕后, 才能执行.
现代操作系统的进程调度算法: 时间片轮转法 + 多级反馈队列
时间片轮转法
- 将 CPU 的执行时间, 等分成 N 个时间片. 比如同时有 10 个程序需要执行, 操作系统会给你 10 秒, 然后时间片轮转法会将 10 秒分成 10 等分.
多级反馈队列
1 级队列: 优先级最高
2 级队列: 优先级次高
3 级队列: 优先级当前最低
3.5 同步与异步
同步与异步 指的是 "提交任务的方式"
同步(串行): 若有两个任务需要提交, 在提交第一个任务时, 必须等待该任务运行完毕拿到结果后, 才能继续提交并执行第二个任务, 会导致任务是串行执行的.
如两个 a,b 程序都要提交并执行, 假如 a 先提交执行, b 必须等 a 执行完毕后, 才能提交任务.
异步(并发): 若有两个任务需要提交, 在提交第一个任务时, 不需要原地等待, 立即可以提交并执行第二个任务, 会导致任务是并发执行的.
如两个 a,b 程序都要提交并执行, 假如 a 先提交并执行, b 无需等 a 执行完毕, 就可以直接提交任务.
3.6 阻塞与非阻塞
阻塞(等待): 凡是遇到 IO 都会阻塞.
- IO:
- input()
- print()
- time.sleep(3)
文件的读写
数据的传输
非阻塞 (不等待) : 除了 IO 都是非阻塞 (比如: 从 1+1 开始计算到 100 万)
3.7 进程的三种状态
就绪态: 所有任务提交完毕后, 就会进入就绪态
运行态: 通过进程调度一个任务开始执行, 该任务进入运行态
- 程序的执行时间过长 ----> 将程序返回给就绪态, 等待下次调度.
- 非阻塞
阻塞态: 凡是遇到 IO 操作的任务都会进入阻塞态, 待 IO 操作结束, 则阻塞态结束, 进入就绪态, 等待下次调度.
问题: 阻塞与同步是一样的吗? 非阻塞与异步是一样的吗?
同步与异步: 提交任务的方式
阻塞与非阻塞: 进程的状态.
异步非阻塞: ----> CPU 的利用率最大化(通过并发对程序进程操作)
3.8 创建进程的两种方式
- from multiprocessing import Process #Process 是一个类
- import time
- # 开启子进程方式一: 直接调用 Process
- def task(name):
- print(f'{name} is running')
- time.sleep(3)
- print(f'{name} is done')
- # 在 Windows 系统上, 开启子进程的操作必须要到 if __name__ == '__main__': 的子代码下面去
- if __name__ == '__main__':
- #target = 任务(函数地址) ---> 创建一个子进程
- obj = Process(target=task,args=('egon',)) #args 必须传一个元组的形式
- obj.start() # 只是向操作系统发送了一个开启子进程的信号
- print('主') #代表主进程
- # 开启子进程方式二:
- class Myprocess(Process):
- def __init__(self,name):
- super().__init__()
- self.name = name
- def run(self): #函数名必须叫 run
- print(f'{self.name} is running')
- time.sleep(3)
- print(f'{self.name} is done')
- if __name__ == '__main__':
- obj = Myprocess('egon')
- obj.start() #obj.start 自动调用 run 方法
- print('主') #代表主进程
join 方法: 让主进程在原地等待, 等待子进程运行完毕后再运行, 不会影响子进程的运行
- #join:
- from multiprocessing import Process #Process 是一个类
- import time
- # 开启子进程方式一: 直接调用 Process
- def task(name,n):
- print(f'{name} is running')
- time.sleep(n)
- print(f'{name} is done')
- # 在 Windows 系统上, 开启子进程的操作必须要到 if __name__ == '__main__': 的子代码下面去
- if __name__ == '__main__':
- #target = 任务(函数地址) ---> 创建一个子进程
- #异步提交 3 个任务
- obj1 = Process(target=task,args=('子 1',1)) #args 必须传一个元组的形式
- obj2 = Process(target=task,args=('子 2',2)) #args 必须传一个元组的形式
- obj3 = Process(target=task,args=('子 3',3)) #args 必须传一个元组的形式
- obj1.start() # 只是向操作系统发送了一个开启子进程的信号
- obj2.start() # 只是向操作系统发送了一个开启子进程的信号
- obj3.start() # 只是向操作系统发送了一个开启子进程的信号
- obj1.join() #主进程等 obj1 结束
- obj2.join() #主进程等 obj2 结束
- obj3.join() #主进程等 obj3 结束 #obj1 与 obj3 谁在前谁在后无所谓
- print('主')
上述代码冗余, 改进如下:
- #join: 让主进程在原地等待, 等待子进程运行完毕, 不会影响子进程的运行
- from multiprocessing import Process #Process 是一个类
- import time
- # 开启子进程方式一: 直接调用 Process
- def task(name,n):
- print(f'{name} is running')
- time.sleep(n)
- print(f'{name} is done')
- # 在 Windows 系统上, 开启子进程的操作必须要到 if __name__ == '__main__': 的子代码下面去
- if __name__ == '__main__':
- #target = 任务(函数地址) ---> 创建一个子进程
- #异步提交
- obj_1 = []
- for i in range(1,4):
- obj = Process(target=task,args=('子 %s' %i ,1)) #args 必须传一个元组的形式
- obj_1.append(obj)
- obj.start()
- for obj in obj_1:
- obj.join()
- print('主')
- # 开启子进程方式二:
- class Myprocess(Process):
- def __init__(self,name):
- super().__init__()
- self.name = name
- def run(self): #函数名必须叫 run
- print(f'{self.name} is running')
- time.sleep(3)
- print(f'{self.name} is done')
- if __name__ == '__main__':
- list1 = []
- for i in range(1,4):
- obj = Myprocess('子 %s' %i)
- obj.start() #obj.start 自动调用 run 方法
- list1.append(obj)
- for obj in list1:
- obj.join()
- print('主') #代表主进程
3.9 进程对象相关的其他属性或方法
3.9.1 进程 PID
PID: 每一个进程在操作系统内都有一个唯一的 id 号, 称之为 PID
- from multiprocessing import Process,current_process
- import time,os
- # 第一种: current_process().pid
- def task():
- print('子 %s is running' % current_process().pid)
- time.sleep(2)
- print('子 %s is done' %current_process().pid)
- if __name__ == '__main__':
- p = Process(target=task,name='子进程 1') #name 给子进程起名
- p.start()
- print(p.name) #'子进程 1'
- print('主',current_process().pid)
- # 第二种: os.getpid(),os.getppid()
- def task():
- print('子 %s is running 爹是:%s' % (os.getpid(),os.getppid()))
- time.sleep(2)
- print('子 %s is done 爹是:%s' % (os.getpid(),os.getppid()))
- if __name__ == '__main__':
- p = Process(target=task)
- p.start()
- print('主:%s 主他爹:%s' % (os.getpid(),os.getppid()))
- ### 2. 进程名字
3.9.2 子进程回收资源的两种方式:
1) join 让主进程等待子进程结束, 并回收子进程资源, 主进程再结束并回收资源.
2) 主进程 "正常结束" , 子进程与主进程一并被回收资源.
3.10 进程间数据隔离
进程隔离是为保护操作系统中进程互不干扰而设计的一组不同硬件和软件的技术
这个技术是为了避免进程 A 写入进程 B 的情况发生. 进程的隔离实现, 使用了虚拟地址空间. 进程 A 的虚拟地址和进程 B 的虚拟地址不同, 这样就防止进程 A 将数据信息写入进程 B
进程隔离的安全性通过禁止进程间内存的访问可以方便实现
- from multiprocessing import Process
- import time
- number = 10
- def func():
- global number
- number = 20
- print(number)
- if __name__ == '__main__':
- p_obj = Process(target=func)
- p_obj.start()
- p_obj.join() #让子进程先运行
- print(number)
执行结果:
20
10
3.11 僵尸进程与孤儿进程(了解)
僵尸进程 (有坏处): 在子进程结束后, 主进程没有结束, 子进程 PID 不会被回收.
缺点:
操作系统中的 PID 号是有限的, 如有子进程 PID 号无法正常回收, 则会占用 PID 号.
资源浪费.
若 PID 号满了, 则无法创建新的进程.
孤儿进程(没有坏处): 在子进程没有结束时, 主进程结束, 子进程 PID 不会被回收.
操作系统优化机制(孤儿院):
当主进程意外终止, 操作系统会检测是否有正在运行的子进程, 会他们放入孤儿院中, 让操作系统帮你自动回收.
3.12 守护进程
守护进程: 本质就是一个子进程. 当被守护的进程 (皇上) 结束时, 子进程 (太监) 也必须结束, 并回收.
- from multiprocessing import Process
- import time
- def task():
- print('is running')
- time.sleep(3)
- print('is done')
- if __name__ == '__main__':
- obj = Process(target=task)
- #守护进程必须在 obj.start()调用之前执行
- obj.daemon = True #将子进程设置为守护进程
- obj.start()
- time.sleep(1)
- print('主进程结束')
3.13 进程互斥锁
互斥锁是一把锁, 让并发变成串行, 牺牲了执行效率, 保证了数据安全
进程之间数据是不共享的, 但是共享同一套文件系统, 而共享带来的是竞争, 竞争带来的结果就是错乱, 如何控制, 就是加锁处理, 如抢票例子
- from multiprocessing import Process
- from multiprocessing import Lock #--- 进程互斥锁
- import JSON,time,random
- # 抢票例子:
- # 查票
- def search(name):
- with open('db.json','rt',encoding='utf-8')as f:
- dic = JSON.load(f)
- time.sleep(1)
- print(f'%s 查看到余票为 %s' %(name,dic['count']))
- # 购票
- def get(name):
- with open('db.json','rt',encoding='utf-8')as f:
- dic = JSON.load(f)
- if dic['count']> 0 :
- time.sleep(random.randint(1,3))
- dic['count'] -= 1
- with open('db.json','wt',encoding='utf-8')as f:
- JSON.dump(dic,f)
- print('%s 购票成功' % name )
- else:
- print('没有票了')
- def run(name,lock):
- search(name) #并发
- lock.acquire() #加锁
- get(name) #串行
- lock.release() #释放锁
- if __name__ == '__main__':
- lock = Lock() #产生一个互斥锁对象
- for i in range(10):
- p = Process(target=run,args=('路人 %s' %i,lock))
- p.start()
- # p.join() #join 只能将进程的任务整体变成串行
3.14 队列(先进先出)
队列相当于一个第三方的管道, 可以存放数据.
先进先出指的是先存放的数据就先取出来
应用: 让进程之间的数据进行交互
- from multiprocessing import Queue #multiprocessing 提供的队列
- from multiprocessing import JoinableQueue #基于 Queue 封装的队列
- import queue #python 内置的队列, 也是先进先出
- # 第一种:
- # Queue(5)指的是队列中只能存放 5 个数据
- q_obj1 = Queue(5) #q_obj1 队列对象 #第一种
- # q_obj1 = queue.Queue(5) #第二种
- #q_obj1 = JoinableQueue(5) #第三种
- # 添加数据到队列中
- q_obj1.put('jason')
- print('添加 1 个')
- q_obj1.put('hcy')
- print('添加 1 个')
- q_obj1.put('bh')
- print('添加 1 个')
- q_obj1.put('tank')
- print('添加 1 个')
- q_obj1.put('zzc')
- print('添加 1 个')
- # q_obj1.put('第 6 个') #put: 只要队列满了, 就会进入阻塞
- # print('第 6 个')
- # q_obj1.put_nowait('sean') #只要数据满了就会报错
- #get: 只要队列中有数据, 就能获取数据, 若没有则会进入阻塞
- print(q_obj1.get())
- print(q_obj1.get())
- print(q_obj1.get())
- print(q_obj1.get())
- print(q_obj1.get())
- print(q_obj1.get()) #到这步已经没有数据了
- # print(q_obj1.get_nowait()) #get_nowait: 若队列中没有数据获取则会报错
3.15 IPC 机制(进程间通信)
进程间数据是相互隔离的, 若想实现进程间通信, 可以利用队列.
- from multiprocessing import Process
- from multiprocessing import JoinableQueue
- import time
- def task1(q):
- q.put(100)
- print('添加数据 100')
- time.sleep(3)
- print(q.get())
- def task2(q):
- #想要在 task2 中获取 task1 的 100
- res = q.get()
- print(f'获取的数据是{res}')
- q.put(9527)
- if __name__ == '__main__':
- #产生队列
- q = JoinableQueue()
- #产生两个不同的子进程
- p1 = Process(target=task1,args=(q,))
- p2 = Process(target=task2,args=(q,))
- p1.start()
- p2.start()
执行结果:
添加数据 100
获取的数据是 100
9527
3.16 生产者消费者模型
生产者: 代指生产数据的
消费者: 代指使用数据的
该模型的工作方式: 生成数据传递消费者处理
实现方式: 生产者 ---->队列<----- 消费者
此模型是为了解决供需不平衡问题.
- from multiprocessing import JoinableQueue
- from multiprocessing import Process
- import time
- # 生产者: 生成数据放到队列中
- def producer(name,food,q):
- msg = f'{name}生产了{food}'
- q.put(food) #生产一个食物, 添加到队列中
- print(msg)
- # 消费者: 使用的数据 从队列中获取
- def customer(name,q):
- while True:
- try:
- time.sleep(1)
- #若报错, 则跳出循环
- food = q.get_nowait()
- msg = f'{name}吃了 {food} 食物!'
- print(msg)
- except Exception:
- break
- if __name__ == '__main__':
- q = JoinableQueue()
- #创建 10 个生产者
- for line in range(10):
- p1 = Process(target=producer,args=('tank1',f'油条{line}',q,))
- p1.start()
- #创建两个消费者
- c1 = Process(target=customer,args=('jason',q))
- c2 = Process(target=customer,args=('egon',q))
- c1.start()
- c2.start()
四, 线程理论
4.1 什么是线程?
进程其实是一个资源单位, 而进程内的线程才是 CPU 的执行单位. 线程其实指的就是代码的执行过程.
进程就好比划了块地, 在这块地上开了一个车间, 而线程好比车间里的一条或多条流水线, 真正运行的也就是流水线. 那每个车间必须至少有一条流水线吧.
注意: 开启一个进程, 一定会自带一个线程, 线程才是真正地执行者.
4.2 为什么要使用线程?
可以节省内存资源.
进程 vs 线程
开启进程:
会产生一个内存空间, 申请一块资源.
会自带一个主线程.
开启子进程的速度要比开启子线程的速度慢
开启线程:
一个进程内可以开启多个线程, 从进程的内存空间中申请资源.
节省资源.
强调: 进程之间数据是隔离的, 同一进程下多个线程共享该进程的数据, 如下所示:
- n = 100
- def task():
- global n
- n = 0
- if __name__ == '__main__':
- t = Thread(target=task)
- t.start()
- t.join()
- print('主',n) # n 为 0
4.3 如何使用?
开启线程的两种方式:
- from threading import Thread
- import time
- # 开启方式一:
- def task():
- print('start...') #最先打印, 因为开启线程的速度要比进程的快
- time.sleep(1)
- print('end...') #因为睡了 1 秒, 最后打印
- if __name__ == '__main__':
- #开启一个子线程
- t = Thread(target=task)
- t.start()
- print('主进程(主线程)...') #其次打印
- # 开启方式二:
- class Mythread(Thread):
- def run(self):
- print('start...') #最先打印, 因为开启线程的速度要比进程的快
- time.sleep(1)
- print('end...') #因为睡了 1 秒, 最后打印
- if __name__ == '__main__':
- #开启一个子线程
- t = Mythread()
- t.start()
- print('主进程(主线程)...') #其次打印
4.4, 线程相关的其他属性或方法
- from threading import Thread,active_count,current_thread
- import time,os
- def task():
- print('%s is running' %current_thread().name) #查看子线程名
- time.sleep(2)
- print('%s is done' %os.getpid()) #查看子线程 PID 号
- if __name__ == '__main__':
- t=Thread(target=task,)
- t.start()
- # t.join()
- # print('主',active_count()) #查看运行的线程数: 2
- print('主',current_thread().name) #查看主线程的名字
4.5 线程互斥锁
- # 未加互斥锁状态:
- from threading import Thread,Lock
- import time
- n = 100
- def task():
- global n
- temp = n #每个线程拿到的 n 此时都是 100
- time.sleep(0.1) #等 100 个线程全造完
- n = temp-1
- if __name__ == '__main__':
- t_1 = []
- for i in range(100):
- t =Thread(target=task)
- t_1.append(t)
- t.start()
- for t in t_1:
- t.join()
- print(n) # n-->99
- # 加互斥锁状态:
- from threading import Thread,Lock
- import time
- mutex = Lock()
- n = 100
- def task():
- global n
- mutex.acquire()
- temp = n #每个线程拿到的 n 此时都是 100
- time.sleep(0.1) #等 100 个线程全造完
- n = temp-1
- mutex.release()
- if __name__ == '__main__':
- t_1 = []
- for i in range(100):
- t =Thread(target=task)
- t_1.append(t)
- t.start()
- for t in t_1:
- t.join()
- print(n) # n-->0
4.6 线程池与进程池
进程池是用来限制创建的进程数.
线程池是用来限制创建的线程数.
- from concurrent.futures import ThreadPoolExecutor
- import time
- # 让 pool 只能创建 100 个线程
- pool = ThreadPoolExecutor(100) #pool 为线程池对象
- def task(line):
- print(line)
- time.sleep(3)
- if __name__ == '__main__':
- for line in range(1000):
- #异步提交任务
- pool.submit(task, line)
- #同步提交任务
- # res = pool.submit(task,line).result() #原地等拿到结果
- #print(res)
- from concurrent.futures import ThreadPoolExecutor
- import time
- import random
- # 让 pool 只能创建 100 个线程
- pool = ThreadPoolExecutor(10) #pool 为线程池对象
- def task(line):
- print(line)
- time.sleep(random.randint(1,2))
- return '小仓'
- if __name__ == '__main__':
- l =[]
- for line in range(50):
- #异步提交任务
- # pool.submit(task, line)
- # res = pool.submit(task,line).result() #res 拿到 task 任务提交后返回的结果
- # print(res)
- future = pool.submit(task,line)
- l.append(future)
- pool.shutdown(wait=True) #关闭进程池的入口, 并且在原地等待进程池内所有任务运行完毕
- for future in l:
- print(future.result())
- print('主')
回调函数: add_done_callback
- from concurrent.futures import ThreadPoolExecutor
- import time
- pool = ThreadPoolExecutor(50)
- def task1(n):
- print(f'from task1...{n}')
- time.sleep(3)
- return 'baohan'
- def get_result(obj):
- result = obj.result()
- print(result)
- if __name__ == '__main__':
- n = 1
- while True:
- # add_done_callback(参数 1), 会将 submit 提交的 task1 执行返回的结果, 传给 get_result 中的第一个参数,
- # 第一个参数是一个对象.
- pool.submit(task1,n).add_done_callback(get_result)
- n += 1
- from concurrent.futures import ThreadPoolExecutor
- from concurrent.futures import ProcessPoolExecutor
- import time,os,requests
- def get(url):
- print('%s GET %s' %(os.getpid(),url))
- time.sleep(3)
- response = requests.get(url)
- if response.status_code == 200:
- res = response.text
- else:
- res = '下载失败'
- return res
- def parse(future):
- time.sleep(1)
- res = future.result()
- print('%s 解析结果为 %s' %(os.getpid(),len(res)))
- if __name__ == '__main__':
- urls=['https://www.baidu.com',
- 'https://www.sina.com',
- 'https://www.tmall.com',
- 'https://www.jd.com',
- 'https://www.baidu.com',
- 'https://www.baidu.com',
- 'https://www.baidu.com',]
- p = ProcessPoolExecutor(7)
- l=[]
- for url in urls:
- future = p.submit(get,url)
- # parse 会在任务运行完毕后自动触发, 然后接收一个参数 future 对象
- # 结果 future 对象会在任务运行完毕后自动传给回调函数
- future.add_done_callback(parse)
- l.append(future)
- p.shutdown(wait=True)
- print('主')
4.7 GIL 全局解释器锁
什么是 GIL?
GIL 全局解释器锁, 本质上就是一把互斥锁, 保证数据安全.
在 Cpython 解释器中, 每个进程内都会存在一把 GIL, 同一进程内的每个线程必须要抢到 GIL 后才能执行自己的代码.
同一个进程下开启的多线程, 同一时刻只能有一个线程执行, 无法利用多核优势. 即无法实现并行, 但可以实现并发.
GIL 与多线程
PS:
CPU 是用来计算的, 不是用来做 I/O 的
多 CPU, 意味着可以有多个核并行完成计算, 所以多核提升的是计算性能
每个 CPU 一旦遇到 I/O 阻塞, 仍然需要等待, 所以多核对 I/O 操作没什么用处
# 分析:
我们有四个任务需要处理, 若实现并发的效果, 解决方案可以是:
方案一: 开启四个进程
方案二: 一个进程下, 开启四个线程
# 单核情况下, 分析结果:
如果四个任务是计算密集型, 没有多核来并行计算, 方案一徒增了创建进程的开销, 方案二胜
如果四个任务是 I/O 密集型, 方案一创建进程的开销大, 且进程的切换速度远不如线程, 方案二胜
# 多核情况下, 分析结果:
如果四个任务是计算密集型, 多核意味着并行计算, 在 python 中一个进程中同一时刻只有一个线程执行用不上多核, 方案一胜
如果四个任务是 I/O 密集型, 再多的核也解决不了 I/O 问题, 方案二胜
结论: 现在的计算机基本上都是多核, python 对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升, 甚至不如串行(没有大量切换),
但是, 对于 IO 密集型的任务效率还是有显著提升的
总结:
计算密集型: 多进程效率高
I/O 密集型: 多线程效率高
4.8 死锁与递归锁(了解)
- from threading import Thread,Lock,RLock
- import time
- #
- # #死锁
- mutexA = Lock()
- mutexB = Lock()
- class Mythread(Thread):
- def run(self):
- self.f1()
- self.f2()
- def f1(self):
- mutexA.acquire()
- print('%s 抢到 A 锁' %self.name)
- mutexB.acquire()
- print('%s 抢到 B 锁' %self.name)
- mutexB.release()
- mutexA.release()
- pass
- def f2(self):
- mutexB.acquire()
- print('%s 抢到 B 锁' %self.name)
- time.sleep(2) #足够其他 99 个线程起来了
- mutexA.acquire()
- print('%s 抢到 A 锁' %self.name)
- mutexA.release()
- mutexB.release()
- if __name__ == '__main__':
- for i in range(100):
- t = Mythread()
- t.start()
- # 递归锁
- mutexA = RLock()
- mutexB = RLock()
- class Mythread(Thread):
- def run(self):
- self.f1()
- self.f2()
- def f1(self):
- mutexA.acquire()
- print('%s 抢到 A 锁' %self.name)
- mutexB.acquire()
- print('%s 抢到 B 锁' %self.name)
- mutexB.release()
- mutexA.release()
- pass
- def f2(self):
- mutexB.acquire()
- print('%s 抢到了 B 锁' %self.name)
- time.sleep(2) #足够其他 99 个线程起来了
- mutexA.acquire()
- print('%s 抢到了 A 锁' %self.name)
- mutexA.release()
- mutexB.release()
- if __name__ == '__main__':
- for i in range(100):
- t = Mythread()
- t.start()
4.9 信号量(了解)
- from threading import Semaphore,Thread
- import time,random
- sm = Semaphore(5) #限制 5 个人同一时刻能拿到锁
- def task(name):
- sm.acquire()
- print(f'{name}正在上厕所')
- time.sleep(random.randint(1,3))
- sm.release()
- if __name__ == '__main__':
- for i in range(20):
- t = Thread(target=task,args=('路人 %s' %i,))
- t.start()
4.10 Event 事件(了解)
一个进程等其他进程结束后才能运行
- from threading import Thread,Event
- import time
- # 模拟车等红绿灯
- event = Event()
- def light():
- print('红灯正亮着')
- time.sleep(3)
- event.set() #绿灯亮
- def car(name):
- print('车 %s 正等绿灯' %name)
- event.wait() #等绿灯
- print('车 %s 通行' %name)
- if __name__ == '__main__':
- #红绿灯
- t1 = Thread(target=light)
- t1.start()
- #车
- for i in range(10):
- t = Thread(target=car,args=(i,))
- t.start()
五, 协程
什么是协程?
协程: 单线程下实现并发
- 在 IO 密集型的情况下, 使用协程能提高最高效率.
注意: 协程不是任何单位, 只是一个程序员 YY 出来的东西, 操作系统里面只有进程和线程的概念,(操作系统调度的是线程)
总结: 在单线程下实现多个任务间遇到 IO 就切换就可以降低单线程的 IO 时间, 从而最大限度地提高单线程地效率.
协程的目的:
手动实现 "IO 切换 + 保存状态" 去欺骗操作系统, 让操作系统误以为没有 IO 操作, 将 CPU 的执行权限给你.
- from gevent import monkey #猴子补丁
- monkey.patch_all() # 识别下列所有的任务是否有 IO 操作
- from gevent import spawn #spawn 任务
- from gevent import joinall
- import time
- def task1():
- print('start from task1...')
- time.sleep(3)
- print('end from task1...')
- def task2():
- print('start from task2...')
- time.sleep(5)
- print('end from task2...')
- if __name__ == '__main__':
- start_time = time.time()
- sp1 = spawn(task1) #spawn 提交任务
- sp2 = spawn(task2)
- # sp1.join()
- # sp2.join()
- joinall([sp1,sp2])
- end_time = time.time()
- print(f'消耗时间: {end_time - start_time}')
来源: http://www.bubuko.com/infodetail-3363279.html