一, 上节课复习
1, 守护进程: 如果父进程将子进程设置为守护进程, 那么在主进程代码运行完毕后守护进程就立即被回收
2, 互斥锁: 用来将并发编程串行, 牺牲了效率而保证了数据安全
3
, 队列: 管道 + 锁
二, 守护进程例子
解决: 消费者取空列表后 q.get()阻塞的问题
方法一:
- from multiprocessing import Process
- import time
- def foo():
- print(123)
- time.sleep(1)
- print("end123")
- def bar():
- print(456)
- time.sleep(3)
- print("end456")
- if __name__ == '__main__':
- p1=Process(target=foo)
- p2=Process(target=bar)
- p1.daemon=True #主进程代码运行完毕, 守护进程就会结束
- p1.start()
- p2.start()
- print("main-------")
三, 守护进程与应用
- import time
- import random
- from multiprocessing import Process,Queue
- def consumer(name,q):
- while True:
- res=q.get()
- if res is None:break
- time.sleep(random.randint(1,3))
- print('\033[46m
消费者
===%s 吃了 %s\033[0m' %(name,res))
- def producer(name,q,food):
- for i in range(5):
- time.sleep(random.randint(1,2))
- res='%s%s' %(food,i)
- q.put(res)
- print('\033[45m
生产者者
===%s 生产了 %s\033[0m' %(name,res))
- if __name__ == '__main__':
- #1, 共享的盆
- q=Queue()
- #2, 生产者们
- p1=Process(target=producer,args=('egon',q,'包子'))
- p2=Process(target=producer,args=('刘清政',q,'泔水'))
- p3=Process(target=producer,args=('杨军',q,'米饭'))
- #3, 消费者们
- c1=Process(target=consumer,args=('alex',q))
- c2=Process(target=consumer,args=('梁书东',q))
- p1.start()
- p2.start()
- p3.start()
- c1.start()
- c2.start()
- # 在生产者生产完毕后, 往队列的末尾添加一个结束信号 None
- p1.join()
- p2.join()
- p3.join()
- # 有几个消费者就应该放几个结束信号
- q.put(None)
- q.put(None)
方法二:
- import time
- import random
- from multiprocessing import Process,JoinableQueue
- def consumer(name,q):
- while True:
- res=q.get()
- time.sleep(random.randint(1,3))
- print('\033[46m
消费者
===%s 吃了 %s\033[0m' %(name,res))
- q.task_done()
- def producer(name,q,food):
- for i in range(5):
- time.sleep(random.randint(1,2))
- res='%s%s' %(food,i)
- q.put(res)
- print('\033[45m
生产者者
===%s 生产了 %s\033[0m' %(name,res))
- if __name__ == '__main__':
- #1, 共享的盆
- q=JoinableQueue()
- #2, 生产者们
- p1=Process(target=producer,args=('egon',q,'包子'))
- p2=Process(target=producer,args=('刘清政',q,'泔水'))
- p3=Process(target=producer,args=('杨军',q,'米饭'))
- #3, 消费者们
- c1=Process(target=consumer,args=('alex',q))
- c2=Process(target=consumer,args=('梁书东',q))
- c1.daemon=True # c1.daemon=True 必须在 c1.start() 前
- c2.daemon=True
- p1.start()
- p2.start()
- p3.start()
- c1.start()
- c2.start()
- # 确定生产者确确实实已经生产完毕
- p1.join()
- p2.join()
- p3.join()
- # 在生产者生产完毕后, 拿到队列中元素的总个数, 然后直到元素总数变为 0,q.join()这一行代码才算运行完毕
- q.join()
- #q.join()一旦结束意味着队列确实被取空, 消费者已经确确实实把数据都取干净了
- print('主进程结束')
四, 线程理论
1, 什么是线程
线程指的是一条流水线的工作过程
进程不是执行单位, 是资源单位
一个进程内自带一个线程, 线程是执行单位
2, 进程 VS 线程
1, 同一进程内的线程们共享该进程内资源, 不同进程内的线程资源肯定是隔离的
2, 创建线程的开销比创建进程要小的多
4, 线程中没有父子关系. 相较于子线程, 主线程特殊之处在于其代变了主进程的生命周期.
主进程等待子进程结束然后结束, 是为子进程回收资源.
主线程等待子线程结束然后结束, 是等待这个进程的代码 (其他非守护线程) 执行完毕.
主进程: 执行完代码就结束.
主线程: 所以子线程结束才结束.
五, 开启线程的两种方式
方式一: 导入 Thread 模块
- from threading import Thread
- import time
- def task(name):
- print('%s is running' %name)
- time.sleep(3)
- if __name__ == '__main__':
- t=Thread(target=task,args=('egon',))
- t.start()
- print('主线程')
方式二: 创建类继承 Thread
- from threading import Thread
- import time
- class MyThread(Thread):
- def run(self):
- print('%s is running' %self.name)
- time.sleep(3)
- if __name__ == '__main__':
- t=MyThread()
- t.start()
- print('主线程')
六, 进程 vs 线程
1
, 瞅一瞅 PID (Process ID)
- from threading import Thread
- import time,os
- def task():
- print('%s is running' %os.getpid())
- time.sleep(3)
- if __name__ == '__main__':
- t=Thread(target=task,)
- t.start()
- print('主线程',os.getpid()) #一个进程中的子线程 pid 相同
2, 线程创建开销小
3, 同一进程内的多个线程共享该进程内的资源
- from threading import Thread
- import time,os
- x=1000
- def task():
- global x
- x=0
- if __name__ == '__main__':
- t=Thread(target=task,)
- t.start()
- t.join()
- print('主线程',x) #主线程 0
七, 线程对象的其他方法
- from threading import Thread,current_thread,active_count,enumerate
- import time,os
- def task():
- print('%s is running' %current_thread().name) #Thread-1 is running
- time.sleep(3)
- if __name__ == '__main__':
- t1=Thread(target=task,name='第一个线程')
- t2=Thread(target=task,)
- t3=Thread(target=task,)
- t1.start()
- t2.start()
- t3.start()
- print(t1.is_alive()) #True
- print(active_count()) #4
- print(enumerate()) #[<_MainThread(MainThread, started 4320768832)>, <Thread(第一个线程, started 123145551912960)>, <Thread(Thread-1, started 123145557168128)>, <Thread(Thread-2, started 123145562423296)>] #当前活跃的线程
- print('主线程',current_thread().name) #主线程 MainThread
八, 守护线程
- from threading import Thread,current_thread
- import time
- def task():
- print('%s is running' %current_thread().name)
- time.sleep(3)
- if __name__ == '__main__':
- t1=Thread(target=task,name='第一个线程')
- t1.daemon = True
- t1.start()
- print('主线程')
- from threading import Thread
- import time
- def foo():
- print(123)
- time.sleep(5)
- print("end123")
- def bar():
- print(456)
- time.sleep(3)
- print("end456")
- if __name__ == '__main__':
- t1=Thread(target=foo)
- t2=Thread(target=bar)
- t1.daemon=True
- t1.start()
- t2.start()
- print("main-------")
- '''
- 123
- 456
- main-------
- end456
- '''
主进程: 执行完代码就结束.
主线程: 所以子线程结束才结束.
总结: 只要进程内没有可执行的代码守护就结束
九, 线程互斥锁
- from threading import Thread,Lock
- import time
- mutex=Lock()
- x=100
- def task():
- global x
- mutex.acquire()
- temp=x
- time.sleep(0.1)
- x=temp-1
- mutex.release()
- if __name__ == '__main__':
- start=time.time()
- t_l=[]
- for i in range(100):
- t=Thread(target=task)
- t_l.append(t)
- t.start()
- for t in t_l:
- t.join()
- print('主',x) #0
- print(time.time()-start)
十, 死锁现象与递归锁
- from threading import Thread,Lock,RLock
- import time
- # mutexA=Lock() #如果用 Lock(互斥锁), 会发生死锁现象
- # mutexB=Lock()
- mutexA=mutexB=RLock() #
是一把锁, 可连续 acqruie, 但只有其上的计数为 0 时其他线程才可对其调用
- class MyThread(Thread):
- def run(self):
- self.f1()
- self.f2()
- def f1(self):
- mutexA.acquire()
- print('%s 拿到了 A 锁' %self.name)
- mutexB.acquire()
- print('%s 拿到了 B 锁' %self.name)
- mutexB.release()
- mutexA.release()
- def f2(self):
- mutexB.acquire()
- print('%s 拿到了 B 锁' %self.name)
- time.sleep(0.1)
- mutexA.acquire()
- print('%s 拿到了 A 锁' %self.name)
- mutexA.release()
- mutexB.release()
- if __name__ == '__main__':
- for i in range(10):
- t=MyThread()
- t.start()
- print('主')
十一, 信号量
- # from multiprocessing import Semaphore #进程和线程中皆可导入 Semaphore 模块
- from threading import Thread,Semaphore,current_thread
- import time,random
- sm=Semaphore(5) #5 把钥匙, 即同时可以 5 个对象进行执行
- def go_wc():
- sm.acquire()
- print('%s 上厕所 ing' %current_thread().getName())
- time.sleep(random.randint(1,3))
- sm.release()
- if __name__ == '__main__':
- for i in range(23):
- t=Thread(target=go_wc)
- t.start()
来源: http://www.bubuko.com/infodetail-2576170.html