1, 线程理论知识
概念: 指的是一条流水线的工作过程的总称, 是一个抽象的概念, 是 CPU 基本执行单位.
进程和线程之间的区别:
1. 进程仅仅是一个资源单位, 其中包含程序运行所需的资源, 而线程就相当于车间的流水线, 负责执行具代码.
2. 每个进程至少包含一个线程, 由操作系统自动创建, 称之为主线程
3. 每个进程可以有任意数量的线程
4. 创建进程的开销要比创建进程小得多
5. 同一进程的线程间数据是共享的
6. 线程之间是平等的, 没有子父级关系, 同一进程下的各线程的 PID 相同
7. 创建线程的代码可以写在任意位置, 不一定非要在 main 函数下.
为什么使用线程:
提高程序执行效率
2, 开启线程的两种方式
和进程类似, 但是开启方式不一定非要建在 main 函数下.
- # 第一种方式, 实例化 Thread
- # from threading import Thread
- #
- # def task():
- # print("subthread is running....")
- #
- # t = Thread(target=task)
- # t.start()
- # print('main is over....')
- # 第二种方式, 继承 Thread 类
- from threading import Thread
- class MyThread(Thread):
- def run(self):
- print("subthread is running....")
两种方式
3, 主线程和子线程之间的关系
1. 主线程任务执行完毕后, 主线程会等待所有子线程全部执行完毕后结束
2. 在同一进程中, 所有线程都是平等的, 没有子父级关系
- # 验证主线程代码执行完后会不会立即结束,
- import random
- import time
- import threading
- from threading import Thread
- def task(name):
- print("%s is running..." % name)
- time.sleep(random.randint(1, 3))
- print(threading.enumerate())
- print("%s is over....." % name)
- t = Thread(target=task, args=('aaa',))
- t.start()
- print('main over....')
- # 验证主线程代码执行完后会不会立即结束,
4, 验证线程和进程之间的区别
- from threading import Thread
- import time
- def task():
- global num
- time.sleep(1)
- num -= 1
- num = 10
- t = Thread(target=task,)
- t.start()
- t.join()
- print(num)
同一进程中线程的数据是可以共享的
- from multiprocessing import Process
- from threading import Thread
- import time
- def task():
- pass
- def expense(cls):
- """用来测试线程或进程创建开销"""
- lis = []
- start = time.time()
- for i in range(50):
- p = cls(target=task,)
- p.start()
- lis.append(p)
- for p in lis:
- p.join()
- return time.time()-start
创建线程的开销要比创建进程小的多
5, 线程的安全问题
1. 互斥锁
数据共享必然会造成竞争, 竞争就会造成数据错乱问题.
解决办法: 和进程一样, 加互斥锁.
- from threading import Thread, Lock
- import time
- num = 10
- def task(lock):
- global num
- lock.acquire()
- a = num
- time.sleep(0.5)
- num = a-1
- lock.release()
- ts = []
- lock = Lock()
- for i in range(10):
- t = Thread(target=task,args=(lock,))
- t.start()
- ts.append(t)
- for t in ts:
- t.join()
- print(num)
加互斥锁, 保证数据安全
2. 死锁
死锁不是一种锁, 而是一种锁的状态,
一般出现死锁的情况有两种:
1. 对同一把锁多次 acquire.(使用 RLOCK 锁, 代替 LOCK)
2. 两个或两个以上的进程或线程在执行过程中, 因争夺资源造成的相互等待现象.(解决办法: 能不加最好不加, 要加就只加一把)
- from threading import Thread, Lock
- import time
- def task1(name, locka, lockb):
- locka.acquire()
- print("%s 拿到 a 锁"%name)
- time.sleep(0.3)
- lockb.acquire()
- print('%s 拿到 b 锁'%name)
- lockb.release()
- locka.release()
- def task2(name, locka, lockb):
- lockb.acquire()
- print("%s 拿到 b 锁"%name)
- time.sleep(0.3)
- locka.acquire()
- print('%s 拿到 a 锁'%name)
- locka.release()
- lockb.release()
- locka = Lock()
- lockb = Lock()
- t1 = Thread(target=task1, args=('t1', locka, lockb))
- t2 = Thread(target=task2, args=('t2', locka, lockb))
- t1.start()
- t2.start()
死锁的第二种情况的示例
3. 可重入锁
只能解决同一线程多次执行 acquire 情况.
只有一个线程所有的 acquire 都被释放, 其他线程才能拿到这个锁.
也会发生死锁现象.
- from threading import Thread, RLock
- lock = RLock()
- lock.acquire()
- lock.acquire()
- lock.acquire()
- lock.acquire()
- print("over")
- lock = RLock()
- def task1():
- lock.acquire()
- print('task1')
- def task2():
- lock.acquire()
- print('task2')
- Thread(target=task1).start()
- Thread(target=task2).start()
示例
4. 信号量
也是一种锁, 用来控制同一时间, 有多少线程可以提供并发访问, 不是用来处理线程安全问题
- from threading import Semaphore, Thread
- import time
- s_lock = Semaphore(3)
- def task():
- s_lock.acquire()
- time.sleep(1)
- print("run.....")
- s_lock.release()
- for i in range(20):
- t = Thread(target=task)
- t.start()
示例
6, 守护线程
守护线程在所有非守护线程结束后结束.
- import threading
- from threading import Thread
- import time
- def task1():
- print('thread-1 is running...')
- time.sleep(3)
- print('thread-1 over....')
- def task2():
- print('thread-2 is running...')
- time.sleep(1)
- print('thread-2 over....')
- if __name__ == '__main__':
- t1 = Thread(target=task1,)
- t2 = Thread(target=task2,)
- t1.setDaemon(True)
- t1.start()
- t2.start()
- print(t1.ident)
- print(threading.enumerate())
- print("main over...")
示例
7,GIL
全局解释器锁, 是一互斥锁, 只有在 Cpython 解释器存在.
为什么需要: 因为一个 python.exe 进行运行只有一份解释器, 如果这个进程开启的多个线程都要执行代码, 多线程之间就要竞争解释器, 一旦竞争就有可能出现问题.
带来的好处: 保证了多线程同时访问解释器时的数据安全问题.
带来的问题: 同一时间只有一个线程访问解释器, 使得多线程无法真正的并发
出现的原因: 默认情况下, 一个进程只有一个线程不会是不会出问题, 但不要忘了还有 GC 线程, 一旦出现多个线程就可能出现问题, 所以当初就简单粗暴的加上了 GIL 锁
GIL 加锁和解锁时机:
加锁: 在调用解释器时立即加锁
解锁: 当前线程遇到 IO 时释放, 或者当前线程执行超过设定值释放 (py2 计算的是执行代码的行数, py3 中计算的是时间)
解决办法: 使用多进程或使用其他的 python 解释器
8, 线程池和进程池
一种容器, 本质十一存储线程或进程的列表
为什么使用? 因为服务器不能无限开启线程或进程, 所以需要对线程数量加以控制, 线程池就是帮我们完成线程 / 进程的创建, 销毁以及任务分配
特点:
线程池在创建时不会开启线程,
等到任务提交时, 如果没有空闲线程, 并且已存在的线程数量小于最大值, 开启新线程,
线程开启后不会关闭, 直到进程全部结束为止
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
pool= ProcessPoolExecutor(maxsize), 创建进程池, maxsize 为最大进程个数
res = pool.submit(task, 'a'), 提交任务
res.result(timeout), 接收调用的返回值, timeout 为超时时间, 超时报错
该函数是阻塞函数, 会一直等待任务执行完毕
pool.shutdown(wait), 所有任务执行完毕, 阻塞函数
wait=True, 等待池内所有任务执行完毕后回收资源才继续
wait=False, 立即返回, 并不会等待池内的任务执行完毕
方法和属性
- from concurrent.futures import ThreadPoolExecutor
- import time
- def task(num):
- time.sleep(0.5)
- print("%s is running....."%num)
- return num**2
- pool = ThreadPoolExecutor()
- ress = []
- for i in range(10):
- res = pool.submit(task, i)
- ress.append(res)
- pool.shutdown(wait=False)
- for i in ress:
- print(i.result())
- print('over')
示例
9, 同步异步阻塞非阻塞
阻塞和非阻塞都是指程序的运行状态
阻塞: 当程序执行遇到 IO 操作, 无法继续执行代码
非阻塞: 程序执行没有遇到 IO 操作, 或通过某种方式, 使程序遇到了也不会停在原地, 还可以继续执行
同步异步指的是提交任务的方式
同步: 发起任务后必须原地等待任务执行完成, 才可以继续执行
异步: 发起任务后不用等待任务执行, 可以立即执行其他操作
异步效率高于同步, 发起异步任务方式: 就是多线程和多进程
同步和阻塞的不同: 阻塞一定使 CPU 已经切换, 同步虽然在等待, 但 CPU 没有切走, 还在当前进程中执行其他任务
来源: http://www.bubuko.com/infodetail-2981827.html