threading 模块
线程有两种调用方式
直接调用
- import threading
- import time
- def sayhi(num): # 定义每个线程要执行的函数
- print("running on number:%s" % num)
- time.sleep(2)
- if __name__ == "__main__":
- t1 = threading.Thread(target=sayhi, args=(1,)) # 生成一个线程实例
- t2 = threading.Thread(target=sayhi, args=(2,)) # 生成一个线程实例
- t1.start() # 启动线程
- t2.start() # 启动线程
- print(t1.getName()) # 获取线程名
- print(t2.getName())
继承式调用
- import threading
- import time
- class MyThread(threading.Thread):
- def __init__(self, num):
- super(MyThread, self).__init__()
- self.num = num
- def run(self): # 定义每个线程要运行的函数
- print("running on number:%s" % self.num)
- time.sleep(2)
- if __name__ == "__main__":
- t1 = MyThread("1")
- t2 = MyThread("2")
- t1.start()
- t2.start()
并发的多线程示例
先看一段代码,启动十个线程,每个线程都会 sleep2 秒,我想计算下执行时间,预计是 2 秒左右
- import threading
- import time
- def run(n):
- print("task", n, threading.current_thread())
- time.sleep(2)
- start_time = time.time() # 开始时间
- for i in range(10):
- t = threading.Thread(target=run, args=(i,))
- t.start()
- use_time = time.time() - start_time
- print("use time: ", use_time)
但是执行后发现,程序并没有等待我所以线程都执行完就打印了执行时间,这样不是我想要的结果,原因是主线程的执行操作,不会等待子线程执行完成,此处是不堵塞的,说到这里就引入了 join,join 是等待子线程执行
- import threading
- import time
- def run(n):
- print("task", n, threading.current_thread())
- time.sleep(2)
- start_time = time.time()
- t_list = []
- for i in range(10):
- t = threading.Thread(target=run, args=(i,))
- t.start()
- t_list.append(t)
- for r in t_list:
- r.join()
- use_time = time.time() - start_time
- print("use time: ", use_time)
这样,主线程就能等待所以线程都执行结束再打印执行时间了,结果没有问题,use time: 2.0050017833709717
守护线程
主线程结束,守护线程不管执行没执行完它要做的操作都会跟着结束
- import threading
- import time
- def run(n):
- print("task", n, threading.current_thread())
- time.sleep(2)
- start_time = time.time()
- for i in range(10):
- t = threading.Thread(target=run, args=(i,))
- t.setDaemon(True) # 设置为守护线程
- t.start()
- use_time = time.time() - start_time
- print("use time: ", use_time)
通过 setDaemon(True) 设置此线程为守护线程,代码执行效果为主线程打印完使用时间程序就退出了,并没有 sleep
线程锁(互斥锁)
一个进程可以启动多个线程,多个线程共享一块内存空间,也就意味着每个线程可以访问同一个数据,如果两个线程要同时修改同一份数据时,肯定会出问题
为了避免这样的问题,我们可以 "加把锁",也就是线程锁,同一时间只能让一个线程去修改这个数据
- import threading
- def run(n):
- lock.acquire() # 加锁
- global num
- num += 1
- print(n)
- lock.release() # 释放锁
- lock = threading.Lock() # 实例化一个互斥锁
- num = 0
- for i in range(1000):
- t = threading.Thread(target=run, args=("t-%s" % i,))
- t.start()
- print("num:", num)
递归锁
一个大锁中还要有子锁
- import threading
- num1, num2 = 0, 0
- def run1():
- print("grab the first part data")
- lock.acquire()
- global num1
- num1 += 1
- lock.release()
- return num1
- def run2():
- print("grab the second part data")
- lock.acquire()
- global num2
- num2 += 1
- lock.release()
- return num2
- def run3():
- lock.acquire()
- res1 = run1()
- print("-----between run1 and run2-----")
- res2 = run2()
- lock.release()
- print("res1:%s,res2:%s" % (res1, res2))
- lock = threading.RLock()
- for i in range(10):
- t = threading.Thread(target=run3)
- t.start()
- while threading.active_count() != 1:
- pass
- else:
- print("-----all threads done-----")
- print(num1, num2)
semaphore(信号量)
互斥锁同时允许一个线程更改数据,而信号量是同时允许一定数量的线程更改数据,比如厕所有 3 个坑,最多允许 3 个人上厕所,后面的人只能等有人出来才能进去上厕所
- import threading
- import time
- def run(n):
- semaphore.acquire()
- time.sleep(2)
- print("run the thread: %s" % n)
- semaphore.release()
- if __name__ == "__main__":
- semaphore = threading.BoundedSemaphore(3) # 最多同时运行3个线程
- for i in range(10):
- t = threading.Thread(target=run, args=(i,))
- t.start()
- while threading.active_count() != 1:
- pass
- else:
- print("-----all threads done-----")
event 事件
通过事件可以实现两个或多个进程间的交互
- import threading
- import time
- import random
- def light():
- if not event.isSet():
- event.set() # 设置标志,表示通行
- count = 0
- while True:
- if count < 10:
- print('\033[42;1m--green light on---\033[0m')
- elif count < 13:
- print('\033[43;1m--yellow light on---\033[0m')
- elif count < 20:
- if event.isSet():
- event.clear() # 清除标志位,表示禁止通行
- print('\033[41;1m--red light on---\033[0m')
- else:
- count = 0
- event.set()
- time.sleep(1)
- count += 1
- def car(n):
- while True:
- time.sleep(random.randrange(10))
- if event.isSet():
- print("car [%s] is running.." % n)
- else:
- print("car [%s] is waiting for the red light.." % n)
- if __name__ == "__main__":
- event = threading.Event()
- Light = threading.Thread(target=light)
- Light.start()
- for i in range(3):
- Car = threading.Thread(target=car, args=(i,))
- Car.start()
queue 队列
程序解耦
加快执行速度
queue 有几种模式
queue.Queue(maxsize=0) # 先入先出
queue.LifoQueue(maxsize=0) # 后入先出
queue.PriorityQueue(maxsize=0) # 存储数据时可设置优先级的队列
先入先出的例子,取到 1,2,3
- import queue
- q = queue.Queue()
- q.put(1)
- q.put(2)
- q.put(3)
- print(q.get())
- print(q.get())
- print(q.get())
后入先出的例子,取到 3,2,1
- import queue
- q = queue.LifoQueue()
- q.put(1)
- q.put(2)
- q.put(3)
- print(q.get())
- print(q.get())
- print(q.get())
优先级的例子
- import queue
- q = queue.PriorityQueue()
- q.put((2, "jack"))
- q.put((3, "tom"))
- q.put((1, "jiachen"))
- print(q.get())
- print(q.get())
- print(q.get())
q.qsize() 为队列中有几个数据
q.empty() 判断队列中是否为空,空为 True,非空为 False
q.full() 判断队列是否满了,满了为 True,不满为 False
q.put(item,block=True,timeout=None) 将数据放入队列中
q.put_nowait(item) 将数据放入队列中,如果队列满了,不堵塞直接报异常
q.get(item,block=True,timeout=None) 取出队列中的数据,block 为 True 如果没有数据可以取就堵塞,为 False 如果没有数据可以取就报异常,timeout 为超时时间,超出时间报异常
q.get_nowait(item) 取出队列中的数据,如果没有数据,不堵塞直接报异常
q.task_done()
q.join() 堵塞直到队列被消费完
生产者消费者模型
一个简单的例子
- import queue
- import threading
- import time
- def producer(name):
- count = 1
- while True:
- q.put(count)
- print("[%s]生产了骨头[%s]" % (name, count))
- count += 1
- time.sleep(0.5)
- def consumer(name):
- while True:
- q.get()
- print("[%s]吃了根骨头" % name)
- time.sleep(1)
- if __name__ == "__main__":
- q = queue.Queue(5)
- p = threading.Thread(target=producer, args=("xxx",))
- c1 = threading.Thread(target=consumer,args=("tom",))
- c2 = threading.Thread(target=consumer,args=("jack",))
- p.start()
- c1.start()
- c2.start()
来源: http://www.bubuko.com/infodetail-1984201.html