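Thread execution
join and setDaemon
By default, child threads keep running to completion after the main thread's code has finished. If a child thread is marked as a daemon thread (setDaemon(True), or daemon = True), it is terminated as soon as the main thread exits.
Calling join() on a thread makes the main thread wait until that child thread has finished before it continues.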
- import threading
- import time
- def get_thread_a():
-     print("get thread A started")
-     time.sleep(3)
-     print("get thread A end")
- def get_thread_b():
-     print("get thread B started")
-     time.sleep(5)
-     print("get thread B end")
- if __name__ == "__main__":
-     thread_a = threading.Thread(target=get_thread_a)
-     thread_b = threading.Thread(target=get_thread_b)
-     start_time = time.time()
-     # Mark thread_b as a daemon before start(); setDaemon(True) does the same but is deprecated since Python 3.10
-     thread_b.daemon = True
-     thread_a.start()
-     thread_b.start()
-     # Block the main thread until thread_a finishes (about 3 seconds)
-     thread_a.join()
-     end_time = time.time()
-     print("execution time: {}".format(end_time - start_time))
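thread_a is joined, so the main thread waits for it to finish. thread_b is a daemon thread, so it is terminated when the main thread exits after about 3 seconds and never prints "get thread B end".
The output is as follows: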
- get thread A started
- get thread B started
- get thread A end
- execution time: 3.003199815750122
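As a complement to the example above, here is a minimal sketch of two related calls: passing daemon=True to the Thread constructor, and calling join() with a timeout. The names worker, run_demo, fast and slow are made up for this illustration, and the sleep durations are arbitrary.

```python
import threading
import time


def worker(seconds, finished, name):
    """Sleep for `seconds`, then record that this worker ran to completion."""
    time.sleep(seconds)
    finished.append(name)


def run_demo():
    finished = []
    fast = threading.Thread(target=worker, args=(0.1, finished, "fast"))
    # daemon=True in the constructor has the same effect as setting
    # thread.daemon = True before start().
    slow = threading.Thread(target=worker, args=(5, finished, "slow"), daemon=True)
    fast.start()
    slow.start()

    fast.join()             # blocks until `fast` has finished
    slow.join(timeout=0.1)  # waits at most 0.1 s, then returns either way
    return finished, fast.is_alive(), slow.is_alive()


finished, fast_alive, slow_alive = run_demo()
print(finished, fast_alive, slow_alive)
```

join(timeout=...) always returns None, so is_alive() is the way to tell whether the thread actually finished. Because slow is a daemon thread, the program exits without waiting for its remaining sleep.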
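Thread synchronization
When threads share a global variable and several of them modify it, the final value can be nondeterministic (it may differ from run to run). For example, if one thread repeatedly increments a count variable while another decrements it, the final value of count is unpredictable. To make the result deterministic, the code that touches the variable must be protected by a lock.
Python's threading module provides two main lock classes: Lock and RLock.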
Lock:
- from threading import Lock
- lock = Lock()
- lock.acquire()
- lock.release()
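Lock provides acquire() and release(), which must be used in pairs. After acquire(), the lock must be released with release() before it can be acquired again; calling acquire() a second time without releasing blocks forever (deadlock).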
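The count scenario described earlier can be sketched as follows. This is only an illustration: the function names add, sub and run, and the iteration count, are assumptions and do not come from the original article. Using the lock in a with statement guarantees the acquire()/release() pairing even if the protected code raises.

```python
import threading

count = 0
lock = threading.Lock()


def add(n):
    global count
    for _ in range(n):
        with lock:  # acquire() on entry, release() on exit
            count += 1


def sub(n):
    global count
    for _ in range(n):
        with lock:
            count -= 1


def run(n=100_000):
    """Run one incrementing and one decrementing thread, return the final count."""
    global count
    count = 0
    threads = [
        threading.Thread(target=add, args=(n,)),
        threading.Thread(target=sub, args=(n,)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return count


print(run())
```

With the lock in place, every increment is matched by a decrement, so the final value is 0 on every run.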
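RLock:
Because a Lock deadlocks when the thread that already holds it tries to acquire it again, RLock (reentrant lock) relaxes this restriction: the same thread may call acquire() several times in a row, but it must then call release() the same number of times.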
- from threading import RLock
- lock = RLock()
- lock.acquire()
- lock.acquire()
- lock.release()
- lock.release()
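Reentrancy matters mostly when one locked method calls another locked method on the same object. The sketch below uses a made-up Account class to show that case, then uses non-blocking acquire() calls to show the difference between Lock and RLock without actually hanging.

```python
import threading


class Account:
    """Toy account (hypothetical) whose methods all take the same lock."""

    def __init__(self):
        self._lock = threading.RLock()
        self.balance = 0

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

    def deposit_all(self, amounts):
        with self._lock:                # first acquire()
            for amount in amounts:
                self.deposit(amount)    # same thread acquires again; RLock allows it
            return self.balance


account = Account()
total = account.deposit_all([10, 20, 30])

# Non-blocking acquire() makes the difference visible without hanging.
plain = threading.Lock()
plain.acquire()
plain_again = plain.acquire(blocking=False)          # already held, so this fails
plain.release()

reentrant = threading.RLock()
reentrant.acquire()
reentrant_again = reentrant.acquire(blocking=False)  # same owner, so this succeeds
reentrant.release()
reentrant.release()

print(total, plain_again, reentrant_again)
```

If Account used a plain Lock, deposit_all() would block forever on its first call to deposit().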
Condition (condition variable): a thread proceeds to access the shared data only after a particular condition has been met.
- import threading
- def get_thread_a(condition):
-     with condition:
-         # A waits first; B opens the conversation
-         condition.wait()
-         print("A : Hello B,that's ok")
-         condition.notify()
-         condition.wait()
-         print("A : I'm fine,and you?")
-         condition.notify()
-         condition.wait()
-         print("A : Nice to meet you")
-         condition.notify()
-         condition.wait()
-         print("A : That's all for today")
-         condition.notify()
- def get_thread_b(condition):
-     with condition:
-         print("B : Hi A, Let's start the conversation")
-         condition.notify()
-         condition.wait()
-         print("B : How are you")
-         condition.notify()
-         condition.wait()
-         print("B : I'm fine too")
-         condition.notify()
-         condition.wait()
-         print("B : Nice to meet you,too")
-         condition.notify()
-         condition.wait()
-         print("B : Oh,goodbye")
- if __name__ == "__main__":
-     condition = threading.Condition()
-     thread_a = threading.Thread(target=get_thread_a, args=(condition,))
-     thread_b = threading.Thread(target=get_thread_b, args=(condition,))
-     # Start A first so that it is already in wait() when B sends its first notify();
-     # a notify() sent while nobody is waiting is lost
-     thread_a.start()
-     thread_b.start()
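Condition wraps a lock, which is an RLock by default. That lock must be acquired before calling wait() or notify() (otherwise a RuntimeError is raised) and released afterwards. The with condition statement calls acquire() on entry and release() on exit. A Condition therefore works with two levels of locks: wait() releases the underlying lock, creates a new lock, appends it to the waiter queue, and blocks on it until a notify() call releases it; wait() then reacquires the underlying lock before returning.
wait(): blocks until another thread signals the condition variable with notify()
notify(): wakes up one thread from the queue of threads blocked in wait()
Output: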
- B : Hi A, Let's start the conversation
- A : Hello B,that's ok
- B : How are you
- A : I'm fine,and you?
- B : I'm fine too
- A : Nice to meet you
- B : Nice to meet you,too
- A : That's all for today
- B : Oh,goodbye
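The example above relies on thread A reaching its first wait() before thread B sends its first notify(); if B ran first, that notification would be lost and both threads would block. A more defensive variant, sketched here under the assumption that an explicit "whose turn is it" flag is acceptable, keeps the turn in shared state and waits on it with wait_for(). The names run_conversation, speak and state are invented for this sketch, and the dialogue is shortened.

```python
import threading


def run_conversation(a_lines, b_lines):
    """Alternate lines between B and A, with B speaking first."""
    condition = threading.Condition()
    state = {"turn": "B"}   # shared state guarded by `condition`
    transcript = []

    def speak(name, other, lines):
        for line in lines:
            with condition:
                # wait_for() re-checks the predicate under the lock, so it does
                # not matter which thread starts first.
                condition.wait_for(lambda: state["turn"] == name)
                transcript.append("{} : {}".format(name, line))
                state["turn"] = other
                condition.notify_all()

    threads = [
        threading.Thread(target=speak, args=("A", "B", a_lines)),
        threading.Thread(target=speak, args=("B", "A", b_lines)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return transcript


transcript = run_conversation(
    ["Hello B,that's ok", "I'm fine,and you?"],
    ["Hi A, Let's start the conversation", "How are you", "Oh,goodbye"],
)
print("\n".join(transcript))
```

Because the predicate is stored in state rather than implied by timing, the order of the transcript no longer depends on which thread is scheduled first.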
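Semaphore
A semaphore limits how many threads run concurrently. For example, a crawler that sends requests too frequently may get its IP banned; capping the number of threads fetching pages at any one time reduces that risk to some extent. Another example is file access, where only one writer thread is allowed at a time while several reader threads may run together.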
- import time
- import threading
- def get_thread_a(semaphore, i):
-     time.sleep(1)
-     print("get thread : {}".format(i))
-     # Free a slot so get_thread_b can start the next worker
-     semaphore.release()
- def get_thread_b(semaphore):
-     for i in range(10):
-         # Blocks while two workers are already running
-         semaphore.acquire()
-         thread_a = threading.Thread(target=get_thread_a, args=(semaphore, i))
-         thread_a.start()
- if __name__ == "__main__":
-     semaphore = threading.Semaphore(2)
-     thread_b = threading.Thread(target=get_thread_b, args=(semaphore,))
-     thread_b.start()
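The example above runs at most two worker threads at a time, so the output appears two lines at a time, roughly one second apart. Each semaphore.acquire() call decrements the semaphore's counter by 1; once the counter reaches 0, further acquire() calls block. Each release() increments the counter by 1. Internally, Semaphore is built on Condition: semaphore.acquire() calls Condition.wait() while the counter is 0 and otherwise decrements it, and semaphore.release() increments the counter and calls Condition.notify().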
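A semaphore can also be used in a with statement, which pairs acquire() and release() automatically. The sketch below is one possible variant, not the article's approach: all workers are started up front and block on the semaphore themselves, and a small counter records the peak number of workers inside the guarded section. The names run_workers and state, and the 0.05 s sleep, are assumptions made for illustration.

```python
import threading
import time


def run_workers(worker_count=6, limit=2):
    """Start `worker_count` threads, allow `limit` at a time, return the peak."""
    # BoundedSemaphore raises ValueError if released more often than acquired.
    semaphore = threading.BoundedSemaphore(limit)
    state_lock = threading.Lock()
    state = {"running": 0, "peak": 0}

    def worker():
        with semaphore:  # acquire() on entry, release() on exit
            with state_lock:
                state["running"] += 1
                state["peak"] = max(state["peak"], state["running"])
            time.sleep(0.05)  # stand-in for real work
            with state_lock:
                state["running"] -= 1

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["peak"]


peak = run_workers()
print("peak concurrency:", peak)
```

The recorded peak never exceeds limit, whatever order the threads are scheduled in.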
Semaphore source code:
- def acquire(self, blocking=True, timeout=None):
-     if not blocking and timeout is not None:
-         raise ValueError("can't specify timeout for non-blocking acquire")
-     rc = False
-     endtime = None
-     with self._cond:
-         while self._value == 0:
-             if not blocking:
-                 break
-             if timeout is not None:
-                 if endtime is None:
-                     endtime = _time() + timeout
-                 else:
-                     timeout = endtime - _time()
-                     if timeout <= 0:
-                         break
-             self._cond.wait(timeout)
-         else:
-             self._value -= 1
-             rc = True
-     return rc
- def release(self):
-     with self._cond:
-         self._value += 1
-         self._cond.notify()
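The branches in acquire() above map directly onto its return value. The following short sketch exercises each branch on a semaphore with an initial value of 1; the variable names are arbitrary.

```python
import threading

semaphore = threading.Semaphore(1)

first = semaphore.acquire()                 # counter 1 -> 0, while/else branch runs
second = semaphore.acquire(blocking=False)  # counter is 0 and non-blocking: break at once
third = semaphore.acquire(timeout=0.1)      # waits on the Condition, then times out

semaphore.release()                         # counter 0 -> 1, notifies one waiter
fourth = semaphore.acquire(blocking=False)  # succeeds again

error = None
try:
    # A timeout makes no sense for a non-blocking call, so this is rejected.
    semaphore.acquire(blocking=False, timeout=1)
except ValueError as exc:
    error = str(exc)

print(first, second, third, fourth)
print(error)
```

A False return means the counter was not decremented, so the caller must not call release() for that attempt.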
Source: https://www.cnblogs.com/FG123/p/9704158.html