一, 前言
最近在研究 java.util.concurrent 包下的一些的常用类, 之前写了 AQS,ReentrantLock,ArrayBlockingQueue 以及 LinkedBlockingQueue 的相关博客, 今天这篇博客就来写一写并发包下的另一个常用类 --CountDownLatch. 这里首先要说明一点, CountDownLatch 是基于 AQS 实现的, AQS 才是真正实现了线程同步的组件, CountDownLatch 只是它的使用者, 所以如果想要学习 CountDownLatch, 请一定先要弄懂 AQS 的实现原理. 我以下的描述均建立在已经了解 AQS 的基础之上. 我之前写过一篇 AQS 实现原理的分析博客, 感兴趣可以看一看: 并发 -- 抽象队列同步器 AQS 的实现原理.
二, 正文
2.1 抽象队列同步器 AQS
在说 CountDownLatch 前, 必须要先提一下 AQS.AQS 全称抽象队列同步器 (AbstractQuenedSynchronizer), 它是一个可以用来实现线程同步的基础框架. 当然, 它不是我们理解的 Spring 这种框架, 它是一个类, 类名就是 AbstractQuenedSynchronizer, 如果我们想要实现一个能够完成线程同步的锁或者类似的同步组件, 就可以在使用 AQS 来实现, 因为它封装了线程同步的方式, 我们在自己的类中使用它, 就可以很方便的实现一个我们自己的锁.
AQS 的实现相对复杂, 无法通过短短的几句话将其说清楚, 我之前专门写过一篇分析 AQS 实现原理的博客: 并发 -- 抽象队列同步器 AQS 的实现原理.
在阅读下面的内容前, 请一定要先学习 AQS 的实现原理, 因为 CountDownLatch 的实现非常简单, 完全就是依赖于 AQS 的, 所以我以下的描述均建立在已经理解 AQS 的基础之上. 可以阅读上面推荐博客, 也可以自己去查阅相关资料.
2.2 CountDownLatch 的实现原理
既然已经开始学习 CountDownLatch 的实现原理了, 那一定已经知道了它的作用, 我这里就不详细展示了, 简单介绍一下: CountDownLatch 的被称为门栓, 可以将它看成是门上的锁, 它会给门上多把锁, 只有每一把锁都解开, 才能通过. 对于线程来说, CountDownLatch 会阻塞线程的运行, 只有当 CountDownLatc 内部记录的值减小为 0, 线程才能继续向前执行.
CountDownLatch 底层通过 AQS 实现, AQS 的一般使用方式就是以内部类的形式继承它, CountDownLatch 就是这么使用它的. 在 CountDownLatch 内部有一个内部类 Sync, 继承自 AQS, 并重写了 AQS 加锁解锁的方法, 并通过 Sync 的对象, 调用 AQS 的方法, 阻塞线程的运行. 我们知道, 创建一个 CountDownLatch 对象时, 需要传入一个整数值 count, 只有当 count 被减小为 0 时线程才能通过 await 方法, 否则将被 await 阻塞. 这里实际上是这样的: 当线程运行到 await 方法时, 需要去获取锁 (锁由 AQS 实现), 若 count 不为 0, 则线程就会获取锁失败, 被阻塞; 若 count 为 0, 则就能顺利通过. CountDownLatch 是一次性的, 因为没有方法可以增加 count 的值, 也就是说, 一旦 count 被减小为 0, 则之后就一直是 0 了, 也就再也不能阻塞线程了. 下面我们就从源码的角度来分析 CountDownLatch.
2.3 CountDownLatch 的内部类
前面我们说过, CountDownLatch 内部定义了一个内部类 Sync, 继承自 AQS, 通过这个内部类来实现线程阻塞, 下面我们就来看一看这个内部类的实现:
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- /** 构造方法, 接收 count 值, 只有 count 减小为 0 时, 线程才不会被 await 方法阻塞 */
- Sync(int count) {
- // CountDownLatch 利用 AQS 的方式就是直接让 count 作为 AQS 的同步变量 state
- // 所以直接用 state 记录 count 值
- setState(count);
- }
- /** 获取当前的 count 值 */
- int getCount() {
- return getState();
- }
- /**
- * 这是 AQS 的模板方法 acquireShared,acquireSharedInterruptibly 等方法内部将会调用的方法,
- * 由子类实现, 这个方法的作用是尝试获取一次共享锁, 对于 AQS 来说,
- * 此方法返回值大于等于 0, 表示获取共享锁成功, 反之则获取共享锁失败,
- * 而在这里, 实际上就是判断 count 是否等于 0, 线程能否向下运行
- */
- protected int tryAcquireShared(int acquires) {
- // 此处判断 state 的值是否为 0, 也就是判断 count 是否为 0,
- // 若 count 为 0, 返回 1, 表示获取锁成功, 此时线程将不会阻塞, 正常运行
- // 若 count 不为 0, 则返回 - 1, 表示获取锁失败, 线程将会被阻塞
- // 从这里我们已经可以看出 CountDownLatch 的实现方式了
- return (getState() == 0) ? 1 : -1;
- }
- /**
- * 此方法的作用是用来是否 AQS 的共享锁, 返回 true 表示释放成功, 反之则失败
- * 此方法将会在 AQS 的模板方法 releaseShared 中被调用,
- * 在 CountDownLatch 中, 这个方法用来减小 count 值
- */
- protected boolean tryReleaseShared(int releases) {
- // 使用死循环不断尝试释放锁
- for (;;) {
- // 首先获取当前 state 的值, 也就是 count 值
- int c = getState();
- // 若 count 值已经等于 0, 则不能继续减小了, 于是直接返回 false
- // 为什么返回的是 false, 因为等于 0 表示之前等待的那些线程已经被唤醒了,
- // 若返回 true,AQS 会尝试唤醒线程, 若返回 false, 则直接结束, 所以
- // 在没有线程等待的情况下, 返回 false 直接结束是正确的
- if (c == 0)
- return false;
- // 若 count 不等于 0, 则将其 - 1
- int nextc = c-1;
- // compareAndSetState 的作用是将 count 值从 c, 修改为新的 nextc
- // 此方法基于 CAS 实现, 保证了操作的原子性
- if (compareAndSetState(c, nextc))
- // 若 nextc == 0, 则返回的是 true, 表示已经没有锁了, 线程可以运行了,
- // 若 nextc> 0, 则表示线程还需要继续阻塞, 此处将返回 false
- return nextc == 0;
- }
- }
- }
可以看到, 内部类 Sync 的实现非常简单, 它只实现了 AQS 中的两个方法, 即 tryAcquireShared 以及 tryReleaseShared, 这两个方法是 AQS 提供的使用共享锁的接口. 这也就表明, CountDownLatch 实际上是一种共享锁机制, 即锁可以同时被多个线程获取, 这个不难理解, 因为一旦 count 被减小为 0, 则所有线程通过 await 方法时, 都能够顺利通过, 不会因为获取不到锁而阻塞. 而且从上面的实现中我们可以看到, Sync 直接将 count 值作为 AQS 的 state 的值, 只有 state 的值为 0, 线程才能获取锁, 也就是获得执行权限.
2.4 CountDownLatch 的成员变量和构造方法
下面来看一看 CountDownLatch 的属性和构造方法:
- /**
- * 只有一个成员变量, 就是内部类 Sync 的一个对象, 通过此对象调用 AQS 的方法, 实现线程阻塞和唤醒
- */
- private final Sync sync;
- /**
- * 只有一个构造方法, 接收一个 count 值
- */
- public CountDownLatch(int count) {
- // count 值不能小于 0
- if (count < 0) throw new IllegalArgumentException("count < 0");
- // 直接创建一个 Sync 对象, 并传入 count 值, Sync 内部将会执行 setState(count)
- this.sync = new Sync(count);
- }
2.5 await 方法分析
CountDownLatch 类最最核心的两个方法就是 await 以及 ountDown, 我们先来看一看 await 方法的实现:
- // 此方法用来让当前线程阻塞, 直到 count 减小为 0 才恢复执行
- public void await() throws InterruptedException {
- // 这里直接调用 sync 的 acquireSharedInterruptibly 方法, 这个方法定义在 AQS 中
- // 方法的作用是尝试获取共享锁, 若获取失败, 则线程将会被加入到 AQS 的同步队列中等待
- // 直到获取成功为止. 且这个方法是会响应中断的, 线程在阻塞的过程中, 若被其他线程中断,
- // 则此方法会通过抛出异常的方式结束等待.
- sync.acquireSharedInterruptibly(1);
- }
await 的实现异常简单, 只有短短一行代码, 调用了 AQS 中已经封装好的方法. 这就是 AQS 的好处, AQS 已经实现了线程的阻塞和唤醒机制, 将实现的复杂性隐藏, 而其他类只需要简单的使用它即可. 为了方便理解, 我们还是来看看 acquireSharedInterruptibly 方法吧:
- /** 此方法是 AQS 中提供的一个模板方法, 用以获取共享锁, 并且会响应中断 */
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- // 首先判断当前线程释放被中断, 若被中断, 则直接抛出异常结束
- if (Thread.interrupted())
- throw new InterruptedException();
- // 调用 tryAcquireShared 方法尝试获取锁, 这个方法被 Sycn 类重写了,
- // 若 count == 0, 则这个方法会返回 1, 表示获取锁成功, 则这里会直接返回, 线程不会被阻塞
- // 若 count < 0, 将会执行下面的 doAcquireSharedInterruptibly 方法,
- // 此处请去查看 Sync 中 tryAcquireShared 方法的实现
- if (tryAcquireShared(arg) < 0)
- // 下面这个方法的作用是, 线程获取锁失败, 将会加入到 AQS 的同步队列中阻塞等待,
- // 直到成功获取到锁, 而此处成功获取到锁的条件就是 count == 0, 若当前线程在等待的过程中,
- // 成功地获取了锁, 则它会继续唤醒在它后面等待的线程, 也尝试获取锁,
- // 这也就是说, 只要 count == 0 了, 则所有被阻塞的线程都能恢复运行
- doAcquireSharedInterruptibly(arg);
- }
相信看到这里, 对 CountDownLatch 的实现原理已经有一个比较清晰的理解了. CountDownLatch 的实现完全就是依赖于 AQS 的, 所有再次提醒, 如果以上内容理解不了, 请先去学习 AQS.
2.6 countDown 方法分析
下面我们来分析 CountDownLatch 中另一个核心的方法 --countDown,
- /**
- * 此方法的作用就是将 count 的值 - 1, 如果 count 等于 0 了, 就唤醒等待的线程
- */
- public void countDown() {
- // 这里直接调用 sync 的 releaseShared 方法, 这个方法的实现在 AQS 中, 也是 AQS 提供的模板方法,
- // 这个方法的作用是当前线程释放锁, 若释放失败, 返回 false, 若释放成功, 则返回 false,
- // 若锁被释放成功, 则当前线程会唤醒 AQS 同步队列中第一个被阻塞的线程, 让他尝试获取锁
- // 对于 CountDownLatch 来说, 释放锁实际上就是让 count - 1, 只有当 count 被减小为 0,
- // 锁才是真正被释放, 线程才能继续向下运行
- sync.releaseShared(1);
- }
为了方便理解, 我们还是来看一看 AQS 中 releaseShared 方法的实现:
- public final boolean releaseShared(int arg) {
- // 调用 tryReleaseShared 尝试释放锁, 这个方法已经由 Sycn 重写, 请回顾上面对此方法的分析
- // 若 tryReleaseShared 返回 true, 表示 count 经过这次释放后, 等于 0 了, 于是执行 doReleaseShared
- if (tryReleaseShared(arg)) {
- // 这个方法的作用是唤醒 AQS 的同步队列中, 正在等待的第一个线程
- // 而我们分析 acquireSharedInterruptibly 方法时已经说过,
- // 若一个线程被唤醒, 检测到 count == 0, 会继续唤醒下一个等待的线程
- // 也就是说, 这个方法的作用是, 在 count == 0 时, 唤醒所有等待的线程
- doReleaseShared();
- return true;
- }
- return false;
- }
三, 总结
如果直接去看 CountDownLatch 的源码会发现, 它的实现真的非常简单, 包括注释在内, 总共 300 行代码, 除去注释, 连 100 行代码都不到. 因为它所作的工作, 除了重写 AQS 的两个方法外, 其余的基本上就是调用 AQS 提供的模板方法而已. 所以, 理解 CountDownLatch 的过程, 实际上是理解 AQS 的过程, 只要理解了 AQS, 看懂 CountDownLatch 的原理, 不需要 5 分钟. AQS 真的是 Java 并发中非常重要的一个组件, 很多类都是基于它实现的, 比如还有 ReentrantLock, 同时 AQS 也是面试中的常考点, 所以一定要好好研究. 最后再次推荐我之前编写的有关 AQS 的源码分析博客: 并发 -- 抽象队列同步器 AQS 的实现原理.
四, 参考
JDK1.8 源码
来源: https://www.cnblogs.com/tuyang1129/p/12692423.html