1, 基础知识:
计算机的硬件组成:
主板 固化(寄存器, 是直接和 CPU 进行交互的硬件)
CPU 中央处理器: 计算(数字计算和逻辑计算) 和控制(控制所有的硬件协调工作)
存储 硬盘 内存
输入设备 键盘 鼠标 话筒
输出设备 显示器 音响 打印机
早期的计算机是以计算为核心的, 现在的计算机是以存储为核心的
操作系统是一个软件 是一个能直接操作硬件的一个软件
操作系统的目标: 让用户使用更加轻松, 高可用, 低耦合, 封装了所有硬件的接口, 使用户更方便的使用, 对于计算机内的所有资源, 进行一个合理的调度和分配
进程相关基础知识:
进程: 正在执行的程序, 是程序执行过程中的相关指令, 数据集等的集合, 也可以叫做程序的一次执行过程, 是一个动态的概念
进程的组成: 代码段 数据段 PCB: 进程控制块
进程的三大基本状态:
就绪状态: 已经获得了运行所需要的所有资源, 除了 CPU
执行状态: 已经获得了所有资源, 包括 CPU, 处于正在执行的状态
阻塞状态: 因为各种原因, 进程放弃了 CPU, 导致进程无法继续执行, 此时进程处于内存中, 继续等待获取 CPU
一个特殊状态: 挂起状态, 因为各种原因, 进程放弃了 CPU, 导致进程无法继续执行, 此时进程被踢出内存
2, 进程
multiprocessing python 的内置模块, 用于多进程编程 Process from multiprocessing import Process
并行: 指两件或者多事情, 在同一个时间点开始执行
并发: 指两件或者多件事情, 在同一个时间间隔内同时执行
同步: 某一个任务的执行必须依赖另一个任务的返回结果
异步; 某一个任务的执行不需要依赖另一个任务的返回, 只需要告诉另一个任务一声
阻塞: 程序因为类似 IO 等待, 等待时间等无法继续执行
非阻塞: 程序遇到类似 IO 操作时候, 不在阻塞等待, 如果没有及时处理 IO, 就报错或者跳过
进程的方法或者属性:
os.getpid() 获取当前进程的 pid os.getppid()获取当前进程的父进程 id
start() 开启一个子进程
join()异步变同步, 让父进程等待子进程执行结束, 再继续执行(当主进程执行到这条语句的时候, 主进程阻塞, 等待子进程执行完毕, 主进程继续执行),join 必须放在 start 之后
is_alive() 判断进程是否活着
terminate() 杀死进程
属性;
name: 子进程的名字
pid: 子进程的 pid
deamon: 设置进程为守护进程, 给 True 代表为守护进程, 默认为 false, 不是守护进程
守护进程: p.daemon=True
随着父进程的代码执行完毕就结束(敲黑板, 划重点!! 是代码执行完毕 -- 主进程不会阻塞等待)
守护进程不能创建子进程
守护进程必须在进程 start 之前设置
IPC: 进程间通信
锁机制: 为了多进程通信的时候, 保护数据的安全性
- from multiprocessing import Lock
- l = lock()
l.acquire() 获得锁(此时其他进程不可以访问锁住的资源)
l.release() 释放锁(其他进程可以访问)
信号机制:
sem = Semaphore(n)
n: 初始化的时候一把锁配几把钥匙, int
- l.acquire()
- l.release()
信号量机制比锁机制多了个计数器, 这个计数器用来记录当前剩余几把锁, 计数器为 0, 表达当前没有钥匙, acquire()处于阻塞状态, acquire()一次, 计数器内部减 1,release 一次, 计数器加一;
事件机制:
e = Event()
初始为 false, 阻塞状态
e.set() 设置 is_set()为 True, 代表非阻塞状态
e.clear() 设置 is_set()为 false, 代表阻塞状态
e.wait()判断 is_set 的值, True 为非阻塞. fase 为阻塞
e.is_set() 标志
3, 生产者消费者模型
简要介绍: 主要是用来解耦, 借助队列来实现生产者消费者模型
栈: 先进后出
队列: 先进先出
import queue 不能进行多进程之间的数据传输
from multiprocessing import Queue 借助 Queue 解决生产者消费者模型
队列是安全的
q = Queue(num--- 队列的最大长度)
q.get() 阻塞等待获取数据, 如果有数据直接获取, 没有则阻塞等待
q.put() 阻塞 如果可以继续往队列中放数据, 就直接放, 不能放则阻塞等待
q.get_nowait() 不阻塞, 如果有数据直接获取, 没有数据就报错
q.put_nowait() 不阻塞, 如果可以继续往队列中放数据, 就直接放, 不能就报错
from multiprocessing import JoinableQueue 可连接的队列
继承 Queue 可以使用 queue 的方法
增加的方法:
q.join() 用户生产者接收消费者的返回结果, 接收全部生产的数量, 以便知道什么时候队列里的数据被消费完了
q.task_done() 每消费一个数据, 就返回一个表示返回结果, 生产者就您呢个获得当前消费者消费了多少个数据, 每消费队列里的一个数据, 就给 join 返回一个表示
管道:
- from multiprocessing import Pipe
- con1, con2 = Pipe()
管道是不安全的
管道是用于多线程之间通信的一种方式
单进程中:
con1 发则 con2 收, con2 收则 con1 发
多进程中:
父进程 con1 发, 子进程的 con2 收
管道中错误 EOFError 父进程中如果关闭发送端, 子进程还在继续接收, 就回导致 EOFError
4, 进程池
一个池子, 里面有固定数量的进程, 且处在待命状态, 一旦有任务来, 马上就有进程去处理
开启进程需要操作系统消耗大量的事件去管理它, 大量的事件让 CPU 去调度它
进程池会帮助程序员去管理池中的进程
- from multiprocessing import Pool
- p = Pool(os.cpu_count() + 1)
进程池的三个方法:
map(func, iterable)
func: 进程池中进程执行的任务函数
iterable: 可迭代对象, 是把可迭代对象中的每一个元素传给任务函数当参数
- from multiprocessing import Pool
- import os
- def func(num):
- num += 1
- print(num)
- return num
- if __name__ == '__main__':
- p = Pool(os.cpu_count() + 1)
- print(os.cpu_count())
- res = p.map(func, [i for i in range(20)]) . # i 作为参数传入 func 中
- p.close() # 表示不能再向进程池中添加任务
- p.join() # 表示等待进程池中所有任务执行完毕
- print(type(res))
apply(func,arg=()) 同步的执行, 即池中的进程一个个的去执行任务
func: 进程池中进程执行的任务函数
args: 可迭代对象的参数, 是传给任务函数的参数
同步执行任务, 不需要 close 和 join, 进程池中所有的进程都是普通进程(主进程需要等待其结束)
- from multiprocessing import Pool
- import requests
- import time
- def func(url):
- res = requests.get(url)
- if res.status_code == 200:
- return 'ok'
- if __name__ == '__main__':
- p = Pool(5)
- l = ['https://www.baidu.com',
- 'http://www.jd.com',
- 'http://www.taobao.com',
- 'http://www.mi.com',
- 'http://www.cnblogs.com',
- 'https://www.bilibili.com',
- ]
- start = time.time()
- for i in l:
- p.apply(func, args=(i,)) #即使有 n 个线程也是一个一个的去执行
- print(time.time() - start)
- start = time.time()
- for i in l:
- p.apply_async(func, args=(i, ))
- p.close()
- p.join()
- print(time.time() - start)
apply_async(func,args=(), callback=None) 异步: 池中的进程一次性去执行任务
func: 进程池中进程执行的任务函数
args: 可迭代对象的参数, 是传给任务函数的参数
callback: 回调函数 当进程池中有进程处理完任务来, 返回的结果可以交给回调函数, 由回调函数进程进一步的处理, 这是只有异步才有的
异步处理任务, 需要 close 和 join
异步处理任务时, 进程池中所有的进程都是守护进程
回调函数:
进程的任务函数的返回值, 被当成回调函数的形参接收到, 一次进一步的处理操作
回调函数是由主进程调用的, 而不是子进程, 子进程只负责把结果给回调函数
- from multiprocessing import Pool
- import requests
- import time
- import os
- def func(url):
- res = requests.get(url)
- print("子进程的 pid:%s, 父进程的 pid:%s" % (os.getpid(), os.getppid()))
- if res.status_code == 200:
- return url
- def cal_back(sta):
- url = sta
- print("回调函数的 pid:%s" % os.getpid())
- with open('content.txt', 'a+') as f:
- f.write(url + "\n")
- if __name__ == '__main__':
- p = Pool(5)
- l = ['https://www.baidu.com',
- 'http://www.jd.com',
- 'http://www.taobao.com',
- 'http://www.mi.com',
- 'http://www.cnblogs.com',
- 'https://www.bilibili.com',
- ]
- print("主进程 pid:%s" % os.getpid())
- for i in l:
- p.apply_async(func, args=(i,), callback=cal_back)
- p.close()
- p.join()
进程和进程池对比
- from multiprocessing import Process, Pool
- import time
- def func(num):
- num += 1
- # print(num)
- if __name__ == '__main__':
- p = Pool(5)
- start = time.time()
- p.map(func, [i for i in range(1000)])
- p.close() # 指不能再向进程池中添加任务
- p.join() # 等待进程池中的所有任务执行完毕
- print(time.time() - start)
- p_l = []
- start = time.time()
- for i in range(1000):
- p = Process(target=func, args=(i,))
- p.start()
- p_l.append(p)
- [i.join() for i in p_l]
- print(time.time() - start)
5, 生产者消费者模型实例
队列实现生产者消费者模型
- from multiprocessing import Queue, Process
- import time
- def producer(q, name):
- for i in range(20):
- q.put(name)
- print("生产第 %s 个 %s" % (i, name))
- # q.put(None)
- def consumer(q, name, color):
- while 1:
- info = q.get()
- if info:
- print("%s %s 拿走来 %s\033[0m" % (color, name, info))
- else:
- break
- if __name__ == '__main__':
- q = Queue(10)
- p = Process(target=producer, args=(q, 'number one'))
- p1 = Process(target=producer, args=(q, 'number two'))
- p2 = Process(target=producer, args=(q, 'number three'))
- c1 = Process(target=consumer, args=(q, 'alex', '\033[31m'))
- c2 = Process(target=consumer, args=(q, 'wusir', '\033[32m'))
- p_l = [p, p1, p2, c2, c1]
- [i.start() for i in p_l]
- p.join()
- p1.join()
- p2.join()
- q.put(None)
- q.put(None) # 设置标志 表示没有数据了, 生产者不再生产数据
joinableQueue 实现生产者消费者进程
- from multiprocessing import JoinableQueue, Process
- def producer(q, name):
- for i in range(20):
- q.put(name)
- print("生产第 %s 个 %s" % (i, name))
q.join() 生产者进程等待消费者进程消费完成
- def consumer(q, name):
- while 1:
- q.get()
- print("%s 拿走了一个" % name)
- q.task_done()
- if __name__ == '__main__':
- q = JoinableQueue(10)
- p1 = Process(target=producer, args=(q, 'one'))
- c1 = Process(target=consumer, args=(q, 'alex'))
- c1.daemon = True # 设置守护进程
- p1.start()
- c1.start()
- p1.join() # 主进程等待生产者进程
- # 主进程等待生产者进程结束
- # 程序有 3 个进程, 主进程和生产者进程和消费者进程. 当主进程执行到 35 行代码时, 主进程会等待生产进程结束
- # 而生产进程中 (第 26 行) 会等待消费者进程把所有数据消费完, 生产者进程才结束.
- # 现在的状态就是 主进程等待生产者进程结束, 生产者进程等待消费者消费完所有数据
- # 所以, 把消费者设置为守护进程. 当主进程执行完, 就代表生产进程已经结束, 也就代表消费者进程已经把队列中数据消费完
- # 此时, 主进程一旦结束, 守护进程也就是消费者进程也就跟着结束. 整个程序也就能正常结束了.
6, 其他相关知识
进程间的内存共享
- from multiprocessing import Manager, Value
- m = manager()
- num = m.dict({
- }) num = m.list([])
IPC: 管道 队列(锁 信号量 事件)
多个进程之间不能共享内存, 不能修改全局变量
- from multiprocessing import value
- num = Value("i"-- 数据类型, num-- 数据)
num.value() 值
Manager 模块: 多进程间共享数据
- m = Manager()
- num = m.list([1,2,3])
来源: http://www.bubuko.com/infodetail-3170657.html