是一个同步工具类, 它让一组线程等待直到一个屏障条件到达才接着执行后续代码. 名如其类, 它的意思就是循环屏障, 就是可以服复用的, 而我们知道另一个同步类 CountDownLatch 是不能复用的.
用法举例
- public class CB {
- static class MyThread extends Thread {
- private CyclicBarrier cyclicBarrier;
- public MyThread(CyclicBarrier cyclicBarrier) {
- this.cyclicBarrier = cyclicBarrier;
- }
- @Override
- public void run() {
- System.out.println("线程" + Thread.currentThread().getName() + "正在准备...");
- try {
- Thread.sleep(5000); // 以睡眠来模拟准备操作
- System.out.println("线程" + Thread.currentThread().getName() + "准备完毕");
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "所有线程准备完毕, 各自继续处理其他任务...");
- }
- }
- public static void main(String[] args) {
- int cnt = 4;
- CyclicBarrier cyclicBarrier = new CyclicBarrier(cnt);
- for (int i = 0; i <cnt; i++) {
- new MyThread(cyclicBarrier).start();
- }
- }
- }
以上面代码为例子, 其输出是:
线程 Thread-0 正在准备...
线程 Thread-3 正在准备...
线程 Thread-2 正在准备...
线程 Thread-1 正在准备...
线程 Thread-3 准备完毕
线程 Thread-0 准备完毕
线程 Thread-2 准备完毕
线程 Thread-1 准备完毕
Thread-3 所有线程准备完毕, 各自继续处理其他任务...
Thread-2 所有线程准备完毕, 各自继续处理其他任务...
Thread-0 所有线程准备完毕, 各自继续处理其他任务...
Thread-1 所有线程准备完毕, 各自继续处理其他任务...
可以看出, 每一个线程都必须等到所有线程准备完毕之后才能各自执行其后续操作, 这个条件就是屏障状态 barrier, 当调用 cyclicBarrier.await()方法之后, 线程处于等待 barrier 状态而无法执行后续代码.
构造器和成员变量
CyclicBarrier 提供了两个构造器:
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
参数 parties 记录一共等待至 barrier 状态的线程个数, count 记录依然等待至 barrier 状态的线程的个数, 参数 barrierAction 为当这些线程都达到 barrier 状态时会执行的内容.
看下它的成员变量:
- /** 守护 barrier 状态的锁 */
- private final ReentrantLock lock = new ReentrantLock();
- /** 条件队列 */
- private final Condition trip = lock.newCondition();
- /** 参与的线程的数目 */
- private final int parties;
- /* 所有线程到达 barrier 状态后的执行操作 */
- private final Runnable barrierCommand;
- /** 当前代, 复用通过这个类实现 */
- private Generation generation = new Generation();
- /** 依然在等待 barrier 状态的线程数目 */
- private int count;
核心方法
核心方法是 dowait:
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- final Generation g = generation;
- if (g.broken)
- throw new BrokenBarrierException();
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- int index = --count;
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- for (;;) {
- try {
- if (!timed)
- trip.await();
- else if (nanos> 0L)
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException IE) {
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw IE;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- Thread.currentThread().interrupt();
- }
- }
- if (g.broken)
- throw new BrokenBarrierException();
- if (g != generation)
- return index;
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
所有线程进入屏障后 (即 count==0) 会调用 nextGeneration()方法进入下一代所有线程又可以重新进入到屏障中, 源代码:
- private void nextGeneration() {
- // signal completion of last generation
- trip.signalAll();
- // set up next generation
- count = parties;
- generation = new Generation();
- }
会调用 signalAll()方法唤醒所有的线程继续执行后续操作.
来源: http://www.jianshu.com/p/c90017b892c2