问题
(1)CyclicBarrier 是什么?
(2)CyclicBarrier 具有什么特性?
(3)CyclicBarrier 与 CountDownLatch 的对比?
简介
CyclicBarrier, 回环栅栏, 它会阻塞一组线程直到这些线程同时达到某个条件才继续执行. 它与 CountDownLatch 很类似, 但又不同, CountDownLatch 需要调用 countDown()方法触发事件, 而 CyclicBarrier 不需要, 它就像一个栅栏一样, 当一组线程都到达了栅栏处才继续往下走.
使用方法
- public class CyclicBarrierTest {
- public static void main(String[] args) {
- CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
- for (int i = 0; i <3; i++) {
- new Thread(()->{
- System.out.println("before");
- try {
- cyclicBarrier.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- System.out.println("after");
- }).start();
- }
- }
- }
这段方法很简单, 使用一个 CyclicBarrier 使得三个线程保持同步, 当三个线程同时到达 cyclicBarrier.await(); 处大家再一起往下运行.
源码分析
主要内部类
- private static class Generation {
- boolean broken = false;
- }
Generation, 中文翻译为代, 一代人的代, 用于控制 CyclicBarrier 的循环使用.
比如, 上面示例中的三个线程完成后进入下一代, 继续等待三个线程达到栅栏处再一起执行, 而 CountDownLatch 则做不到这一点, CountDownLatch 是一次性的, 无法重置其次数.
主要属性
- // 重入锁
- private final ReentrantLock lock = new ReentrantLock();
- // 条件锁, 名称为 trip, 绊倒的意思, 可能是指线程来了先绊倒, 等达到一定数量了再唤醒
- private final Condition trip = lock.newCondition();
- // 需要等待的线程数量
- private final int parties;
- // 当唤醒的时候执行的命令
- private final Runnable barrierCommand;
- // 代
- private Generation generation = new Generation();
- // 当前这一代还需要等待的线程数
- private int count;
通过属性可以看到, CyclicBarrier 内部是通过重入锁的条件锁来实现的, 那么你可以脑补一下这个场景吗?
彤哥来脑补一下: 假如初始时 count = parties = 3, 当第一个线程到达栅栏处, count 减 1, 然后把它加入到 Condition 的队列中, 第二个线程到达栅栏处也是如此, 第三个线程到达栅栏处, count 减为 0, 调用 Condition 的 signalAll()通知另外两个线程, 然后把它们加入到 AQS 的队列中, 等待当前线程运行完毕, 调用 lock.unlock()的时候依次从 AQS 的队列中唤醒一个线程继续运行, 也就是说实际上三个线程先依次 (排队) 到达栅栏处, 再依次往下运行.
以上纯属彤哥脑补的内容, 真实情况是不是如此呢, 且往后看.
构造方法
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- // 初始化 parties
- this.parties = parties;
- // 初始化 count 等于 parties
- this.count = parties;
- // 初始化都到达栅栏处执行的命令
- this.barrierCommand = barrierAction;
- }
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
构造方法需要传入一个 parties 变量, 也就是需要等待的线程数.
await()方法
每个需要在栅栏处等待的线程都需要显式地调用 await()方法等待其它线程的到来.
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- // 调用 dowait 方法, 不需要超时
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- // 加锁
- lock.lock();
- try {
- // 当前代
- final Generation g = generation;
- // 检查
- if (g.broken)
- throw new BrokenBarrierException();
- // 中断检查
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // count 的值减 1
- int index = --count;
- // 如果数量减到 0 了, 走这段逻辑(最后一个线程走这里)
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- // 如果初始化的时候传了命令, 这里执行
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- // 调用下一代方法
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // 这个循环只有非最后一个线程可以走到
- for (;;) {
- try {
- if (!timed)
- // 调用 condition 的 await()方法
- trip.await();
- else if (nanos> 0L)
- // 超时等待方法
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException IE) {
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw IE;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- Thread.currentThread().interrupt();
- }
- }
- // 检查
- if (g.broken)
- throw new BrokenBarrierException();
- // 正常来说这里肯定不相等
- // 因为上面打破栅栏的时候调用 nextGeneration()方法时 generation 的引用已经变化了
- if (g != generation)
- return index;
- // 超时检查
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
- private void nextGeneration() {
- // 调用 condition 的 signalAll()将其队列中的等待者全部转移到 AQS 的队列中
- trip.signalAll();
- // 重置 count
- count = parties;
- // 进入下一代
- generation = new Generation();
- }
dowait()方法里的整个逻辑分成两部分:
(1)最后一个线程走上面的逻辑, 当 count 减为 0 的时候, 打破栅栏, 它调用 nextGeneration()方法通知条件队列中的等待线程转移到 AQS 的队列中等待被唤醒, 并进入下一代.
(2)非最后一个线程走下面的 for 循环逻辑, 这些线程会阻塞在 condition 的 await()方法处, 它们会加入到条件队列中, 等待被通知, 当它们唤醒的时候已经更新换 "代" 了, 这时候返回.
图解
学习过前面的章节, 看这个图很简单了, 看不懂的同学还需要把推荐的内容好好看看哦 ^^
总结
(1)CyclicBarrier 会使一组线程阻塞在 await()处, 当最后一个线程到达时唤醒 (只是从条件队列转移到 AQS 队列中) 前面的线程大家再继续往下走;
(2)CyclicBarrier 不是直接使用 AQS 实现的一个同步器;
(3)CyclicBarrier 基于 ReentrantLock 及其 Condition 实现整个同步逻辑;
彩蛋
CyclicBarrier 与 CountDownLatch 的异同?
(1)两者都能实现阻塞一组线程等待被唤醒;
(2)前者是最后一个线程到达时自动唤醒;
(3)后者是通过显式地调用 countDown()实现的;
(4)前者是通过重入锁及其条件锁实现的, 后者是直接基于 AQS 实现的;
(5)前者具有 "代" 的概念, 可以重复使用, 后者只能使用一次;
(6)前者只能实现多个线程到达栅栏处一起运行;
(7)后者不仅可以实现多个线程等待一个线程条件成立, 还能实现一个线程等待多个线程条件成立(详见 CountDownLatch 那章使用案例);
来源: https://www.cnblogs.com/tong-yuan/p/CyclicBarrier.html