学习 Python 多线程的资料很多, 吐槽 Python 多线程的博客也不少本文主要介绍 Python 多线程实际应用, 且假设读者已经了解多线程的基本概念如果读者对进程线程概念不甚了解, 可参见知名博主 阮一峰 转译的一篇博客: 进程与线程的一个简单解释
1 线程的基本操作
Python 中多线程主要有两个模块,_thread 和 threading 模块前者更底层, 后者更常用, 能满足绝大部分编程需求, 今天主要围绕 threading 模块展开介绍启动一个线程需要用 threading 模块中的 Thread
线程的启动需要先创建 Thread 对象, 然后调用该对象的 start()方法, 参见下例:
- import time
- import threading
- def func(n):
- while n> 0:
- print("线程 name:", threading.current_thread().name, "参数 n:", n)
- n -= 1
- time.sleep(1)
- t = threading.Thread(target=func, args=(5,))
- t.start()
- print("主线程:", threading.current_thread().name)
- # 运行结果:
- # 线程 name: Thread-1 参数 n: 5
- # 主线程: MainThread
- # 线程 name: Thread-1 参数 n: 4
- # 线程 name: Thread-1 参数 n: 3
- # 线程 name: Thread-1 参数 n: 2
- # 线程 name: Thread-1 参数 n: 1
上例中, threading.current_thread().name 是获取当前线程的 name 属性
Thread 中, 形参 target 传入函数名, args 传入函数对应的参数, 参数必须是可迭代对象, 如果是元组且只有一个参数必须写成 (参数,) 的形式, 逗号不能省略
一旦启动一个线程, 该线程将由操作系统来全权管理, 独立执行直到目标函数返回一般情况下, 线程的操作有以下几种:
- t.is_alive() # 查询线程对象的状态, 返回布尔值
- t.join() # 将线程加入到当前线程, 并等待其终止
- t = Thread(target=countdown, args=(10,), daemon=True) # 后台线程
- t.start()
查看线程状态示例:
- import time
- import threading
- def func(n):
- while n> 0:
- print("线程 name:", threading.current_thread().name, "参数 n:", n)
- n -= 1
- time.sleep(1)
- t = threading.Thread(target=func, args=(2,))
- t.start()
- print("主线程:", threading.current_thread().name)
- if t.is_alive():
- print("活着的")
- else:
- print("未存活")
- print("主线程结束")
让主线程等待其他线程, 就是主线程会在 join()处一直等待所有线程都结束之后, 再继续运行参见下例:
- import time
- import threading
- def func(n):
- while n> 0:
- print("线程 name:", threading.current_thread().name, "参数 n:", n)
- n -= 1
- time.sleep(1)
- t = threading.Thread(target=func, args=(2,))
- t.start()
- t.join()
- print("主线程:", threading.current_thread().name)
- print("主线程结束")
- # 运行结果:
- # 线程 name: Thread-1 参数 n: 2
- # 线程 name: Thread-1 参数 n: 1
- # 主线程: MainThread
- # 主线程结束
后台线程参见下例:
- import time
- import threading
- def func(n):
- while n> 0:
- print("参数 n:", n)
- n -= 1
- time.sleep(1)
- t = threading.Thread(target=func, args=(10, ), daemon=True)
- t.start()
- time.sleep(3)
- print("主线程结束")
- # 参数 n: 10
- # 参数 n: 9
- # 参数 n: 8
- # 参数 n: 7
- # 主线程结束
后台线程无法等待, 但主线程终止时后台线程自动销毁 如果要对线程进行高级操作, 如发送信号, 终止线程, 都需要自己实现下例通过轮询控制线程退出:
- import time
- from threading import Thread
- class StopThread:
- def __init__(self):
- self._flag = True
- def terminate(self):
- self._flag = False
- def run(self, n):
- while self._flag and n> 0:
- print('num>>:', n)
- n -= 1
- time.sleep(1)
- obj = StopThread()
- t = Thread(target=obj.run, args=(11,))
- t.start()
- time.sleep(5) # 表示 do something
- obj.terminate() # 终止线程
- t.join()
- print("主线程结束")
上例通过类中的_flag 控制线程的终止, 当主线程执行 5 秒之后, 主动将_flag 赋值为 False 终止线程通过轮询终止线程存在一个问题, 如果 while self._flag and n> 0: 这句后, 某次循环一直阻塞在 I/O 操作上, 根本不会进行下一次循环, 自然就无法终止这该怎么办呢? 留一个思考题
多线程还可以通过继承 Thread 实现, 如下:
- import time
- from threading import Thread
- class A(Thread):
- def __init__(self,):
- super().__init__()
- def run(self):
- print("run1..", )
- time.sleep(5)
- print("run2..")
- obj = A()
- obj.start()
- print("主线程结束")
2 线程锁和一个怪象
当我们用多个线程同时修改同一份数据时, 怎么保证最终结果是我们期许的呢? 举个例子, 当前有一个全局变量 a=0, 如果有 10 个线程同时对其加 1, 这就出现了线程间的竞争, 到底应该听谁的呢? 这时候, 应该用线程锁来解决也就是当某一个线程 A 对该数据操作时, 对该数据加锁, 其他线程只能等着, 等待 A 操作完之后释放了该锁, 其他线程才能操作该数据, 一旦某个线程获得操作数据的权限, 立即又加上锁如此便能保证数据的安全准确奇怪的是, 在 Python3 中, 即使不加锁, 好像也不会发生数据出错的情况或许这个例子不是很好, 也或许是 Python3 中自动加了锁希望有知道的读者赐教一下这个奇怪的现象就是下例了:
- from threading import Thread
- import time
- def add_one(a):
- time.sleep(1)
- print("in thread a:", a)
- a[1] += 1
- if __name__ == '__main__':
- array = [0, 1, 4]
- thread_obj_list = []
- for i in range(50):
- t = Thread(target=add_one, args=(array,))
- t.start()
- thread_obj_list.append(t)
- for j in thread_obj_list:
- j.join()
- print("array result::", array)
- # array result:: [0, 51, 4]
我们看到, 最后 array 的第二个元素是 51, 并没有出错, 这真是令人费解好了, 言归正传, 来看看线程锁的几个方法吧:
- lock = threading.Lock() # Lock 对象
- lock.acquire() # 锁定
- lock.release() # 解锁
Lock 有锁定或解锁两种状态之一它是在解锁状态下创建的它有两个基本方法, acquire() 和 release()
当状态为解锁时, acquire()将状态更改为锁定并立即返回当状态被锁定时, acquire()块直到对另一个协程中的 release()的调用将其改变为解锁, 然后 acquire()调用将其重置为锁定并返回
release()方法只应在锁定状态下调用; 它将状态更改为已解锁并立即返回如果尝试释放已解锁的锁, 则会引发 RuntimeError
下面是一个具体的使用例子:
- from threading import Thread
- import time
- import threading
- lock = threading.Lock()
- def add_one(a):
- time.sleep(1)
- lock.acquire()
- a[1] += 1
- lock.release()
- if __name__ == '__main__':
- array = [0, 1, 4]
- thread_obj_list = []
- for i in range(50):
- t = Thread(target=add_one, args=(array,))
- t.start()
- thread_obj_list.append(t)
- for j in thread_obj_list:
- j.join()
- print("array result::", array)
- # array result:: [0, 51, 4]
acquire()和 release()方法成对出现但是这样手动释放有时候可能会遗忘, 这时候可以考虑用上下文管理协议关于上下文管理协议, 可参见作者的这篇文章 Python 上下文管理器
Lock 对象支持 with 语句:
- def add_one(a):
- time.sleep(1)
- with lock:
- a[1] += 1
3 递归锁
可重入锁 (又称递归锁, RLock), 就是大锁中包含子锁的情况下使用在这种情况下, 再用 Lock 时, 就会出现死锁现象, 此时应该用 threading.RLock() 对象了, 用法同 Lock, 参见下例:
- from threading import Thread
- import time
- import threading
- lock = threading.RLock()
- def add_one(a):
- lock.acquire()
- a[1] += 1
- lock.release()
- def add_two(b):
- time.sleep(1)
- lock.acquire()
- b[1] += 2
- add_one(b)
- lock.release()
- if __name__ == '__main__':
- array = [0, 1, 4]
- thread_obj_list = []
- for i in range(50):
- t = Thread(target=add_two, args=(array,))
- t.start()
- thread_obj_list.append(t)
- for j in thread_obj_list:
- j.join()
- print("array result::", array)
- # array result:: [0, 151, 4]
上例读者可以试试 Lock(), 看看什么效果 RLock()还支持上下文管理协议, 上例中的两个函数可以改成这样:
- def add_one(a):
- with rlock:
- a[1] += 1
- def add_two(b):
- time.sleep(1)
- with rlock:
- b[1] += 2
- add_one(b)
- 4 GIL
全局解释器锁(英语: Global Interpreter Lock, 缩写 GIL), 是计算机程序设计语言解释器用于同步线程的一种机制, 它使得任何时刻仅有一个线程在执行所以很多人说 Python 的线程是假线程, 并能利用多核, 并不能真正并行之所以感觉到线程并行, 是因为线程上下文不断切换的缘故 Python 3.2 开始使用新的 GIL 新的 GIL 实现中用一个固定的超时时间来指示当前的线程放弃全局锁在当前线程保持这个锁, 且其他线程请求这个锁时, 当前线程就会在 5 毫秒后被强制释放该锁关于全局锁, 强调三点:
(1)GIL 的存在, 同一时刻只能有一个线程在运行
(2)GIL 是 CPython 的特性, Jython,pypy 等并无 GIL
(3)Cpython 的多线程适用于 I/O 密集型问题, 计算密集型问题可使用多进程编程
5 判断线程状态
在多线程编程中, 有时候某个线程依赖另一个线程的状态, 需要使用 threading 库中的 Event 对象 Event 对象包含一个可由线程设置的信号标志, 它允许线程等待某些事件的发生可将线程设置等待 Event 对象, 直到有其他线程将 Event 对象设置为真, 这些等待 Event 对象的线程将开始执行 Event()对象的常用方法:
- event = threading.Event() # 创建 threading.Event()对象
- event.is_set() # 获取 event 的设置值, 默认为 False
- event.set() # 设置 event 的值为 True
- event.clear() # 设置 event 的值为 False
- event.wait() # 等到 event 的值被设为 True 就执行
下面通过交通信号灯问题示范 event 的使用:
- import threading
- import time
- def traffic_light(event):
- count = 0
- event.set()
- while True:
- # 如果计数器 [0, 5) 之间, 红灯, event=False
- if 0 <= count < 5:
- event.clear()
- print("light is Red")
- # 如果计数器 [5, 10) 之间, 绿灯, event=True
- elif 5 <= count < 10:
- event.set()
- print("light is Green")
- # 如果计数器大于 10, 红灯, 将 event 设置为 False, 计数器置为 0
- else:
- event.clear()
- count = 0
- time.sleep(1)
- count += 1
- def car(name, event):
- while True:
- if not event.is_set():
- # event 为 False, 表示红灯, 车只能等待
- print("RED, the %s is waiting..." % name)
- # 此处会阻塞住, 直到 event 被设置为 True 在执行
- event.wait()
- print("Green, The %s going...." % name)
- e = threading.Event()
- light = threading.Thread(target=traffic_light, args=(e,))
- light.start()
- car1 = threading.Thread(target=car, args=("Tesla", e, ))
- car1.start()
交通信号灯有红灯和绿灯两种状态, 每 5 秒切换一次状态, 而 car()函数中, 只要灯变绿就放 car 通行运行试试看
event 对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程如果你只想唤醒单个或者一定数目的线程, 最好是使用信号量或者 Condition 对象来替代
6 Condition 对象
condition 对象总是与锁关联, 可以手动传入锁对象, 也可以不传入使用默认值当有多个线程需要等待某个变量改变时, 才开始执行这种情况可以用 condition 对象实现 condition 对象的主要方法有:
- condition = threading.Condition(lock=None) # 创建 Condition 对象 参数可以不传
- condition.acquire() # 加锁
- condition.release() # 解锁
- condition.wait(timeout=None) # 阻塞, 直到有调用 notify(), 或者 notify_all()时再触发
- condition.wait_for(predicate, timeout=None) # 阻塞, 等待 predicate 条件为真时执行
- condition.notify(n=1) # 通知 n 个 wait()的线程执行, n 默认为 1
- condition.notify_all() # 通知所有 wait 着的线程执行
- with condition: # 支持 with 语法, 不必每次手动调用 acquire()/release()
看一个例子不是很优雅的例子:
- import threading
- import time
- condition = threading.Condition() # 创建 condition 对象
- def func():
- condition.acquire() # 如果没有 with 语句, 必写这句, 否者报错
- condition.wait() # 阻塞, 等待其他线程调用 notify()
- print("in func..")
- condition.release() # 与 acquire()成对出现
- # 启 10 个线程
- for i in range(10):
- t = threading.Thread(target=func, args=())
- t.start()
- time.sleep(5)
- condition.acquire()
- condition.notify(2) # 通知两个线程执行
- condition.release()
- # in func..
- # in func..
- # 其他 8 个线程会继续等待...
上例中, 我们看到启动的 10 个线程会等待 5 秒钟并且调用了 notify(2)之后, 才会通知两个线程继续运行且这两个线程执行完毕之后, 其他 8 个线程仍然会阻塞在 condition.wait() 处
频繁写 acquire() / release()很繁琐, 下面是优雅的写法:
- import threading
- import time
- condition = threading.Condition() # 创建 condition 对象
- def func(n):
- with condition: # with 更优雅
- condition.wait() # 阻塞, 等待其他线程调用 notify()
- print("in func..", n)
- # 启 10 个线程
- for i in range(10):
- t = threading.Thread(target=func, args=(i,))
- t.start()
- time.sleep(5)
- with condition:
- condition.notify_all() # 通知所有线程执行
运行下, 是不是等待 5 秒之后, 所有线程都继续执行了?
7 信号量
信号量通常用于防范容量有限的资源, 例如数据库服务器一般而言信号量可以控制释放固定量的线程比如启动 100 个线程, 信号量的控制值设为 5, 那么前 5 个线程拿到信号量之后, 其余线程只能阻塞, 等到这 5 个线程释放信号量锁之后才能去拿锁参见下例:
- import threading
- import time
- def func(n):
- # semaphore.acquire()
- with semaphore:
- time.sleep(2)
- print("Thread::", n)
- # semaphore.release()
- semaphore = threading.BoundedSemaphore(5) # 信号量, 每次释放 5 个线程
- thread_list = []
- for i in range(23):
- t = threading.Thread(target=func, args=(i,))
- thread_list.append(t)
- t.start()
- for j in thread_list:
- j.join()
- print("all threads done")
上例中, 可以看到线程是每 5 个一组进行释放的
8 Barrier 对象
Barriers 字面意思是屏障, 是 Python 线程 (或进程) 同步原语每个线程中都调用 wait()方法, 当其中一个线程执行到 wait 方法处会立阻塞; 一直等到所有线程都执行到 wait 方法处, 所有线程再继续执行参见下例:
- import time
- import threading
- bar = threading.Barrier(3) # 创建 barrier 对象, 指定满足 3 个线程
- def worker1():
- print("worker1")
- time.sleep(1)
- bar.wait()
- print("worker1 end")
- def worker2():
- print("worker2")
- time.sleep(2)
- bar.wait()
- print("worker2 end")
- def worker3():
- print("worker3")
- time.sleep(5)
- bar.wait()
- print("worker3 end")
- thread_list = []
- t1 = threading.Thread(target=worker1)
- t2 = threading.Thread(target=worker2)
- t3 = threading.Thread(target=worker3)
- thread_list.append(t1)
- thread_list.append(t2)
- thread_list.append(t3)
- for t in thread_list:
- t.start()
- # 每个线程中都调用了 wait()方法, 在所有 (此处设置为 3) 线程调用 wait 方法之前是阻塞的
- # 也就是说, 只有等到 3 个线程都执行到了 wait 方法这句时, 所有线程才继续执行
上例中, 可以看到, 所有线程会先各自运行 wait()方法之前的代码, 到 wait()处阻塞等待最后一个线程执行到 wait()处, 也就是 5 秒之后, 所有线程恢复执行
9 线程间通信
两个或多个线程之间相互发送数据最安全的方式可能就是使用 queue 库中的队列了创建一个线程共享的 Queue 对象, 线程通过使用 put()和 get()操作来向队列中添加或者删除元素 Queue 对象已经内置了锁机制, 编程时不必手动操作锁下例 producer()函数代表包子铺, 生产包子放入队列中; consumer()函数代表吃包子的人, 不断从队列中取出包子吃掉; 以此演示线程间通过队列通信
- from queue import Queue
- import threading
- import time
- q = Queue(10)
- def producer():
- n = 0
- while True:
- q.put("包子 %s" % n)
- print("包子铺生产 包子 %s" % n)
- n += 1
- time.sleep(2)
- def consumer():
- while True:
- r = q.get()
- print("bucker 吃掉 %s" % r)
- time.sleep(1)
- t1 = threading.Thread(target=producer)
- t1.start()
- t2 = threading.Thread(target=consumer)
- t2.start()
形如上例的编程模型, 又叫生产者 - 消费者模型它降低了程序之前的耦合, 使得队列的上游只关注生产数据, 队列的下游只关注消费数据在票务系统, 或者资源有限的情况中可用此模型补充两点:
(1)get() 和 put() 方法都支持非阻塞方式和设定超时
(2)q.qsize() , q.full() , q.empty() 等可以获取一个队列的当前大小和状态但它们不是线程安全的, 尽量别用
10 线程池
Python3.2 开始, 增加了标准库 concurrent.futures, 该库中的 ThreadPoolExecutor 是自带的线程池简单使用:
- from concurrent.futures import ThreadPoolExecutor
- import time
- def tell(i):
- print("this is tread {}.".format(i))
- time.sleep(1)
- if __name__ == '__main__':
- future = ThreadPoolExecutor(10)
- a = "ddd"
- for i in range(100):
- future.submit(tell, (i,)) # 添加一个线程到线程池
- future.shutdown(wait=True) # 此函数用于释放异步执行操作后的系统资源
其中, submit()方法第一个参数为函数名, 第二个为函数的参数 shutdown(wait=True)用于释放异步执行操作后的系统资源 ThreadPoolExecutor 还有一个优点就是: 任务提交者更方便的从被调用函数中获取返回值参见下例:
- import concurrent.futures
- import requests
- URLS = ['http://www.cnblogs.com/zingp/p/5878330.html',
- 'http://www.cnblogs.com/zingp/',
- 'https://docs.python.org/']
- # 爬取网页内容
- def load_url(url, timeout):
- with requests.get(url, timeout=timeout) as conn:
- return conn.text
- with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
- # 创建 future 对象和对应的 url 的字典
- future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
- for future in concurrent.futures.as_completed(future_to_url):
- url = future_to_url[future]
- try:
- data = future.result()
- except Exception as err:
- print('url:%s -- err: %s' % (url, err))
- else:
- print(url, len(data))
- # http://www.cnblogs.com/zingp/ 12391
- # http://www.cnblogs.com/zingp/p/5878330.html 90029
- # https://docs.python.org/ 9980
上例创建一个大小为 3 的线程池, 用了不少 with 语句, 并用 future.result() 获取函数返回值最终, 我们看到爬取了三个网页, 并获得网页内容 future.result()操作会阻塞, 直到对应的函数执行完成并返回一个结果
Python3.2 以前并没有自带线程池, 那时往往采用自定义线程池下面一个就是自定义线程池的例子, 看看是否能够看得懂:
- import queue
- import threading
- import contextlib
- StopEvent = object()
- class ThreadPool(object):
- """定义一个线程池类"""
- def __init__(self, max_num, max_task_num=None):
- if max_task_num:
- self.q = queue.Queue(max_task_num)
- else:
- self.q = queue.Queue()
- self.max_num = max_num
- self.cancel = False
- self.terminal = False
- self.generate_list = []
- self.free_list = []
- def run(self, func, args, callback=None):
- """
线程池执行一个任务
:param func: 任务函数;
:param args: 任务函数所需参数;
:param callback: 任务执行失败或成功后执行的回调函数, 回调函数有两个参数 1 任务函数执行状态;
2 任务函数返回值(默认为 None, 即: 不执行回调函数);
:return: 如果线程池已经终止, 则返回 True 否则 None
- """
- if self.cancel:
- return
- if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
- self.generate_thread()
- w = (func, args, callback,)
- self.q.put(w)
- def generate_thread(self):
- """创建一个线程"""
- t = threading.Thread(target=self.call)
- t.start()
- def call(self):
- """循环去获取任务函数并执行任务函数"""
- current_thread = threading.currentThread()
- self.generate_list.append(current_thread)
- event = self.q.get()
- while event != StopEvent:
- func, arguments, callback = event
- try:
- result = func(*arguments)
- success = True
- except Exception as e:
- success = False
- result = None
- if callback is not None:
- try:
- callback(success, result)
- except Exception as e:
- pass
- with self.worker_state(self.free_list, current_thread):
- if self.terminal:
- event = StopEvent
- else:
- event = self.q.get()
- else:
- self.generate_list.remove(current_thread)
- def close(self):
- """执行完所有的任务后,
- 所有线程停止"""
- self.cancel = True
- full_size = len(self.generate_list)
- while full_size:
- self.q.put(StopEvent)
- full_size -= 1
- def terminate(self):
- """无论是否还有任务,
- 终止线程"""
- self.terminal = True
- while self.generate_list:
- self.q.put(StopEvent)
- self.q.queue.clear()
- @contextlib.contextmanager
- def worker_state(self, state_list, worker_thread):
- """用于记录线程中正在等待的线程数"""
- state_list.append(worker_thread)
- try:
- # 遇到 yield 就返回回去执行 with 中的语句, 执行完了回来
- yield
- finally:
- state_list.remove(worker_thread)"
创建大的线程池的一个可能需要关注的问题是内存的使用 例如, 如果你在 OS X 系统上面创建 2000 个线程, 系统显示 Python 进程使用了超过 9GB 的虚拟内存 不过, 这个计算通常是有误差的当创建一个线程时, 操作系统会预留一个虚拟内存区域来 放置线程的执行栈 (通常是 8MB 大小) 但是这个内存只有一小片段被实际映射到真实内存中 因此, Python 进程使用到的真实内存其实很小 (比如, 对于 2000 个线程来讲, 只使用到了 70MB 的真实内存, 而不是 9GB)如果担心虚拟内存大小, 可以使用 threading.stack_size() 函数来降低它
- import threading
- threading.stack_size(65536)
如果加上这条语句并再次运行前面的创建 2000 个线程试验, 会发现 Python 进程只使用到了大概 210MB 的虚拟内存, 而真实内存使用量没有变 注意线程栈大小必须至少为 32768 字节, 通常是系统内存页大小 (40968192 等) 的整数倍
11 小结与讨论
(1)Python 多线程编程常用 threading 模块启动一个多线程需要创建一个 Thread 对象, 调用 star()方法启动线程注意 is_alive() /join()方法和 daemon 参数的使用
(2)python 多线程锁有 Lock / Rlock, 全局锁 GILGIL 是 CPython 特性, 同一时刻只能运行一个线程, 不能利用多核资源
(3)线程同步原语有 Event / Condition / Semaphore / BarrierEvent 用于常用语通知全部线程, condition 和 Semapher 常用于通知一定数量的线程, Barrier 用于多个线程必须完成某些步骤再一起执行
(4)Lock / Rlock / Event / Condition / Semaphore 支持上下文管理协议(with 语句, 好用)
(5)线程间通信可以用 queue 模块中的 Queue 队列, get()和 put()已加锁, 是线程安全的 qsize()/full()/empty() 等可以获取一个队列的当前大小和状态, 不是线程安全的, 尽量别用
(6)concurrent.futures 中的 ThreadPoolExecutor 是 Python3.2 之后自带的线程池模块, 十分好用, 支持 with 语句, 通过 future.result()获取线程返回值
(7)Python 多线程适用于 I/O 密集型问题, CPU 密集型问题可以用 C 代码优化底层算法提升性能, 需注意一个写的不好的 C 语言扩展会导致这个问题更加严重; 也可以用 pypy 或者多进程
以上是本篇全部内容, 欢迎读者批评指正
参考资料:
threading 官方文档
concurrent.futures 官方文档
Python3-cookbook 中文文档
来源: https://www.cnblogs.com/zingp/p/8626834.html