概述
CountDownLatch 是并发包中的一个工具类, 它的典型应用场景为: 一个线程等待几个线程执行, 待这几个线程结束后, 该线程再继续执行.
简单起见, 可以把它理解为一个倒数的计数器: 初始值为线程数, 每个线程结束时执行减 1 操作, 当计数器减到 0 时等待的线程再继续执行.
代码分析
CountDownLatch 的类签名和主要方法如下:
public class CountDownLatch {}
常用方法为: await(),await(long, TimeUnit) 和 countDown. 其中两个 await 都是让当前线程进入等待状态(获取资源失败); 而 countDown 方法是将计数器减去 1, 当计数器为 0 的时候, 那些处于等待状态的线程会继续执行(获取资源成功).
构造器代码如下:
- private final Sync sync;
- public CountDownLatch(int count) {
- if (count <0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
构造器 (该构造器是唯一的) 传入一个正整数, 且初始化了 sync 变量, Sync 是内部的一个嵌套类, 继承自 AQS.
- await / await(long, TimeUnit):
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
- countDown:
- public void countDown() {
- sync.releaseShared(1);
- }
其中, acquireSharedInterruptibly,tryAcquireSharedNanos 和 releaseShared 都是 AQS 中「共享模式」的方法, 具体代码可参考前文「JDK 源码分析 - AbstractQueuedSynchronizer(3)」的分析.
嵌套类 Sync 代码如下:
- private static final class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 4982264981922014374L;
- // 构造器, 初始化 AQS 的 state 变量
- Sync(int count) {
- setState(count);
- }
- int getCount() {
- return getState();
- }
- // 尝试获取资源的操作
- // 只有当 state 变量为 0 的时候才能获取成功(返回 1)
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- // 尝试释放资源的操作
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- // 该操作就是尝试把 state 变量减去 1
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
- }
Sync 继承了 AQS 抽象类, 根据 AQS 可知, acquireSharedInterruptibly 和 tryAcquireSharedNanos 方法的实现都调用了 tryAcquireShared.
流程说明: 通常先把 CountDownLatch 的计数器 (state) 初始化为 N, 执行 wait 操作就是尝试以共享模式获取资源, 而每次 countDown 操作就是将 N 减去 1, 只有当 N 减到 0 的时候, 才能获取成功(tryAcquireShared 方法), 然后继续执行.
场景举例
为便于理解该类的用法, 举两个简单的例子来说明它的使用场景.
场景 1: 一个线程等待多个线程执行完之后再继续执行
- public void test() throws InterruptedException {
- int count = 5;
- // CountDownLatch 的初始化计数器为 5
- // 注意线程数和计数器保持一致
- CountDownLatch countDownLatch = new CountDownLatch(count);
- for (int i = 0; i < count; i++) {
- int finalI = i;
- new Thread(() -> {
- try {
- TimeUnit.SECONDS.sleep(finalI);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getName() + "is working ..");
- // 每个线程执行结束时执行 countDown
- countDownLatch.countDown();
- }).start();
- }
- // 主线程进入等待状态(尝试获取资源, 成功后才能继续执行)
- countDownLatch.await();
- System.out.println(Thread.currentThread().getName() + "go on ..");
- }
- /* 输出结果:
- Thread-0 is working ..
- Thread-1 is working ..
- Thread-2 is working ..
- Thread-3 is working ..
- Thread-4 is working ..
- main go on ..
- */
场景 2: 一个线程到达指定条件后, 通知另一个线程
- private static volatile List<Integer> list = new ArrayList<>();
- private static void test() {
- CountDownLatch countDownLatch = new CountDownLatch(1);
- new Thread(() -> {
- if (list.size() != 5) {
- try {
- // list 的大小为 5 时再继续执行, 否则等待
- // 等待 state 减到 0
- countDownLatch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println(Thread.currentThread().getName() + "start..");
- }).start();
- new Thread(() -> {
- for (int i = 0; i < 10; i++) {
- list.add(i);
- System.out.println(Thread.currentThread().getName() + "add" + i);
- if (list.size() == 5) {
- // 满足条件时将 state 减 1
- countDownLatch.countDown();
- }
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- /* 输出结果:
- Thread-1 add 0
- Thread-1 add 1
- Thread-1 add 2
- Thread-1 add 3
- Thread-1 add 4
- Thread-0 start..
- Thread-1 add 5
- Thread-1 add 6
- Thread-1 add 7
- Thread-1 add 8
- Thread-1 add 9
- */
小结
CountDownLatch 可以理解为一个倒数的计数器, 它的典型应用场景就是一个线程等待几个线程执行结束后再继续执行. 其内部是基于 AQS 的共享模式实现的.
来源: https://www.cnblogs.com/jaxer/p/11317908.html