name | meaning |
Barrier(parties,action=None,timeout=None) | 构建 Barrier 对象, 指定参与方数目, timeout 是 wait 方法未指定超时的默认值 |
n_waiting | 当前在屏障中等待的线程数 |
parties | 各方数, 需要多少个等待 |
wait(timeout=None) | 等待通过屏障, 返回 0 到 线程数 -1 的 integer, 每个线程返回不同, 如果 wait 方法设置了超时, 并触发超时发送, 屏障处于 broken 状态 |
- import threading,logging
- logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(threadName)s %(message)s')
- def worker(barrier:threading.Barrier):
- logging.info('n_waiting= {}'.format(barrier.n_waiting))
- try:
- bid=barrier.wait(timeout=2)
- logging.info('after barrier'.format())
- except threading.BrokenBarrierError:
- logging.info('Broken Barrier in {}'.format(threading.current_thread().name))
- barrier=threading.Barrier(3,)
- # barrier=threading.Barrier(3,timeout=3)
- for b in range(5): # try 3 4 5
- threading.Thread(target=worker,name='worker-{}'.format(b),args=(barrier,)).start()
name | implication |
broken | 如果屏障处于 broken 状态, 返回 True |
abort() | 将屏障至于 broken 状态, 等待中的线程或者调用等待方法的线程都会抛出 BrokenBarrierError 异常, 知道 reset 方法恢复屏障 |
reset() | 恢复屏障, 重新开始拦截 |
- import threading,logging
- logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(threadName)s %(message)s')
- def worker(barrier:threading.Barrier):
- logging.info('n_waiting= {}'.format(barrier.n_waiting))
- try:
- # bid=barrier.wait(timeout=2)
- bid=barrier.wait()
- logging.info('after barrier {}'.format(bid))
- except threading.BrokenBarrierError:
- logging.info('Broken Barrier in {}'.format(threading.current_thread().name))
- barrier=threading.Barrier(3,)
- # barrier=threading.Barrier(3,timeout=3)
- for b in range(5): # try 3 4 5
- threading.Thread(target=worker,name='worker-{}'.format(b),args=(barrier,)).start()
- print(barrier.broken,111111111111111)
- threading.Event().wait(2)
- barrier.abort()
- print(barrier.broken,222222222222222222)
- barrier.reset()
- print(barrier.broken,3333333333333333333)
- import threading,logging
- FORMAT='%(asctime)-15s\t [ %(threadName)s %(thread)8d ] %(message)s'
- logging.basicConfig(format=FORMAT,level=logging.WARNING)
- def worker(barrier:threading.Barrier):
- logging.error('waiting threads: {}'.format(barrier.n_waiting))
- try:
- barrier_id=barrier.wait()
- logging.critical('my barrier id: {}'.format(barrier_id))
- except threading.BrokenBarrierError:
- logging.error('Broken Barrier')
- barrier=threading.Barrier(3)
- for b in range(0,9):
- if b==2:
- barrier.abort()
- elif b==6:
- barrier.reset()
- threading.Event().wait(1)
- threading.Thread(target=worker,name='work-{}'.format(b),args=(barrier,)).start()
wait 方法超时实例
如果 wait 方法超时发生, 屏障将处于 broken 状态知道 reset
- import threading,logging
- FORMAT='%(asctime)-15s\t[ %(threadName)s %(thread)8d ] %(message)s'
- logging.basicConfig(level=logging.INFO,format=FORMAT)
- def w(barrier:threading.Barrier,i:int):
- logging.info('waiting threads: {}'.format(barrier.n_waiting))
- try:
- logging.info(barrier.broken)
- if i<3:
- barrier_id=barrier.wait(1)
- else:
- if i==6:
- barrier.reset()
- barrier_id=barrier.wait()
- logging.info('barrier id: {}'.format(barrier_id))
- except threading.BrokenBarrierError:
- logging.info('Broken Barrier')
- barrier=threading.(3)
- for b in range(0,9):
- threading.Event().wait(2)
- threading.Thread(target=w,name='w-{}'.format(b),args=(barrier,b)).start()
Barrier 应用:
并发初始化
所有线程都必须初始化完成后才能继续工作, 例如运行前加载数据, 检查, 如果这些工作没完成, 就开始运行, 将不能正常工作
10 个线程做 10 种准备工作, 每个线程负责一种工作, 只有这 10 个线程都完成后, 才能继续工作, 先完成的要等待后完成的线程
例如, 启动一个程序, 要先加载磁盘文件, 缓存预热, 初始化连接池等, 这些工作可以并发, 但是只有全部准备工作完成后, 才能继续, 假设, 数据库连接失败, 则初始化工作失败, 需 abort(), 屏障 broken, 所有线程收到异常退出
来源: http://www.bubuko.com/infodetail-3685888.html