- # ### lock (互斥锁)
- """
- # 应用在多进程当中
- # 互斥锁 lock : 互斥锁是进程间的 get_ticket 互相排斥
- 进程之间, 谁先抢占到资源, 谁就先上锁, 等到解锁之后, 下一个进程在继续使用
- """
- lock.acquire()# 上锁
- lock.release()# 解锁
- # 同一时间允许一个进程上一把锁 就是 Lock
加锁可以保证多个进程修改同一块数据时, 同一时间只能有一个任务可以进行修改, 即串行 (同步) 的修改, 没错, 速度是慢了, 但牺牲速度却保证了数据安全.
# 同一时间允许多个进程上多把锁 就是[信号量 Semaphore]
信号量是锁的变形: 实际实现是 计数器 + 锁, 同时允许多个进程上锁
- # 互斥锁 Lock : 互斥锁就是进程的互相排斥, 谁先抢到资源, 谁就上锁改资源内容, 为了保证数据的同步性
- # 注意: 多个锁一起上, 不开锁, 会造成死锁. 上锁和解锁是一对.
代码如下
- lock = Lock()
- # 上锁
- lock.acquire()
- print(1)
- # 上锁和解锁是一对, 一对一的关系, 如果只上锁不解锁, 会发生死锁
- lock.acquire()
- print(2)
- lock.release()
- # 解锁
- lock.release()
- import JSON
- # 读取票数, 更新票数
- def wr_info(sign,dic=None):
- if sign == "r":
- with open("ticket",mode="r",encoding="utf-8") as fp:
- dic = JSON.load(fp)
- return dic
- elif sign == "w":
- with open("ticket",mode="w",encoding="utf-8") as fp:
- JSON.dump(dic,fp)
这段代码里函数是根据我所传入的状态来执行并且返回相应的条件, 如果我是传入的是一个 "r" 那么我当前处于读取的模式会把文件里面票数读出来, 并且返回给调用处, 如果我传入的是 "w" 模式那么我当前是写的模式把我当前的状态写入票卡文件夹里
进程在创建的时候是异步创建的, 在上锁的时候同步进行的
这里介绍下 Lock 的简写
首先实例化一个 lock 对象
l=Lock()
with l:
code...
这里相当于传统的写法
- l.acquire()
- code...
- l.release()
信号量 Semaphore
同一时间可以上多把锁即首先实例化一个 semaphore 对象, 然后指定你要上锁的数量由进程调用
- # ### 信号量 Semaphore 本质上就是锁, 同一时间可以上多把锁
- """
- # 语法:
- sem = Semaphore(3)
- sem.acquire()
- sem.release()
- """
- import time
- import random
- from multiprocessing import Process,Semaphore
- def ktv(person,sem):
- sem.acquire()
- print("%s 进入了 ktv, 正在唱歌" % (person))
- time.sleep(random.randrange(3,6))
- print("%s 唱完了, 离开了 ktv" % (person))
- sem.release()
- if __name__ == "__main__":
- sem = Semaphore(3)
- for i in range(10):
- p = Process(target=ktv,args=("person%s" % (i), sem))
- p.start()
- """
- lock 多个进程之间, 一次只能上一把锁
- Semaphore 多个进程之间, 可以自定义上锁的数量, 不限于一个
- """
用法如上.....
- # ### 事件 (Event)
- # 阻塞事件 :
e = Event()生成事件对象 e
e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的 is_set() [默认返回值是 False]
- # 如果是 True 不加阻塞
- # 如果是 False 加阻塞
- # 控制这个属性的值
- # set()方法 将这个属性的值改成 True
- # clear()方法 将这个属性的值改成 False
- # is_set()方法 判断当前的属性是否为 True (默认上来是 False)
事件 Event 只针对于多进程, 在普通的同步进程中是不会出现 Event 事件的, 那么当我使用事件时一个进程中最少需要保持一个子进程去进行修改状态为 True 另外一个进程去获取状态后在修改 False 状态, 造成的程序也会变成同步, 两个进程之间同时保持着紧密的联系
以下通过代码方式实现事件 event 的过程
- def traffic_light(e):
- # 默认红灯亮
- print("红灯亮")
- while True:
- if e.is_set():
- # 让绿灯亮 1 秒钟
- time.sleep(1)
- # 切换成红灯
- print("红灯亮")
- # 把 True 改成 False
- e.clear()
- else:
- # 让红灯亮 1 秒钟
- time.sleep(1)
- # 切换成绿灯
- print("绿灯亮")
- # 把默认值从 False 改成 True
- e.set()
- # e = Event()
- # traffic_light(e)
- def car(e,i):
- # 判断如果是红灯亮, 就执行下面代码
- if not e.is_set():
- print("car%s 在等待" % (i))
- e.wait()
- print("car%s 通行了" % (i))
- """
- # 方法一
- if __name__ == "__main__":
- e = Event()
- # 创建交通灯对象
- p1 = Process(target=traffic_light,args=(e,))
- p1.start()
- # 创建小车
- for i in range(20):
- time.sleep(random.randrange(0,2))
- p2 = Process(target=car,args=(e,i))
- p2.start()
- """
- # 方法二: 优化红绿灯代码[当小车执行结束的时候, 把红绿灯终止]
- if __name__ == "__main__":
- lst = []
- e = Event()
- # 创建交通灯对象
- p1 = Process(target=traffic_light,args=(e,))
- p1.daemon=True
- p1.start()
- # 创建小车
- for i in range(20):
- time.sleep(random.randrange(0,2))
- p2 = Process(target=car,args=(e,i))
- p2.start()
- lst.append(p2)
- # 等所有小车都同行之后, 在关闭守护进程
- for i in lst:
- i.join()
- print("主程序执行结束...")
通过程序我们可以看出我们得通过事件的 is_set 方法来判定当前事件的状态是为 False 还是为 True, 为 False 执行 car 函数为 True 执行 traffic_light 函数
进程之队列
- # ### 进程队列 [让进程之间共享资源]
- from multiprocessing import Process,Queue
- # 线程队列
- import queue
- """先进先出"""
(1) 基本语法
q = Queue()
1. 用 put 方法往队列中存值
q.put(111)
2. 用 get 方法从队列中取值
- res = q.get()
- print(res)
3. 当队列中没有值了, 在调用 get 就会发生阻塞
- res = q.get()
- print(res)
4.get_nowait 在没值的时候, 直接报错; 存在兼容性问题(不推荐使用, 报错报的是线程队列中的空, 在取值时存在 bug)
- res = q.get_nowait()
- print(res)
- # (了解)
- try:
- res = q.get_nowait()
- print(res)
- except queue.Empty:
- pass
- # (2) 可以适用 queue 指定队列长度
- q1 = Queue(3)
- q1.put(11)
- q1.put(22)
- q1.put(33)
注意: 如果超出了队列的长度, 直接阻塞
# q1.put(44)
注意: 如果超出了队列的长度, 直接报错(不推荐)
- # q1.put_nowait(44)
- # (3) 多进程之间共享数据
- def func(q):
- # 2. 在子进程中获取数据
- res = q.get()
- print(res)
- # 3. 子进程添加数据
- q.put("bbb")
- if __name__ == "__main__":
- q2 = Queue()
- p1 = Process(target=func,args=(q2,))
- p1.start()
- # 1. 在主进程中, 添加数据
- q2.put("aaa")
- # 为了能够拿到子进程中添加的队列元素, 需要等待子进程执行结束后在获取
- p1.join()
- # 4. 主进程获取子进程添加的数据
- res = q2.get()
- print("主进程执行结束: 值 %s" % (res))
生产者与消费者模型
- # (1) 基本语法
- from multiprocessing import Process,Queue
- import time,random
- # (1) 优化生产者和消费者模型 [生产者生产多少, 对应的就消费多少]
- # 消费者模型
- def consumer(q,name):
- while True:
- food = q.get()
- if food is None:
- break
- time.sleep(random.uniform(0.1,1))
- print("%s 吃了一个 %s" % (name,food))
- # 生产者模型
- def producer(q,name,food):
- for i in range(5):
- time.sleep(random.uniform(0.1,1))
- print("%s 生产了 %s%s" % (name,food,i))
- q.put(food+str(i))
- if __name__ == "__main__":
- q = Queue()
- # 创建生产者
- p1 = Process(target=producer,args=(q,"周永玲","便便"))
- p1.start()
- # 创建生产者 2 号
- p2 = Process(target=producer,args=(q,"常远","茶叶"))
- p2.start()
- # 创建消费者
- c1 = Process(target=consumer,args=(q,"张龙"))
- c1.start()
- # 创建消费者 2 号
- c2 = Process(target=consumer,args=(q,"林银展"))
- c2.start()
- p1.join()
- p2.join()
- # 在生产完所有数据之后, 在队列的最后塞进去一个 None, 用来表达已经生产完所有数据;
- q.put(None) # 便便 0 便便 1 便便 2 便便 3 便便 4 None
- q.put(None) # 便便 0 便便 1 便便 2 便便 3 便便 4 茶叶 1 茶叶 2 茶叶 3 茶叶 4 None None
- print("主程序执行结束 ...")
JoinableQueue 阻塞事件
put 存放
get 获取
task_done 队列数据减一
join 阻塞
task_done 与 join 通过一个中间变量统计队列中元素个数
每放入一个值 , 成员中的中间变量值加 1
没执行一次 task_done, 成员中的中间变量值减 1
join 会根据中间变量值来确定是阻塞还是放行
如果中间变量是 0 意味着放行
如果中间变量不是 0 意味着阻塞
- 1 # (1) 基本语法
- """
- jq = JoinableQueue()
- jq.put("aabbcc")
- print(jq.get())
- jq.task_done()
- jq.join()
- print("finish")
- """
生产者与消费者模型改造
- # 消费者模型
- def consumer(q,name):
- while True:
- food = q.get()
- time.sleep(random.uniform(0.1,1))
- print("%s 吃了一个 %s" % (name,food))
- q.task_done()
- # 生产者模型
- def producer(q,name,food):
- for i in range(5):
- time.sleep(random.uniform(0.1,1))
- print("%s 生产了 %s%s" % (name,food,i))
- q.put(food+str(i))
- if __name__ == "__main__":
- # 创建队列
- jq = JoinableQueue()
- # 消费者进程
- c1 = Process(target=consumer,args=(jq,"张晓东"))
- c1.daemon = True
- c1.start()
- # 生产者进程
- p1 = Process(target=producer,args=(jq,"黄乐锡","大茄子"))
- p1.start()
- # 等待生产者把所有数据放到队列中;
- p1.join()
- # 直到所有数据被消费完毕之后, 放行;
- jq.join()
- print("主进程执行结束 ...")
来源: http://www.bubuko.com/infodetail-3303490.html