进程, 线程的含义?
1. 什么是进程?
进程是指运行中的应用程序, 每个进程都有自己独立的地址空间(内存空间). 比如用户点击桌面的 IE 浏览器, 就启动了一个进程, 操作系统就会为该进程分配独立的地址空间. 当用户再次点击 IE 浏览器, 又启动了一个进程, 操作系统将为新的进程分配新的独立的地址空间. 多进程就是 "多任务", 就像使用电脑时同时打开浏览器上网, 打开播放器听歌, 后台还默默运行着杀毒软件一样. 现代操作系统如 Mac OS X,UNIX,Linux,Windows 等都支持多进程, 每启动一个进程, 操作系统便为该进程分配一个独立的内存空间.
2. 什么是线程?
线程是进程中的一个实体, 是被系统独立调度和分派的基本单位. 一个进程可以有一个线程, 也可以有多个线程.
线程自己不拥有独立的系统资源, 只拥有一点在运行中必不可少的资源, 它可与同属一个进程的其它线程共享当前进程所拥有的全部资源.
一个线程可以创建和撤消另一个线程, 同一进程中的多个线程之间可以并发执行.
线程有就绪 (runnable), 阻塞(blocked) 和运行 (running) 三种基本状态以及新建 (new) 和死亡 (dead) 状态.
为什么要有多进程和多线程?
每个进程至少要干一件事, 比如一个编辑器既要打字输入同时又要检测打错的拼写有时候还要区分一些关键字高亮显示, 它们同属于编辑器这个进程, 我们把编辑器作为一个进程, 而以上这些工作就是它的子任务, 如何实现他们同时工作呢? 就是让每个子任务即线程短暂运行交替执行, 由于它们彼此之间交替太快了, 看起来就像同时运行一样.(真正的多线程需要多核 CPU 才能实现)
当我们要让一个 python 程序执行多个任务时, 我们可以用多个进程或多个线程来完成我们的任务, 他们之间彼此同时交替进行甚至一个任务依赖于另一个任务执行的结果, 他们需要相互通信和协调, 所以我们就需要用到多进程和多线程编程了.
实现多进程和多线程
1. 多进程
Linux 下可使用 os 模块的 fork().
Unix/Linux 操作系统提供了一个 fork()系统调用, 它非常特殊. 普通的函数调用, 调用一次, 返回一次, 但是 fork()调用一次, 返回两次, 因为操作系统自动把当前进程 (称为父进程) 复制了一份(称为子进程), 然后, 分别在父进程和子进程内返回.
- import os
- print('Process (%s) start...' % os.getpid())
- # Only works on Unix/Linux/Mac:
- pid = os.fork()
- if pid == 0:
- print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
- else:
- print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
Windows 下可以使用 multiprocessing 模块
multiprocessing 模块提供了一个 Process 类来代表一个进程对象, 下面的例子演示了启动一个子进程并等待其结束:
- from multiprocessing import Process
- import os
- # 子进程要执行的代码
- def run_proc(name):
- print('Run child process %s (%s)...' % (name, os.getpid()))
- if __name__=='__main__':
- print('Parent process %s.' % os.getpid())
- p = Process(target=run_proc, args=('test',))
- print('Child process will start.')
- p.start()
- p.join()
- print('Child process end.')
创建子进程时, 只需要传入一个执行函数和函数的参数, 创建一个 Process 实例, 用 start()方法启动.
join()方法可以等待子进程结束后再继续往下运行, 通常用于进程间的同步.
Pool
如果要启动大量的子进程, 可以用进程池的方式批量创建子进程:
- from multiprocessing import Pool
- import os, time, random
- def long_time_task(name):
- print('Run task %s (%s)...' % (name, os.getpid()))
- start = time.time()
- time.sleep(random.random() * 3)
- end = time.time()
- print('Task %s runs %0.2f seconds.' % (name, (end - start)))
- if __name__=='__main__':
- print('Parent process %s.' % os.getpid())
- p = Pool(4)
- for i in range(5):
- p.apply_async(long_time_task, args=(i,))
- print('Waiting for all subprocesses done...')
- p.close()
- p.join()
- print('All subprocesses done.')
对 Pool 对象调用 join()方法会等待所有子进程执行完毕, 调用 join()之前必须先调用 close(), 调用 close()之后就不能继续添加新的 Process 了.
子进程
很多时候, 子进程并不是自身, 而是一个外部进程. 我们创建了子进程后, 还需要控制子进程的输入和输出.
subprocess 模块可以让我们非常方便地启动一个子进程, 然后控制其输入和输出.
下面的例子演示了如何在 Python 代码中运行命令 nslookup www.python.org, 这和命令行直接运行的效果是一样的:
- import subprocess
- print('$ nslookup www.python.org')
- r = subprocess.call(['nslookup', 'www.python.org'])
- print('Exit code:', r)
2. 多线程
使用 threading 模块实现多线程, Python 的线程是真正的 Posix Thread, 而不是模拟出来的线程.
- import time, threading
- def loop():
- print('线程 %s 在运行' % threading.current_thread().name)
- n = 0
- while n <5:
- n = n + 1
- print('线程 %s>>> %s' % (threading.current_thread().name, n))
- time.sleep(1)
- print('线程 %s 结束.' % threading.current_thread().name)
- print('线程 %s 在运行' % threading.current_thread().name)
- t = threading.Thread(target=loop, name='子线程 1')
- t2 = threading.Thread(target=loop, name='子线程 2')
- t.start()
- t2.start()
- t.join()
- t2.join()
- print('线程 %s 结束.' % threading.current_thread().name)
或者
- import time, threading
- num=0
- lock = threading.Lock()
- def action_one():
- global num
- for i in range(3):
- lock.acquire()
- try:
- print("线程 1 %d"%num)
- num+=1
- time.sleep(1)
- finally:
- lock.release()
- def action_two():
- global num
- for i in range(3):
- lock.acquire()
- try:
- print("线程 2 %d"%num)
- num+=1
- time.sleep(1)
- finally:
- lock.release()
- t1 = threading.Thread(target=action_one, name='子线程 1')
- t2 = threading.Thread(target=action_two, name='子线程 2')
- t1.start()
- t2.start()
- t1.join()
- t2.join()
进程之间和线程之间的相互协调
1. 进程间的通信:
Process 之间肯定是需要通信的, 操作系统提供了很多机制来实现进程间的通信. Python 的 multiprocessing 模块包装了底层的机制, 提供了 Queue,Pipes 等多种方式来交换数据.
以 Queue 为例, 在父进程中创建两个子进程, 一个往 Queue 里写数据, 一个从 Queue 里读数据:
- from multiprocessing import Process, Queue
- import os, time, random
- # 写数据进程执行的代码:
- def write(q):
- print('Process to write: %s' % os.getpid())
- for value in ['A', 'B', 'C']:
- print('Put %s to queue...' % value)
- q.put(value)
- time.sleep(random.random())
- # 读数据进程执行的代码:
- def read(q):
- print('Process to read: %s' % os.getpid())
- while True:
- value = q.get(True)
- print('Get %s from queue.' % value)
- if __name__=='__main__':
- # 父进程创建 Queue, 并传给各个子进程:
- q = Queue()
- pw = Process(target=write, args=(q,))
- pr = Process(target=read, args=(q,))
- # 启动子进程 pw, 写入:
- pw.start()
- # 启动子进程 pr, 读取:
- pr.start()
- # 等待 pw 结束:
- pw.join()
- # pr 进程里是死循环, 无法等待其结束, 只能强行终止:
- pr.terminate()
在 Unix/Linux 下, multiprocessing 模块封装了 fork()调用, 使我们不需要关注 fork()的细节. 由于 Windows 没有 fork 调用, 因此, multiprocessing 需要 "模拟" 出 fork 的效果, 父进程所有 Python 对象都必须通过 pickle 序列化再传到子进程去, 所有, 如果 multiprocessing 在 Windows 下调用失败了, 要先考虑是不是 pickle 失败了.
2. 线程间通信
1.Queue
使用线程队列有一个要注意的问题是, 向队列中添加数据项时并不会复制此数据项, 线程间通信实际上是在线程间传递对象引用. 如果你担心对象的共享状态, 那你最好只传递不可修改的数据结构 (如: 整型, 字符串或者元组) 或者一个对象的深拷贝.
Queue 对象提供一些在当前上下文很有用的附加特性. 比如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量. 对于 "生产者" 与 "消费者" 速度有差异的情况, 为队列中的元素数量添加上限是有意义的. 比如, 一个 "生产者" 产生项目的速度比 "消费者"" 消费 "的速度快, 那么使用固定大小的队列就可以在队列已满的时候阻塞队列, 以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常. 在通信的线程之间进行" 流量控制 " 是一个看起来容易实现起来困难的问题. 如果你发现自己曾经试图通过摆弄队列大小来解决一个问题, 这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题. get() 和 put() 方法都支持非阻塞方式和设定超时.
- import queue
- q = queue.Queue()
- try:
- data = q.get(block=False)
- except queue.Empty:
- ...
- try:
- q.put(item, block=False)
- except queue.Full:
- ...
- try:
- data = q.get(timeout=5.0)
- except queue.Empty:
- ...
- def producer(q):
- ...
- try:
- q.put(item, block=False)
- except queue.Full:
- log.warning('queued item %r discarded!', item)
- _running = True
- def consumer(q):
- while _running:
- try:
- item = q.get(timeout=5.0)
- # Process item
- ...
- except queue.Empty:
- pass
最后, 有 q.qsize() , q.full() , q.empty() 等实用方法可以获取一个队列的当前大小和状态. 但要注意, 这些方法都不是线程安全的. 可能你对一个队列使用 empty() 判断出这个队列为空, 但同时另外一个线程可能已经向这个队列中插入一个数据项. 所以, 你最好不要在你的代码中使用这些方法.
为了避免出现死锁的情况, 使用锁机制的程序应该设定为每个线程一次只允许获取一个锁. 如果不能这样做的话, 你就需要更高级的死锁避免机制. 在 threading 库中还提供了其他的同步原语, 比如 RLock 和 Semaphore 对象.
Queue 提供的方法:
task_done()
意味着之前入队的一个任务已经完成. 由队列的消费者线程调用. 每一个 get()调用得到一个任务, 接下来的 task_done()调用告诉队列该任务已经处理完毕.
如果当前一个 join()正在阻塞, 它将在队列中的所有任务都处理完时恢复执行 (即每一个由 put() 调用入队的任务都有一个对应的 task_done()调用).
join()
阻塞调用线程, 直到队列中的所有任务被处理掉.
只要有数据被加入队列, 未完成的任务数就会增加. 当消费者线程调用 task_done()(意味着有消费者取得任务并完成任务), 未完成的任务数就会减少. 当未完成的任务数降到 0,join()解除阻塞.
put(item[, block[, timeout]])
将 item 放入队列中.
1. 如果可选的参数 block 为 True 且 timeout 为空对象(默认的情况, 阻塞调用, 无超时).
2. 如果 timeout 是个正整数, 阻塞调用进程最多 timeout 秒, 如果一直无空空间可用, 抛出 Full 异常(带超时的阻塞调用).
3. 如果 block 为 False, 如果有空闲空间可用将数据放入队列, 否则立即抛出 Full 异常
其非阻塞版本为 put_nowait 等同于 put(item, False)
get([block[, timeout]])
从队列中移除并返回一个数据. block 跟 timeout 参数同 put 方法
其非阻塞方法为 get_nowait()相当与 get(False)
empty()
如果队列为空, 返回 True, 反之返回 False
2. 同步机制 Event
线程的一个关键特性是每个线程都是独立运行且状态不可预测. 如果程序中的其他线程需要通过断某个线程的状态来确定自己下一步的操作, 这时线程同步问题就会变得非常棘手. 为了解决这些问题, 我们需要使用 threading 库中的 Event 对象.
Event 对象包含一个可由线程设置的信号标志, 它允许线程等待某些事件的发生. 在初始情况下, event 对象中的信号标志被设置假. 如果有线程等待一个 event 对象, 而这个 event 对象的标志为假, 那么这个线程将会被一直阻塞直至该标志为真. 一个线程如果将一个 event 对象的信号标志设置为真, 它将唤醒所有等待个 event 对象的线程. 如果一个线程等待一个已经被设置为真的 event 对象, 那么它将忽略这个事件, 继续执行.
- from threading import Thread, Event
- import time
- def countdown(n, start_evt):
- print('countdown is starting...')
- start_evt.set()
- while n> 0:
- print('T-minus', n)
- n -= 1
- time.sleep(5)
- start_evt = Event() # 可通过 Event 判断线程的是否已运行
- t = Thread(target=countdown, args=(10, start_evt))
- t.start()
- print('launching countdown...')
- start_evt.wait() # 等待 countdown 执行
- # event 对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程
- print('countdown is running...')
- Semaphore(信号量)
在多线程编程中, 为了防止不同的线程同时对一个公用的资源 (比如全部变量) 进行修改, 需要进行同时访问的数量 (通常是 1) 的限制. 信号量同步基于内部计数器, 每调用一次 acquire(), 计数器减 1; 每调用一次 release(), 计数器加 1. 当计数器为 0 时, acquire()调用被阻塞.
- from threading import Semaphore, Lock, RLock, Condition, Event, Thread
- import time
- # 信号量
- sema = Semaphore(3) #限制同时能访问资源的数量为 3
- def foo(tid):
- with sema:
- print('{} acquire sema'.format(tid))
- time.sleep(1)
- print('{} release sema'.format(tid))
- threads = []
- for i in range(5):
- t = Thread(target=foo, args=(i,))
- threads.append(t)
- t.start()
- for t in threads:
- t.join()
- Lock(锁)
互斥锁为资源引入一个状态: 锁定 / 非锁定. 某个线程要更改共享数据时, 先将其锁定, 此时资源的状态为 "锁定", 其他线程不能更改; 直到该线程释放资源, 将资源的状态变成 "非锁定", 其他的线程才能再次锁定该资源.
互斥锁保证了每次只有一个线程进行写入操作, 从而保证了多线程情况下数据的正确性.
- # 创建锁
- mutex = threading.Lock()
- # 锁定
- mutex.acquire([timeout])
- # 释放
- mutex.release()
- RLock(可重入锁)
为了支持在同一线程中多次请求同一资源, python 提供了 "可重入锁":threading.RLock.RLock 内部维护着一个 Lock 和一个 counter 变量, counter 记录了 acquire 的次数, 从而使得资源可以被多次 acquire. 直到一个线程所有的 acquire 都被 release, 其他的线程才能获得资源.
- import threading
- import time
- class MyThread(threading.Thread):
- def run(self):
- global num
- time.sleep(1)
- if mutex.acquire(1):
- num = num+1
- msg = self.name+'set num to'+str(num)
- print msg
- mutex.acquire()
- mutex.release()
- mutex.release()
- num = 0
- mutex = threading.RLock()
- def test():
- for i in range(5):
- t = MyThread()
- t.start()
- if __name__ == '__main__':
- test()
- Condition(条件变量)
Condition 被称为条件变量, 除了提供与 Lock 类似的 acquire 和 release 方法外, 还提供了 wait 和 notify 方法. 线程首先 acquire 一个条件变量, 然后判断一些条件. 如果条件不满足则 wait; 如果条件满足, 进行一些处理改变条件后, 通过 notify 方法通知其他线程, 其他处于 wait 状态的线程接到通知后会重新判断条件. 不断的重复这一过程, 从而解决复杂的同步问题.
可以认为 Condition 对象维护了一个锁 (Lock/RLock) 和一个 waiting 池. 线程通过 acquire 获得 Condition 对象, 当调用 wait 方法时, 线程会释放 Condition 内部的锁并进入 blocked 状态, 同时在 waiting 池中记录这个线程. 当调用 notify 方法时, Condition 对象会从 waiting 池中挑选一个线程, 通知其调用 acquire 方法尝试取到锁.
Condition 对象的构造函数可以接受一个 Lock/RLock 对象作为参数, 如果没有指定, 则 Condition 对象会在内部自行创建一个 RLock.
除了 notify 方法外, Condition 对象还提供了 notifyAll 方法, 可以通知 waiting 池中的所有线程尝试 acquire 内部锁. 由于上述机制, 处于 waiting 状态的线程只能通过 notify 方法唤醒, 所以 notifyAll 的作用在于防止有线程永远处于沉默状态.
- import threading
- import time
- class Producer:
- def run(self):
- global count
- while True:
- if con.acquire():
- if count> 1000:
- con.wait()
- else:
- count += 100
- msg = threading.current_thread().name + 'produce 100, count=' + str(count)
- print(msg)
- con.notify() # 通知 waiting 线程池中的线程
- con.release()
- time.sleep(1)
- count = 0
- con = threading.Condition()
- class Consumer:
- def run(self):
- global count
- while True:
- if con.acquire():
- if count < 100:
- con.wait()
- else:
- count -= 3
- msg = threading.current_thread().name + 'consumer 3, count=' + str(count)
- print(msg)
- con.notify()
- con.release()
- time.sleep(3)
- producer = Producer()
进程和线程的比较
1. 稳定性
多进程模式最大的优点就是稳定性高, 因为一个子进程崩溃了它拥有自己独立的内存空间, 不会影响主进程和其他子进程(主进程崩掉, 子进程也难逃厄运). 多进程模式的缺点是创建进程的代价大, 在 Unix/Linux 系统下, 用 fork 调用还行, 在 Windows 下创建进程开销巨大. 另外, 操作系统能同时运行的进程数也是有限的, 在内存和 CPU 的限制下, 如果有几千个进程同时运行, 操作系统连调度都会成问题.
多线程模式通常比多进程快, 多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃, 因为所有线程共享进程的内存.
2. 切换开销
首先上下文切换就是从当前执行任务切换到另一个任务执行的过程. 但是, 为了确保下次能从正确的位置继续执行, 在切换之前, 会保存上一个任务的状态.
操作系统在切换进程或者线程时需要先保存当前执行的现场环境(CPU 寄存器状态, 内存页等), 然后, 把新任务的执行环境准备好(恢复上次的寄存器状态, 切换内存页等), 才能开始执行. 这个切换过程虽然很快, 但是也需要耗费时间.
但是线程的切换虚拟空间内存是相同的, 但是进程切换的虚拟空间内存则是不同的. 所以线程上下文切换比进程上下文切换快的多. 同时, 这两种上下文切换的处理都是通过操作系统内核来完成的. 内核的这种切换过程伴随的最显著的性能损耗是将寄存器中的内容切换出.
3. 计算密集型和 IO 密集型下的选择
我们可以把任务分为计算密集型和 IO 密集型.
计算密集型任务的特点是要进行大量的计算, 消耗 CPU 资源. IO 密集型任务的特点是涉及到网络, 磁盘 IO, 这类任务的特点是 CPU 消耗很少, 任务的大部分时间都在等待 IO 操作完成
对比维度 | 多进程 | 多线程 | 总结 |
数据共享、同步 | 数据共享复杂,需要用 IPC;数据是分开的,同步简单 | 因为共享进程数据,数据共享简单,但也是因为这个原因导致同步复杂 | 各有优势 |
内存、CPU | 占用内存多,切换复杂,CPU 利用率低 | 占用内存少,切换简单,CPU 利用率高 | 线程占优 |
创建销毁、切换 | 创建销毁、切换复杂,速度慢 | 创建销毁、切换简单,速度很快 | 线程占优 |
编程、调试 | 编程简单,调试简单 | 编程复杂,调试复杂 | 进程占优 |
可靠性 | 进程间不会互相影响 | 一个线程挂掉将导致整个进程挂掉 | 进程占优 |
分布式 | 适应于多核、多机分布式;如果一台机器不够,扩展到多台机器比较简单 | 适应于多核分布式 | 进程占优 |
(1)需要频繁创建销毁的优先用线程
原因请看上面的对比.
这种原则最常见的应用就是 Web 服务器了, 来一个连接建立一个线程, 断了就销毁线程, 要是用进程, 创建和销毁的代价是很难承受的
(2)需要进行大量计算的优先使用线程
所谓大量计算, 当然就是要耗费很多 CPU, 切换频繁了, 这种情况下线程是最合适的.
这种原则最常见的是图像处理, 算法处理.
(3)强相关的处理用线程, 弱相关的处理用进程
什么叫强相关, 弱相关? 理论上很难定义, 给个简单的例子就明白了.
一般的 Server 需要完成如下任务: 消息收发, 消息处理."消息收发" 和 "消息处理" 就是弱相关的任务, 而 "消息处理" 里面可能又分为 "消息解码","业务处理", 这两个任务相对来说相关性就要强多了. 因此 "消息收发" 和 "消息处理" 可以分进程设计,"消息解码","业务处理" 可以分线程设计.
当然这种划分方式不是一成不变的, 也可以根据实际情况进行调整.
(4)可能要扩展到多机分布的用进程, 多核分布的用线程
原因请看上面对比.
来源: https://www.cnblogs.com/songyifan427/p/11397219.html