前言
CountDownLatch 是一个闭锁实现, 它可以使一个或者多个线程等待一组事件发生. 它包含一个计数器, 用来表示需要等待的事件数量, coutDown 方法用于表示一个事件发生, 计数器随之递减, 而 await 方法等待计数器为 0 之前一直阻塞. 它是基于 AQS 的共享锁来实现的, 其中使用了较多的 AQS 的方法, 所以在这之前最好阅读过 AQS 的源码, 不嫌弃也可以查看本人之前 AQS 的源码分析, 有些 AQS 方法没有在之前分析过的这里涉及到了会进行分析.
源码
我们先看它的属性和构造器,
- // Sync 为其内部类
- private final Sync sync;
- // 唯一的一个构造器
- // 构造参数 count 就是需要等待事件的数量
- public CountDownLatch(int count) {
- // 为了保证 count>= 0
- if (count <0) throw new IllegalArgumentException("count < 0");
- // 构造 sync
- this.sync = new Sync(count);
- }
现在来看内部类 Sync, 它继承了 AQS, 实现了共享锁方法, 下面来看其源码, 代码行数不多很好理解
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- Sync(int count) {
- // setState 为 AQS 更改其 state 变量的方法
- // 将 AQS state 变量设置成 count
- setState(count);
- }
- int getCount() {
- // AQS 的获取 state 锁状态值
- return getState();
- }
- // 尝试获取共享锁
- protected int tryAcquireShared(int acquires) {
- // 返回 1 表示此时锁状态值为 0 表示锁已释放
- // -1 表示此时锁状态值大于 0, 表示出于锁定状态
- return (getState() == 0) ? 1 : -1;
- }
- // 尝试释放共享锁 (计数器递减 releases 次)
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- // 等待锁状态值为 0 或者更改锁状态值成功
- for (;;) {
- // 将 state 赋值给变量 c
- int c = getState();
- if (c == 0)
- // 此时锁已清除
- return false;
- // 递减
- int nextc = c-1;
- // 比较 state 的状态值是否等于 C, 等于将 state 状态值改为 nextc
- if (compareAndSetState(c, nextc))
- // 更改成功后, 如果 nextc 为 0 则返回 true
- return nextc == 0;
- }
- }
- }
await 方法
await 方法就是当 state 状态值不为 0 时将当前线程阻塞, 然后等待唤醒
- public void await() throws InterruptedException {
- // 调用的 AQS 获取共享锁可中断方法
- sync.acquireSharedInterruptibly(1);
- }
我们来看看 AQS 的 acquireSharedInterruptibly 方法
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 此方法调用的是 CountDownLatch 内部类 Sync 的方法
- // 如果锁状态不为 0, 则执行 doAcquireSharedInterruptibly 方法
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
doAcquireSharedInterruptibly 方法也是由 AQS 实现的
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- // 添加一个共享锁节点到队列
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- // 直到线程被唤醒或者线程被中断时跳出循环
- for (;;) {
- // node 节点的前驱节点
- final Node p = node.predecessor();
- if (p == head) {
- // 调用 CountDownLatch 内部类 Sync 的方法
- // 如果锁状态值为 0, 则返回值大于 0
- int r = tryAcquireShared(arg);
- if (r>= 0) {
- // 当锁状态值为 0, 开始将 note 节点设置为头节点并唤醒后继节点
- // 也就是队列不断的出列, 然后唤醒后继节点, 后继节点被唤醒后由于前驱节点被设置成头节点, 又会调用该方法进行后继节点的唤醒
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- /*
- shouldParkAfterFailedAcquire 用于清除已中断 / 或者取消的线程以及判断此次循环是否需要挂起线程
- parkAndCheckInterrupt 挂机当前线程
- shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 在 AQS 之前博文里分析过这里就不再分析了
- */
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- // 表示当前线程中断, 取消获取锁
- // 之前分析过, 略过源码分析
- cancelAcquire(node);
- }
- }
setHeadAndPropagate 方法, 主要作用是唤醒后继节点线程
- private void setHeadAndPropagate(Node node, int propagate) {
- Node h = head;
- // 当前节点设置为头节点, 节点关联的线程设置为空
- setHead(node)
- if (propagate> 0 || h == null || h.waitStatus < 0 ||
- (h = head) == null || h.waitStatus < 0) {
- Node s = node.next;
- if (s == null || s.isShared())
- // 节点等待状态为 signal 时, 唤醒后继节点线程
- doReleaseShared();
- }
- }
doReleaseShared 很巧妙, 当当前节点等待状态为 signal 时, 唤醒后继节点线程
- private void doReleaseShared() {
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- // 当前线程等待状态为 signal 时表示后继节点需要唤醒
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- // 表示 h 节点的状态替换失败, 会再次循环判断 h 节点的状态
- continue; // loop to recheck cases
- // 唤醒后继节点
- unparkSuccessor(h);
- }
- // 状态为 0 时, 将其改成 PROPAGATE, 更改失败会再次循环判断 h 节点的状态
- // 这种情况发生在一个线程调用 await 方法, 节点的等待状态还是初始值 0 未来得及被修改, 刚好 state 被置为 0 然后调用了 doReleaseShared 方法
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
countDown 方法
countDown 方法递减 state 值, 当值为 0 时, 依次唤醒等待的线程
- public void countDown() {
- // 递减一次 state 值, 知道 state 为 0 时唤醒等待中的线程
- sync.releaseShared(1);
- }
- public final boolean releaseShared(int arg) {
- // 尝试将 state 减去 arg
- if (tryReleaseShared(arg)) {
- // state 为 0 时唤醒线程
- doReleaseShared();
- return true;
- }
- return false;
- }
到此分析完毕.
总结
通过源码知道 CountDownLatch 不能像 CyclicBarrier 那样使用完毕后还可以复用;
CountDownLatch 是通过共享锁来实现的, 它的构造参数就是 AQS state 的值;
由于内部类继承了 AQS, 所以它内部也是 FIFO 队列, 同时也一样是前驱节点唤醒后继节点.
来源: https://www.cnblogs.com/d-homme/p/9375105.html