CountDownLatch
CountDownLatch 闭锁相当于一扇门, 在闭锁到达结束状态之前, 这扇门一直是关闭的, 并且没有任何线程能通过, 当到达结束状态时, 这扇门会打开并允许所有的线程通过. 当闭锁到达结束状态后, 将不会再改变状态, 门永远保持打开状态
CountDownLatch 实现原理
CountDownLatch 通过内部类 Sync 实现方法, sync 继承 AQS 重写模板中的方法. sync 内部定义:
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- Sync(int count) {
- setState(count);
- }
- /**
- * 获取同步状态
- */
- int getCount() {
- return getState();
- }
- /**
- * 获取同步状态
- */
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- /**
- * 释放同步状态
- */
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
- }
复制代码
从源码中重写的方法可以得知, CountDownLatch 中的 sync 采用共享模式. CountDownLatch 示例:
- public class TestHarness {
- public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
- final CountDownLatch startGate = new CountDownLatch(1);
- final CountDownLatch endGate = new CountDownLatch(nThreads);
- for (int i = 0; i <nThreads; i++) {
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- startGate.await();
- try {
- System.out.println(Thread.currentThread().getName() + "开始执行");
- task.run();
- } finally {
- endGate.countDown();
- System.out.println(Thread.currentThread().getName() + "执行结束");
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- t.start();
- }
- long start = System.nanoTime();
- startGate.countDown();
- endGate.await();
- long end = System.nanoTime();
- System.out.println("所有线程执行完毕, 耗时:" + (end-start));
- return end - start;
- }
- public static void main(String[] args) throws InterruptedException {
- System.out.println(timeTasks(10, new Runnable() {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + "--------work");
- }
- }));
- }
- }
复制代码
运行结果:
Thread-0 开始执行
Thread-3 开始执行
Thread-0--------work
Thread-0 执行结束
Thread-1 开始执行
Thread-2 开始执行
Thread-7 开始执行
Thread-7--------work
Thread-7 执行结束
Thread-9 开始执行
Thread-9--------work
Thread-9 执行结束
Thread-8 开始执行
Thread-8--------work
Thread-8 执行结束
Thread-2--------work
Thread-2 执行结束
Thread-6 开始执行
- Thread-1--------work
- Thread-6--------work
Thread-6 执行结束
Thread-5 开始执行
Thread-5--------work
Thread-5 执行结束
Thread-3--------work
Thread-3 执行结束
Thread-4 开始执行
Thread-1 执行结束
Thread-4--------work
Thread-4 执行结束
所有线程执行完毕, 耗时: 2794976
2794976
复制代码
CyclicBarrier
相对于 CountDownLatch 是一次性对象, 一旦进入终止状态, 就不能被重置, CyclicBarrier 可以反复使用. CyclicBarrier 类似于闭锁, 与闭锁的关键区别在于, 闭锁用于等待事件, 栅栏用于等待其他线程, 其作用是让一组线程到达一个屏障 (也可以叫同步点) 时被阻塞, 直到最后一个线程到达屏障时, 屏障才会开门, 所有被屏障拦截的线程才会继续运行.
CyclicBarrier 实现原理
CyclicBarrier 构造方法
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
复制代码
参数 parties 指栅栏拦截的线程数量 参数 barrierAction 指当这些线程都到达栅栏时优先会执行的线程
await()方法
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- // 获取锁
- lock.lock();
- try {
- final Generation g = generation;
- // 若栅栏处于断开状态, 抛出异常
- if (g.broken)
- throw new BrokenBarrierException();
- // 若线程中断, 断开 CyclicBarrier
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- int index = --count;
- // count 为 0 表明所有线程到达栅栏位置
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- // 若初始化时指定了所有线程到达栅栏时的任务, 执行它
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- // 唤醒所有等待线程, 开始新的 generation
- nextGeneration();
- return 0;
- } finally {
- // 若任务执行异常, 断开 CyclicBarrier
- if (!ranAction)
- breakBarrier();
- }
- }
- // 循环所有线程到达栅栏或栅栏断开或线程中断或超时
- for (;;) {
- try {
- // 一直等待
- if (!timed)
- trip.await();
- // 限时等待
- else if (nanos> 0L)
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- // 若线程中断且栅栏没有断开, 断开 CyclicBarrier
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- Thread.currentThread().interrupt();
- }
- }
- if (g.broken)
- throw new BrokenBarrierException();
- if (g != generation)
- return index;
- // 若等待超时, 断开 CyclicBarrier
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- // 释放锁
- lock.unlock();
- }
- }
复制代码
其主要逻辑: 若有线程未到达栅栏位置, 到达栅栏位置的线程一直等待状态, 直至发生以下场景: . 所有线程都到达栅栏位置 . 有线程被中断 . 线程等待超时 . 有线程调用 reset()方法, 断开当前栅栏, 将栅栏重置为初始状态 reset 方法:
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- // 断开当前栅栏
- breakBarrier(); // break the current generation
- // 开始新的 generation
- nextGeneration(); // start a new generation
- } finally {
- lock.unlock();
- }
- }
复制代码
CyclicBarrier 示例
- public class CyclicBarrierTest {
- private static CyclicBarrier cyclicBarrier;
- static class CyclicBarrierThread extends Thread{
- public void run() {
- System.out.println("运动员:" + Thread.currentThread().getName() + "到场");
- try {
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public static void main(String[] args){
- cyclicBarrier = new CyclicBarrier(5, new Runnable() {
- @Override
- public void run() {
- System.out.println("运动员全部到齐, 比赛开始");
- }
- });
- for(int i = 0 ; i < 5 ; i++){
- new CyclicBarrierThread().start();
- }
- }
- }
复制代码
运行结果:
运动员: Thread-0 到场
运动员: Thread-1 到场
运动员: Thread-2 到场
运动员: Thread-3 到场
运动员: Thread-4 到场
运动员全部到齐, 比赛开始
复制代码
CountDownLatch 与 CyclicBarrier 区别
.CountDownLatch 的计数器只能使用一次, 而 CyclicBarrier 的计数器可以使用 reset()方法重置. 所以 CyclicBarrier 能处理更为复杂的业务场景. 例如, 如果计算发生错误, 可以重置计数器, 并让线程重新执行一次
.CyclicBarrier 还提供其他有用的方法, 比如 getNumberWaiting 方法可以获得 Cyclic-Barrier 阻塞的线程数量. isBroken()方法用来了解阻塞的线程是否被中断
.CountDownLatch 倾向于一个线程等多个线程, CyclicBarrier 倾向于多个线程互相等待
来源: https://juejin.im/post/5b87f575e51d45389005bca9