此篇博客所有源码均来自 JDK 1.8
J.U.C 之 AQS 传送门
[死磕 Java 并发] --J.U.C 之 AQS(一篇就够了) https://mp.weixin.qq.com/s?__biz=MzU2NjIzNDk5NQ==&mid=2247484289&idx=1&sn=56ad0dd81fb2b4e4f86c5aaaaf3164a6&scene=21#wechat_redirect , 作为同步组件的基础, AQS 做了太多的工作, 自定义同步组件只需要简单地实现自定义方法, 然后加上 AQS 提供的模板方法, 就可以实现强大的自定义同步组件, 理解了 AQS, 其他理解起来就容易很多了.
CountDownLatch 介绍
在上篇博客中介绍了 Java 四大并发工具之一的 CyclicBarrier, 今天要介绍的 CountDownLatch 与 CyclicBarrier 有点儿相似.
CyclicBarrier 所描述的是 "允许一组线程互相等待, 直到到达某个公共屏障点, 才会进行后续任务", 而 CountDownLatch 所描述的是 "在完成一组正在其他线程中执行的操作之前, 它允许一个或多个线程一直等待". 在 API 中是这样描述的:
用给定的计数 初始化 CountDownLatch. 由于调用了 countDown() 方法, 所以在当前计数到达零之前, await 方法会一直受阻塞. 之后, 会释放所有等待的线程, await 的所有后续调用都将立即返回. 这种现象只出现一次 -- 计数无法被重置. 如果需要重置计数, 请考虑使用 CyclicBarrier.
CountDownLatch 是通过一个计数器来实现的, 当我们在 new 一个 CountDownLatch 对象的时候需要带入该计数器值, 该值就表示了线程的数量. 每当一个线程完成自己的任务后, 计数器的值就会减 1. 当计数器的值变为 0 时, 就表示所有的线程均已经完成了任务, 然后就可以恢复等待的线程继续执行了.
虽然, CountDownlatch 与 CyclicBarrier 有那么点相似, 但是他们还是存在一些区别的:
CountDownLatch 的作用是允许 1 或 N 个线程等待其他线程完成执行; 而 CyclicBarrier 则是允许 N 个线程相互等待
CountDownLatch 的计数器无法被重置; CyclicBarrier 的计数器可以被重置后使用, 因此它被称为是循环的 barrier
实现分析
CountDownLatch 结构如下
通过上面的结构图我们可以看到, CountDownLatch 内部依赖 Sync 实现, 而 Sync 继承 AQS.CountDownLatch 仅提供了一个构造方法:
CountDownLatch(int count) : 构造一个用给定计数初始化的 CountDownLatch
- public CountDownLatch(int count) {
- if (count <0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
sync 为 CountDownLatch 的一个内部类, 其定义如下:
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- Sync(int count) {
- setState(count);
- }
- // 获取同步状态
- int getCount() {
- return getState();
- }
- // 获取同步状态
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- // 释放同步状态
- protected boolean tryReleaseShared(int releases) {
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
- }
通过这个内部类 Sync 我们可以清楚地看到 CountDownLatch 是采用共享锁来实现的.
await()
CountDownLatch 提供 await() 方法来使当前线程在锁存器倒计数至零之前一直等待, 除非线程被中断, 定义如下:
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
await 其内部使用 AQS 的 acquireSharedInterruptibly(int arg):
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
在内部类 Sync 中重写了 tryAcquireShared(int arg) 方法:
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
getState() 获取同步状态, 其值等于计数器的值, 从这里我们可以看到如果计数器值不等于 0, 则会调用 doAcquireSharedInterruptibly(int arg), 该方法为一个自旋方法会尝试一直去获取同步状态:
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
/**
* 对于 CountDownLatch 而言, 如果计数器值不等于 0, 那么 r 会一直小于 0
*/
- int r = tryAcquireShared(arg);
- if (r>= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- // 等待
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
- countDown()
CountDownLatch 提供 countDown() 方法递减锁存器的计数, 如果计数到达零, 则释放所有等待的线程.
- public void countDown() {
- sync.releaseShared(1);
- }
内部调用 AQS 的 releaseShared(int arg) 方法来释放共享锁同步状态:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
tryReleaseShared(int arg) 方法被 CountDownLatch 的内部类 Sync 重写:
- protected boolean tryReleaseShared(int releases) {
- for (;;) {
- // 获取锁状态
- int c = getState();
- //c == 0 直接返回, 释放锁成功
- if (c == 0)
- return false;
- // 计算新 "锁计数器"
- int nextc = c-1;
- // 更新锁状态 (计数器)
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
总结
CountDownLatch 内部通过共享锁实现. 在创建 CountDownLatch 实例时, 需要传递一个 int 型的参数: count, 该参数为计数器的初始值, 也可以理解为该共享锁可以获取的总次数. 当某个线程调用 await() 方法, 程序首先判断 count 的值是否为 0, 如果不会 0 的话则会一直等待直到为 0 为止. 当其他线程调用 countDown() 方法时, 则执行释放共享锁状态, 使 count 值 - 1. 当在创建 CountDownLatch 时初始化的 count 参数, 必须要有 count 线程调用 countDown 方法才会使计数器 count 等于 0, 锁才会释放, 前面等待的线程才会继续运行. 注意 CountDownLatch 不能回滚重置.
关于共享锁的请参考: [死磕 Java 并发] --J.U.C 之 AQS: 同步状态的获取与释放 https://mp.weixin.qq.com/s?__biz=MzU2NjIzNDk5NQ==&mid=2247484289&idx=1&sn=56ad0dd81fb2b4e4f86c5aaaaf3164a6&scene=21#wechat_redirect
应用示例
示例仍然使用开会案例. 老板进入会议室等待 5 个人全部到达会议室才会开会. 所以这里有两个线程老板等待开会线程, 员工到达会议室:
- public class CountDownLatchTest {
- private static CountDownLatch countDownLatch = new CountDownLatch(5);
- /**
- * Boss 线程, 等待员工到达开会
- */
- static class BossThread extends Thread{
- @Override
- public void run() {
- System.out.println("Boss 在会议室等待, 总共有" + countDownLatch.getCount() + "个人开会...");
- try {
- //Boss 等待
- countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("所有人都已经到齐了, 开会吧...");
- }
- }
- // 员工到达会议室
- static class EmpleoyeeThread extends Thread{
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getName() + ", 到达会议室....");
- // 员工到达会议室 count - 1
- countDownLatch.countDown();
- }
- }
- public static void main(String[] args){
- //Boss 线程启动
- new BossThread().start();
- for(int i = 0 ; i < countDownLatch.getCount() ; i++){
- new EmpleoyeeThread().start();
- }
- }
- }
运行结果:
来源: http://www.tuicool.com/articles/JbIVneR