多进程和多线程
一, 进程
1.1 进程的引入
现实生活中, 有很多的场景中的事情是同时进行的, 比如开车的时候 手和脚共同来驾驶汽车, 再比如唱歌跳舞也是同时进行的; 试想, 如果把唱歌和跳舞这 2 件事情分开依次完成的话, 估计就没有那么好的效果了(想一下场景: 先唱歌, 然后在跳舞, O(∩_∩)O 哈哈~)
程序中
如下程序, 来模拟 "唱歌跳舞" 这件事情
- # 模拟唱歌, 跳舞
- from time import sleep
- def sing():
- for i in range(3):
- print("正在唱歌...%d"%i)
- sleep(1)
- def dance():
- for i in range(3):
- print("正在跳舞...%d"%i)
- sleep(1)
- if __name__ == '__main__':
- sing() # 唱歌
- dance() # 跳舞
运行结果
注意
很显然刚刚的程序并没有完成唱歌和跳舞同时进行的要求
如果想要实现 "唱歌跳舞" 同时进行, 那么就需要一个新的方法, 叫做: 多任务
1.2 多任务的概念
什么叫 "多任务" 呢? 简单地说, 就是操作系统可以同时运行多个任务. 打个比方, 你一边在用浏览器上网, 一边在听 MP3, 一边在用 Word 赶作业, 这就是多任务, 至少同时有 3 个任务正在运行. 还有很多任务悄悄地在后台同时运行着, 只是桌面上没有显示而已.
现在, 多核 CPU 已经非常普及了, 但是, 即使过去的单核 CPU, 也可以执行多任务. 由于 CPU 执行代码都是顺序执行的, 那么, 单核 CPU 是怎么执行多任务的呢?
答案就是操作系统轮流让各个任务交替执行, 任务 1 执行 0.01 秒, 切换到任务 2, 任务 2 执行 0.01 秒, 再切换到任务 3, 执行 0.01 秒...... 这样反复执行下去. 表面上看, 每个任务都是交替执行的, 但是, 由于 CPU 的执行速度实在是太快了, 我们感觉就像所有任务都在同时执行一样.
真正的并行执行多任务只能在多核 CPU 上实现, 但是, 由于任务数量远远多于 CPU 的核心数量, 所以, 操作系统也会自动把很多任务轮流调度到每个核心上执行.
其实就是 CPU 执行速度太快啦..
1.2.1 进程
每个独立执行的程序称为进程
进程是程序的一次动态执行过程, 它经历了从代码加载, 执行到执行完毕的一个完整过程, 这个过程也是进程本身从产生, 发展到最终消亡的过程.
多进程 (多任务) 操作系统能同时运行多个进程(程序), 由于 CPU 具备分时机制, 所以每个进程都能循环获得自己的 CPU 时间片. 由于 CPU 执行速度非常快, 使得所有程序好象是在 "同时" 运行一样.
在操作系统中进程是进行系统资源分配, 调度和管理的最小单位, 进程在执行过程中拥有独立的内存单元.
比如: Windows 采用进程作为最小隔离单位, 每个进程都有自己的数据段, 代码段, 并且与别的进程没有任何关系. 因此进程间进行信息交互比较麻烦.
进程也可以通过派生 (fork 或 spawn)新的进程来执行其他任务, 不过因为每个新进程也都拥有自己的内存和数据栈等, 所以只能采用进程间通信 (IPC) 的方式共享信息.
1.2.2 线程
为了解决进程调度资源的浪费, 为了能够共享资源, 出现了线程. 有时候把线程称之为轻量级进程.
线程是 CPU 调度和分派的基本单位, 它可与同属一个进程的其他的线程共享进程所拥有的全部资源, 多个线程共享内存, 从而极大地提高了程序的运行效率.
线程是比进程更小的执行单位, 线程是进程内部单一的一个顺序控制流.
所谓多线程是指一个进程在执行过程中可以产生多个线程, 这些线程可以同时存在, 同时运行, 形成多条执行线索. 一个进程可能包含了多个同时执行的线程.
一个或更多的线程构成了一个进程(操作系统是以进程为单位的, 而进程是以线程为单位的, 进程中必须有一个主线程 main)
如果一个进程没有了, 那么这个进程内的所有线程肯定会消失, 如果线程消失了, 但是进程未必会消失. 只有所有的线程都结束了, 进程才会结束!!! 而且所有线程都是在进程的基础之上同时运行.
1.3 Python 和并发编程
在大多数系统上, Python 支持多进程 (基于消息传递) 编程和多线程编程.
大多数人比较熟悉的是多线程编程, 但是在 python 中的多线程编程却是有诸多的限制.
python 中多线程的限制
为了线程安全考虑, python 的解释器还是使用了内部的 GIL(Global Interperter Lock, 全局解释器锁定), 在任意时刻只运行单个 python 的线程执行. 即使有多个可用的 CPU 核心, 也是如此. 这就限制了 python 只能在一个 CPU 核心上运行.
GIL 的存在直接影响了程序的并发编程问题.
如果一个应用程序是大部分与 I/O 相关, 那么使用线程一般没有问题, 因为大部分时间是在 I/O 等待.
如果一个应用程序是 CPU 密集型的, 则使用多线程的坏处大于好处, 返回会降低程序的运行速度, 一般比你想象的还要慢的多.
因此, 用户在有些情况需要使用多进程(子进程和消息传递)
子进程和消息传递
展望未来, 如果要再 python 中进行各种类型的并发编程, 消息传递应该是最应该掌握的概念.
1.4 multiprocessing 包
multiprocessing 是一个 package, 这个包支持使用类似 threading 模块的类似 API 去创建新的进程.
multiprocessing 支持本地和远程并发编程, 通过使用子进程来代替线程高效的规避了 GIL 问题.
所以, multiprocessing 允许程序员重复利用给定计算机的多核 CPU.
由于 python 的跨平台, 所以 multiprocessing 支持多个平台: unix, Windows, Linux.
1.4.1 Process 类
Process 语法结构如下:
Process([group [, target [, name [, args [, kwargs]]]]])
target: 表示这个进程实例所调用对象;
args: 表示调用对象的位置参数元组;
kwargs: 表示调用对象的关键字参数字典;
name: 为当前进程实例的别名;
group: 大多数情况下用不到;
最简单的使用代码:
- # 从 multiprocessing 中导入 Process
- from multiprocessing import Process
- import os
- # 子进程要执行的代码
- def run_proc(name):
- print('子进程运行中, hello,= %s ,pid=%d...' % (name, os.getpid()))
- if __name__ == '__main__': # 判断是否为主程序
- print('父进程 %d.' % os.getpid())
- """
- 创建 Process 对象, 表示一个子进程.
- 1. target 参数表示子进程要做的任务(一个可执行对象)
- 2. args 是一个元组, 表示传递给 target 的可执行对象的位置参数.
- 本例中就是把 "王二狗" 传递给函数 f 的 name 参数
- """ p = Process(target=run_proc, args=('王二狗',))
- print('子进程将要执行..')
- p.start() # 启动子进程
- p.join() # 等待进程终止
- print("子进程已经终止")
说明
创建子进程时, 只需要传入一个执行函数和函数的参数, 创建一个 Process 实例, 用 start()方法启动.
join()方法可以等待子进程结束后再继续往下运行, 通常用于进程间的同步.
1.4.1.1 Process 类的实例具有以下方法
Process 实例 p 具有以下方法:
p.start()
启动子进程. 这将运行代表进程的子进程, 并调用该子进程中的 p.run()方法.
p.join([timeout])
等待进程 p 终止, timeout 是可选的超时时间. 这个方法通常用户进程间的同步.
p.is_alive()
测试进程 p 是否还在运行, 如果扔在运行, 则返回 True
run():
如果没有给定 target 参数, 对这个对象调用 start()方法时, 就将执行对象中的 run()方法;
p.terminate()
强制终止 p 进程. 如果调用此方法, 进程 p 将立即被终止, 同时不会进行任何清理工作. 如果再进程 p 中也开启了子进程, 则这些子进程将成为僵死进程. s 如果 p 保存了一个锁定或有进程间通信, 那么终止可能会导致死锁或 I/O 崩溃.
1.4.1.2 Process 实例具有以下实例属性:
Process 实例 p 具有以下实例属性:
p.daemon
一个布尔标志, 指示这个进程是否为后台进程. 当创建他的 python 进程终止时, 后台进程将自动终止.
另外禁止后台进程创建自己的新进程. p.daemon 的值必须再进程启动前设置.
p.exitcode
进程的整数退出码. 如果进程仍在运行, 则它的值是 None. 如果是负数, -N 表示由信号 N 所终止
p.name
当前进程实例别名, 默认为 Process-N,N 为从 1 开始递增的整数;
p.pid
进程的整数 ID
1.4.1.3 实例
实例 1:
- from multiprocessing import Process
- import os
- from time import sleep
- # 子进程要执行的代码
- def run_proc(name, age, **kwargs):
- for i in range(10):
- print('子进程运行中, name= %s,age=%d ,pid=%d...' % (name, age,os.getpid()))
- print(kwargs)
- sleep(0.5)
- if __name__=='__main__':
- print('父进程 %d.' % os.getpid())
- p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
- print('子进程将要执行')
- p.start()
- sleep(1)
- p.terminate()
- p.join()
- print('子进程已结束')
运行结果:
实例 2:
- from multiprocessing import Process
- import time
- import os
- # 两个子进程将会调用的两个方法
- def worker_1(interval):
- print("worker_1, 父进程(%s), 当前进程(%s)"%(os.getppid(), os.getpid()))
- t_start = time.time()
- time.sleep(interval) # 程序将会被挂起 interval 秒
- t_end = time.time()
- print("worker_1, 执行时间为'%0.2f'秒" % (t_end - t_start))
- def worker_2(interval):
- print("worker_2, 父进程(%s), 当前进程(%s)" % (os.getppid(), os.getpid()))
- t_start = time.time()
- time.sleep(interval)
- t_end = time.time()
- print("worker_2, 执行时间为'%0.2f'秒" % (t_end - t_start))
- if __name__ == '__main__': # 判断是否为主程序
- # 输出当前程序的 ID
- print("进程 ID:%s" % os.getpid())
- """
- 创建两个进程对象, target 指向这个进程对象要执行的对象名称,
- args 后面的元组中, 是要传递给 worker_1 方法的参数,
- 因为 worker_1 方法就一个 interval 参数, 这里传递一个整数 2 给它,
- 如果不指定 name 参数, 默认的进程对象名称为 Process-N,N 为一个递增的整数
- """
- p1=Process(target=worker_1, args=(2,))
- p2=Process(target=worker_2, name="王二狗", args=(1,))
- # 使用 "进程对象名称. start()" 来创建并执行一个子进程,
- # 这两个进程对象在 start 后, 就会分别去执行 worker_1 和 worker_2 方法中的内容
- p1.start()
- p2.start()
- # 同时父进程仍然往下执行, 如果 p2 进程还在执行, 将会返回 True
- print("p2.is_alive=%s" % p2.is_alive())
- # 输出 p1 和 p2 进程的别名和 pid
- print("p1.name=%s" % p1.name)
- print("p1.pid=%s" % p1.pid)
- print("p2.name=%s" % p2.name)
- print("p2.pid=%s" % p2.pid)
- """
- join 括号中不携带参数, 表示父进程在这个位置要等待 p1 进程执行完成后, 再继续执行下面的语句, 一般用于进程间的数据同步
- 如果不写这一句, 下面的 is_alive 判断将会是 True,
- 改成 p1.join(1),
- 因为 p2 需要 2 秒以上才可能执行完成, 父进程等待 1 秒很可能不能让 p1 完全执行完成, 所以下面的 print 会输出 True, 即 p1 仍然在执行
- """
- p1.join()
- print("p1.is_alive=%s" % p1.is_alive())
运行结果:
1.4.1.4 进程的创建 - Process 子类
创建新的进程还能够使用类的方式, 可以自定义一个类, 继承 Process 类, 每次实例化这个类的时候, 就等同于实例化一个进程对象
示例代码:
- from multiprocessing import Process
- import time
- import os
- # 继承 Process 类
- class ProcessClass(Process):
- """
- 因为 Process 类本身也有__init__方法, 这个子类相当于重写了这个方法,
- 但这样就会带来一个问题, 我们并没有完全的初始化一个 Process 类, 所以就不能使用从这个类继承的一些方法和属性,
- 最好的方法就是将继承类本身传递给 Process.__init__方法, 完成这些初始化操作
- """
- def __init__(self,interval):
- Process.__init__(self)
- self.interval = interval
- # 重写了 Process 类的 run()方法
- def run(self):
- print("子进程(%s) 开始执行, 父进程为(%s)" % (os.getpid(), os.getppid()))
- t_start = time.time()
- time.sleep(self.interval)
- t_stop = time.time()
- print("(%s)执行结束, 耗时 %0.2f 秒"%(os.getpid(), t_stop-t_start))
- if __name__ == "__main__":
- t_start = time.time()
- print("当前程序进程(%s)"%os.getpid())
- p1 = ProcessClass(2)
- # 对一个不包含 target 属性的 Process 类执行 start()方法, 就会运行这个类中的 run()方法, 所以这里会执行 p1.run()
- p1.start()
- p1.join()
- t_stop = time.time()
- print("(%s)执行结束, 耗时 %0.2f"%(os.getpid(),t_stop-t_start))
运行结果:
1.4.2 进程池: Pool
当需要创建的子进程数量不多时, 可以直接利用 multiprocessing 中的 Process 动态成生多个进程, 但如果是上百甚至上千个目标, 手动的去创建进程的工作量巨大, 此时就可以用到 multiprocessing 模块提供的 Pool 方法. 使用类 Pool 可以创建进程池, 然后把各种数据处理任务都提交给进程池.
初始化 Pool 时, 可以指定一个最大进程数, 当有新的请求提交到 Pool 中时, 如果池还没有满, 那么就会创建一个新的进程用来执行该请求; 但如果池中的进程数已经达到指定的最大值, 那么该请求就会等待, 直到池中有进程结束, 才会创建新的进程来执行
Pool([numprocess, initializer, initargs])
说明:
numprocess 是指要创建的线程数. 默认是 CPU 的核心数.(os.cpu_count()的返回值)
initializer 是每个进程启动时要执行的可调用对象, 默认是 None
initargs 是传递给 initializer 的元组参数.
1.4.2.1 multiprocessing.Pool 常用函数解析:
apply(func[, args[, kwds]]): 使用阻塞方式调用 func
在进程池的一个工作进程中执行 func 函数, args 是传给 func 的元组参数. 注意使用这个方法让多个进程去执行, 他们是同步执行的. 即: 多个进程是顺序执行的.
func 的返回值就是 p.apply 的返回值.
apply_async(func[, args, kwargs, callback]) : 使用非阻塞方式调用 func
(并行执行, 堵塞方式必须等待上一个进程退出才能执行下一个进程),args 为传递给 func 的参数列表, kwds 为传递给异步的执行 func
callback 是可调用对象, 当 func 执行结束, 则立即调用 callback 并把 func 的返回值传递给 callback.
func 的关键字参数列表;
close(): 关闭 Pool, 使其不再接受新的任务;
terminate(): 不管任务是否完成, 立即终止;
join(): 主进程阻塞, 等待子进程的退出, 必须在 close 或 terminate 之后使用;
AsyncResult 对象 (apply_async() 的返回值)
apply_async()的返回值是 AsyncResult 实例. 具有如下方法:
a.get([timeout])
等待返回结果, 结果就是任务函数的返回值.
a.ready()
如果任务函数执行结束返回 True
a.successful()
如果任务函数执行结束, 且在执行的过程中没有发生异常则
a.wait([timeout])
等待任务结束, 这个方法与 get()的区别就是它没有返回值.
来源: https://www.cnblogs.com/yanadoude/p/12636665.html