CyclicBarrier 也叫回环栅栏, 能够实现让一组线程运行到栅栏处并阻塞, 等到所有线程都到达栅栏时再一起执行的功能."回环" 意味着 CyclicBarrier 可以多次重复使用, 相比于 CountDownLatch 只能使用一次, CyclicBarrier 可以节省许多资源, 并且还可以在构造器中传入任务, 当栅栏条件满足时执行这个任务. CyclicBarrier 是使用了 ReentrantLock, 主要方法在执行时都会加锁, 因此并发性能不是很高.
1. 相关字段
- // 重入锁, CyclicBarrier 内部通过重入锁实现线程安全
- private final ReentrantLock lock = new ReentrantLock();
- // 线程阻塞时的等待条件
- private final Condition trip = lock.newCondition();
- // 需要等待的线程数
- private final int parties;
- // 栅栏打开之后首先执行的任务
- private final Runnable barrierCommand;
- // 记录当前的分代标记
- private Generation generation = new Generation();
- // 当前还需要等待多少个线程运行到栅栏位置
- private int count;
需要注意的是 generation 字段, 用于标记栅栏当前处在哪一代. 当满足一定的条件时 (例如调用了 reset 方法, 或者栅栏打开等), 栅栏状态会切换到下一代, 实际就是 new 一个新的 Generation 对象, 这是 CyclicBarrier 的内部类, 代码非常简单, 如下:
- private static class Generation {
- boolean broken = false; // 标记栅栏是否被破坏
- }
实际使用的过程中, 会利用 generation 字段判断当前是否在同一个分代, 而使用 broker 字段判断栅栏是否被破坏.
2. 构造函数
CyclicBarrier 有两个重载的构造函数, 构造函数只是对上述的相关字段进行初始化, 如下:
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
3. 核心方法
await
await 是开发时最常用到的方法了, 同 CountDownLatch 一样, CyclicBarrier 也提供了两个 await 方法, 一个不带参数, 一个带有超时参数, 其内部只是简单调用了一下 dowait 方法:
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- public int await(long timeout, TimeUnit unit)
- throws InterruptedException,
- BrokenBarrierException,
- TimeoutException {
- return dowait(true, unit.toNanos(timeout));
- }
接下来看看至关重要的 dowait 方法:
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- // 加重入锁
- lock.lock();
- try {
- // 首先获取年龄代信息
- final Generation g = generation;
- // 如果栅栏状态被破坏, 抛出异常, 例如先启动的线程调用了 breakBarrier 方法, 后启动的线程就能够看到 g.broker=true
- if (g.broken)
- throw new BrokenBarrierException();
- // 检测线程的中断状态, 如果线程设置了中断状态, 则通过 breakBarrier 设置栅栏为已破坏状态, 并唤醒其他线程
- // 如果这里能够检测到中断状态, 那只可能是在 await 方法外部设置的
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // 每调用一次 await, 就将需要等待的线程数减 1
- int index = --count;
- //index=0 表示这是最后一个到达的线程, 由该线程执行下面的逻辑
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- // 如果在构造器中传入了第二个任务参数, 就在放开栅栏前先执行这个任务
- if (command != null)
- command.run();
- ranAction = true;
- // 正常结束, 需要唤醒阻塞的线程, 并换代
- nextGeneration();
- return 0;
- } finally {
- //try 代码块如果正常执行, ranAction 就一定等于 true, 而 try 代码块唯一可能发生异常的地方就是 command.run(),
- // 因此这里为了保证在任务执行失败时, 将栅栏标记为已破坏, 唤醒阻塞线程
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- for (;;) {
- try {
- // 没有设置超时标记, 就加入等待队列
- if (!timed)
- trip.await();
- // 设置了超时标记, 但目前还没有超时, 则继续等待
- else if (nanos> 0L)
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException IE) {
- // 如果线程等待的过程中被中断, 会执行到这里
- //g == generation 表示当前还在同一个年龄分代中,!g.broker 表示当前栅栏状态没有被破坏
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw IE;
- } else {
- // 上面的条件不满足, 说明: 1)g!=generation, 说明线程执行到这里时已经换代了
- //2) 没有换代, 但是栅栏被破坏了
- // 无论哪种情况, 都只是简单地设置一下当前线程的中断状态
- Thread.currentThread().interrupt();
- }
- }
- // 栅栏被破坏, 抛出异常
- // 注意, 在 breakBarrier 方法中会唤醒所有等待条件的线程, 这些线程会执行到这里, 判断栅栏已经被破坏, 都会抛出异常
- if (g.broken)
- throw new BrokenBarrierException();
- // 距离上一次设置 g 变量的值已经过去很长时间了, 在执行过程中 generation 可能已经发生改变,
- // 当前线程还是前几代的, 不需要再循环阻塞了, 直接返回上一代剩余需要等待的线程数
- // 注意: 代码中 breakBarrier 方法和 nextGeneration 方法都会唤醒阻塞的线程, 但是 breakBarrier 在上一个判断就被拦截了,
- // 因此走到这里的有三种情况:
- //a) 最后一个线程正常执行, 栅栏打开导致其他线程被唤醒; 不属于当前代的线程直接返回,
- // 属于当前代的则可能因为没到栅栏开放条件要继续循环阻塞
- //b) 栅栏被重置 (调用了 reset 方法), 此时 g!=negeration, 全都直接返回
- //c) 线程等待超时了, 不属于当前代的返回就可以了, 属于当前代的则要设置 generation.broken = true
- if (g != generation)
- return index;
- // 如果线程等待超时, 标记栅栏为破坏状态并抛出异常, 如果还没超时, 则自旋后又重新阻塞
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- // 别忘了解锁
- lock.unlock();
- }
- }
dowait 的方法逻辑是: 每一个调用 await 方法的线程都会将计数 count 减 1, 最后一个线程将 count 减为 0 时, 顺带还要执行 barrierCommand 指定的任务, 并将 generation 切换到下一代, 当然, 最重要的还是要唤醒之前在栅栏处阻塞的线程. 由于 trip 对应的 Condition 对象没有任何地方会修改, 因此 trip.signalAll() 会唤醒所有在该条件上等待的线程, 如果线程在等待的过程中, 其他线程将 generation 更新到下一代, 就会出现被唤醒的线程中有部分还属于之前那一代的情况.
接下来将会对 dowait 用到的一些方法进行简单介绍.
breakBarrier
dowait 方法有四个地方调用了 breakBarrier, 从名字可以看出, 该方法会将 generation.broken 设置为 true, 除此之外, 还会还原 count 的值, 并且唤醒所有被阻塞的线程:
- private void breakBarrier() {
- generation.broken = true;
- count = parties;
- // 唤醒所有的阻塞线程
- trip.signalAll();
- }
纵观 CyclicBarrier 源码, generation.broken 统一在 breakBarrier 方法中被设置为 true, 而一旦将 generation.broken 设置为 true 之后, 代码中检查到这个状态之后都会抛出异常, 栅栏就没办法再使用了 (可以手动调用 reset 进行重置), 而源码中会在以下几种情况调用 breakBarrier 方法:
1) 当前线程被中断
2) 通过构造器传入的任务执行失败
3) 条件等待时被中断
4) 线程等待超时
5) 显式调用 reset 方法
- nextGeneration
- private void nextGeneration() {
- // 唤醒所有的阻塞线程
- trip.signalAll();
- // 开启下一代
- count = parties;
- generation = new Generation();
- }
- reset
reset 方法主要是结束这一代, 并切换到下一代
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- breakBarrier(); // break the current generation
- nextGeneration(); // start a new generation
- } finally {
- lock.unlock();
- }
- }
介绍到这里, 整个 CyclicBarrier 已经差不多介绍完了, 但是内部的流程远远没有这么简单, 因为很大一部分逻辑封装在 AbstractQueuedSynchronizer 中, 这个类定义了阻塞的线程如何加入等待队列, 又如何被唤醒, 因此如果想要深入了解线程等待的逻辑, 还需要仔细研究 AbstractQueuedSynchronizer 才行. 本文不会对这部分内容进行介绍, 后面有时间的话将会专门对其进行介绍.
来源: https://www.cnblogs.com/NaLanZiYi-LinEr/p/12484022.html