引言
上一篇文章中详细分析了基于 AQS 的 ReentrantLock 原理, ReentrantLock 通过 AQS 中的 state 变量 0 和 1 之间的转换代表了独占锁. 那么可以思考一下, 当 state 变量大于 1 时代表了什么? J.U.C 中是否有基于 AQS 的这种实现呢? 如果有, 那他们都是怎么实现的呢? 这些疑问通过详细分析 J.U.C 中的 Semaphore 与 CountDownLatch 类后, 将会得到解答.
Semaphore 与 CountDownLatch 的共享逻辑
Semaphore 与 CountDownLatch 的使用示例
2.1 Semaphore 的使用
2.2 CountDownLatch 的使用
源码分析
3.1 AQS 中共享锁的实现
3.2 Semaphore 源码分析
3.3 CountDownLatch 源码分析
总结
1. Semaphore 与 CountDownLatch 的共享方式
独占锁意味着只能有一个线程获取锁, 其他的线程在锁被占用的情况下都必须等待锁释放后才能进行下一步操作. 由此类推, 共享锁是否意味着可以由多个线程同时使用这个锁, 不需要等待呢? 如果是这样, 那锁的意义也就不存在了. 在 J.U.C 中共享意味着有多个线程可以同时获取锁, 但这个多个是有限制的, 并不是无限个, J.U.C 中通过 Semaphore 与 CountDownLatch 来分别实现了两种有限共享锁.
Semaphore 又叫信号量, 他通过一个共享的'信号包'来给每个使用他的线程来分配信号, 当信号包中的信号足够时, 线程可以获取锁, 反之, 信号包中信号不够了, 则不能获取到锁, 需要等待足够的信号被释放, 才能获取.
CountDownLatch 又叫计数器, 他通过一个共享的计数总量来控制线程锁的获取, 当计数器总量大于 0 时, 线程将被阻塞, 不能够获取锁, 只有当计数器总量为 0 时, 所有被阻塞的线程同时被释放.
可以看到 Semaphore 与 CountDownLatch 都有一个共享总量, 这个共享总量就是通过 state 来实现的.
2. Semaphore 与 CountDownLatch 的使用示例
在详细分析 Semaphore 与 CountDownLatch 的原理之前, 先来看看他们是怎么使用的, 这样方便后续我们理解他们的原理. 先知道他是什么? 然后再问为什么? 下面通过两个示例来详细说明 Semaphore 与 CountDownLatch 的使用.
2.1 Semaphore 的使用
- // 初始化 10 个信号量在信号包中, 让 ABCD4 个线程分别去获取
- public static void main(String[] args) throws InterruptedException {
- Semaphore semaphore = new Semaphore(10);
- SemaphoreTest(semaphore);
- }
- private static void SemaphoreTest(final Semaphore semaphore) throws InterruptedException {
- // 线程 A 初始获取了 4 个信号量, 然后分 3 次释放了这 4 个信号量
- Thread threadA = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- semaphore.acquire(4);
- System.out.println(Thread.currentThread().getName() + "get 4 semaphore");
- Thread.sleep(2000);
- System.out.println(Thread.currentThread().getName() + "release 1 semaphore");
- semaphore.release(1);
- Thread.sleep(2000);
- System.out.println(Thread.currentThread().getName() + "release 1 semaphore");
- semaphore.release(1);
- Thread.sleep(2000);
- System.out.println(Thread.currentThread().getName() + "release 2 semaphore");
- semaphore.release(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadA.setName("threadA");
- // 线程 B 初始获取了 5 个信号量, 然后分 2 次释放了这 5 个信号量
- Thread threadB = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- semaphore.acquire(5);
- System.out.println(Thread.currentThread().getName() + "get 5 semaphore");
- Thread.sleep(2000);
- System.out.println(Thread.currentThread().getName() + "release 2 semaphore");
- semaphore.release(2);
- Thread.sleep(2000);
- System.out.println(Thread.currentThread().getName() + "release 3 semaphore");
- semaphore.release(3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadB.setName("threadB");
- // 线程 C 初始获取了 4 个信号量, 然后分 1 次释放了这 4 个信号量
- Thread threadC = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- semaphore.acquire(4);
- System.out.println(Thread.currentThread().getName() + "get 4 semaphore");
- Thread.sleep(1000);
- System.out.println(Thread.currentThread().getName() + "release 4 semaphore");
- semaphore.release(4);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadC.setName("threadC");
- // 线程 D 初始获取了 10 个信号量, 然后分 1 次释放了这 10 个信号量
- Thread threadD = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- semaphore.acquire(10);
- System.out.println(Thread.currentThread().getName() + "get 10 semaphore");
- Thread.sleep(1000);
- System.out.println(Thread.currentThread().getName() + "release 10 semaphore");
- semaphore.release(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadD.setName("threadD");
- // 线程 A 和线程 B 首先分别获取了 4 个和 5 个信号量, 总信号量变为了 1 个
- threadA.start();
- threadB.start();
- Thread.sleep(1);
- // 线程 C 尝试获取 4 个发现不够则等待
- threadC.start();
- Thread.sleep(1);
- // 线程 D 尝试获取 10 个发现不够则等待
- threadD.start();
- }
复制代码
执行结果如下:
- threadB get 5 semaphore
- threadA get 4 semaphore
- threadA release 1 semaphore
- threadB release 2 semaphore
- threadC get 4 semaphore
- threadA release 1 semaphore
- threadC release 4 semaphore
- threadB release 3 semaphore
- threadA release 2 semaphore
- threadD get 10 semaphore
- threadD release 10 semaphore
复制代码
可以看到 threadA 和 threadB 在获取了 9 个信号量之后 threadC 和 threadD 之后等待信号量足够时才能继续往下执行. 而 threadA 和 threadB 在信号量足够时是可以同时执行的.
其中有一个问题, 当 threadD 排队在 threadC 之前时, 信号量如果被释放了 4 个, threadC 会先于 threadD 执行吗? 还是需要排队等待呢? 这个疑问在详细分析了 Semaphore 的源码之后再来给大家答案.
2.2 CountDownLatch 的使用
- // 初始化计数器总量为 2
- public static void main(String[] args) throws InterruptedException {
- CountDownLatch countDownLatch = new CountDownLatch(2);
- CountDownLatchTest(countDownLatch);
- }
- private static void CountDownLatchTest(final CountDownLatch countDownLatch) throws InterruptedException {
- //threadA 尝试执行, 计数器为 2 被阻塞
- Thread threadA = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- countDownLatch.await();
- System.out.println(Thread.currentThread().getName() + "await");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadA.setName("threadA");
- //threadB 尝试执行, 计数器为 2 被阻塞
- Thread threadB = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- countDownLatch.await();
- System.out.println(Thread.currentThread().getName() + "await");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadB.setName("threadB");
- //threadC 在 1 秒后将计数器数量减 1
- Thread threadC = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- countDownLatch.countDown();
- System.out.println(Thread.currentThread().getName() + "countDown");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadC.setName("threadC");
- //threadD 在 5 秒后将计数器数量减 1
- Thread threadD = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(5000);
- countDownLatch.countDown();
- System.out.println(Thread.currentThread().getName() + "countDown");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- threadD.setName("threadD");
- threadA.start();
- threadB.start();
- threadC.start();
- threadD.start();
- }
复制代码
执行结果如下:
- threadC countDown
- threadD countDown
- threadA await
- threadB await
复制代码
threadA 和 threadB 在尝试执行时由于计数器总量为 2 被阻塞, 当 threadC 和 threadD 将计数器总量减为 0 后, threadA 和 threadB 同时开始执行.
总结一下: Semaphore 就像旋转寿司店, 共有 10 个座位, 当座位有空余时, 等待的人就可以坐上去. 如果有只有 2 个空位, 来的是一家 3 口, 那就只有等待. 如果来的是一对情侣, 就可以直接坐上去吃. 当然如果同时空出 5 个空位, 那一家 3 口和一对情侣可以同时上去吃. CountDownLatch 就像大型商场里面的临时游乐场, 每一场游乐的时间过后等待的人同时进场玩, 而一场中间会有不爱玩了的人随时出来, 但不能进入, 一旦所有进入的人都出来了, 新一批人就可以同时进场.
3. 源码分析
明白了 Semaphore 与 CountDownLatch 是做什么的, 怎么使用的. 接下来就来看看 Semaphore 与 CountDownLatch 底层时怎么实现这些功能的.
3.1 AQS 中共享锁的实现
上篇文章通过对 ReentrantLock 的分析, 得倒了 AQS 中实现独占锁的几个关键方法:
- // 状态量, 独占锁在 0 和 1 之间切换
- private volatile int state;
- // 调用 tryAcquire 获取锁, 获取失败后加入队列中挂起等操作, AQS 中实现
- public final void acquire(int arg);
- // 独占模式下尝试获取锁, ReentrantLock 中实现
- protected boolean tryAcquire(int arg);
- // 调用 tryRelease 释放锁以及恢复线程等操作, AQS 中实现
- public final boolean release(int arg);
- // 独占模式下尝试释放锁, ReentrantLock 中实现
- protected boolean tryRelease(int arg);
复制代码
其中具体的获取和释放独占锁的逻辑都放在 ReentrantLock 中自己实现, AQS 中负责管理获取或释放独占锁成功失败后需要具体处理的逻辑. 那么共享锁的实现是否也是遵循这个规律呢? 由此我们在 AQS 中发现了以下几个类似的方法:
- // 调用 tryAcquireShared 获取锁, 获取失败后加入队列中挂起等操作, AQS 中实现
- public final void acquireShared(int arg);
- // 共享模式下尝试获取锁
- protected int tryAcquireShared(int arg);
- // 调用 tryReleaseShared 释放锁以及恢复线程等操作, AQS 中实现
- public final boolean releaseShared(int arg);
- // 共享模式下尝试释放锁
- protected boolean tryReleaseShared(int arg);
复制代码
共享锁和核心就在上面 4 个关键方法中, 先来看看 Semaphore 是怎么调用上述方法来实现共享锁的.
3.2 Semaphore 源码分析
首先是 Semaphore 的构造方法, 同 ReentrantLock 一样, 他有两个构造方法, 这样也是为了实现公平共享锁和非公平共享锁, 大家可能有疑问, 既然是共享锁, 为什么还分公平和非公平的呢? 这就回到了上面那个例子后面的疑问, 前面有等待的线程时, 后来的线程是否可以直接获取信号量, 还是一定要排队. 等待当然是公平的, 插队就是非公平的.
还是用旋转寿司的例子来说: 现在只有 2 个空位, 已经有一家 3 口在等待, 这时来了一对情侣, 公平共享锁的实现就是这对情侣必须等待, 只到一家 3 口上桌之后才轮到他们, 而非公平共享锁的实现是可以让这对情况直接去吃, 因为刚好有 2 个空位, 让一家 3 口继续等待 (好像是很不公平......), 这种情况下非公平共享锁的好处就是可以最大化寿司店的利润 (好像同时也得罪了等待的顾客......), 也是 Semaphore 默认的实现方式.
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
复制代码
Semaphore 的例子中使用了两个核心方法 acquire 和 release, 分别调用了 AQS 中的 acquireSharedInterruptibly 和 releaseShared 方法:
- // 获取 permits 个信号量
- public void acquire(int permits) throws InterruptedException {
- if (permits <0) throw new IllegalArgumentException();
- sync.acquireSharedInterruptibly(permits);
- }
- // 释放 permits 个信号量
- public void release(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.releaseShared(permits);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0) // 尝试获取 arg 个信号量
- doAcquireSharedInterruptibly(arg); // 获取信号量失败时排队挂起
- }
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) { // 尝试释放 arg 个信号量
- doReleaseShared();
- return true;
- }
- return false;
- }
复制代码
Semaphore 在获取和释放信号量的流程都是通过 AQS 来实现, 具体怎么算获取成功或释放成功则由 Semaphore 本身实现.
- // 公平共享锁尝试获取 acquires 个信号量
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- if (hasQueuedPredecessors()) // 前面是否有排队, 有则返回获取失败
- return -1;
- int available = getState(); // 剩余的信号量 (旋转寿司店剩余的座位)
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining)) // 剩余信号量不够, 够的情况下尝试获取 (旋转寿司店座位不够, 或者同时来两对情况抢座位)
- return remaining;
- }
- }
- // 非公平共享锁尝试获取 acquires 个信号量
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState(); // 剩余的信号量 (旋转寿司店剩余的座位)
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining)) // 剩余信号量不够, 够的情况下尝试获取 (旋转寿司店座位不够, 或者同时来两对情侣抢座位)
- return remaining;
- }
- }
复制代码
可以看到公平共享锁和非公平共享锁的区别就在是否需要判断队列中是否有已经等待的线程. 公平共享锁需要先判断, 非公平共享锁直接插队, 尽管前面已经有线程在等待.
为了验证这个结论, 稍微修改下上面的示例:
- threadA.start();
- threadB.start();
- Thread.sleep(1);
- threadD.start(); //threadD 已经在排队
- Thread.sleep(3500);
- threadC.start(); //3500 毫秒后 threadC 来插队
复制代码
结果输出:
- threadB get 5 semaphore
- threadA get 4 semaphore
- threadB release 2 semaphore
- threadA release 1 semaphore
- threadC get 4 semaphore //threadC 先与 threadD 获取到信号量
- threadA release 1 semaphore
- threadB release 3 semaphore
- threadC release 4 semaphore
- threadA release 2 semaphore
- threadD get 10 semaphore
- threadD release 10 semaphore
复制代码
这个示例很好的说明了当为非公平锁时会先尝试获取共享锁, 然后才排队.
当获取信号量失败之后会去排队, 排队这个操作通过 AQS 中的 doAcquireSharedInterruptibly 方法来实现:
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED); // 加入等待队列
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor(); // 获取当前节点的前置节点
- if (p == head) {
- int r = tryAcquireShared(arg); // 前置节点是头节点时, 说明当前节点是第一个挂起的线程节点, 再次尝试获取共享锁
- if (r>= 0) {
- setHeadAndPropagate(node, r); // 与 ReentrantLock 不同的地方: 获取共享锁成功设置头节点, 同时通知下一个节点
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) && // 非头节点或者获取锁失败, 检查节点状态, 查看是否需要挂起线程
- parkAndCheckInterrupt()) // 挂起线程, 当前线程阻塞在这里!
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
复制代码
这一段代码和 ReentrantLock 中的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法基本一样, 说下两个不同的地方. 一是加入等待队列时这里加入的是 Node.SHARED 类型的节点. 二是获取锁成功后会通知下一个节点, 也就是唤醒下一个线程. 以旋转寿司店的例子为例, 前面同时走了 5 个客人, 空余 5 个座位, 一家 3 口坐进去之后会告诉后面的一对情侣, 让他们也坐进去, 这样就达到了共享的目的. shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 方法在上一篇文章中都有详细说明, 这里就做解释了.
再来看看 releaseShared 方法时怎么释放信号量的, 首先调用 tryReleaseShared 来尝试释放信号量, 释放成功后调用 doReleaseShared 来判断是否需要唤醒后继线程:
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- int current = getState();
- int next = current + releases;
- if (next <current) // overflow // 释放信号量过多
- throw new Error("Maximum permit count exceeded");
- if (compareAndSetState(current, next)) //cas 操作设置新的信号量
- return true;
- }
- }
- private void doReleaseShared() {
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) { //SIGNAL 状态下唤醒后继节点
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h); // 唤醒后继节点
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
复制代码
释放的逻辑很好理解, 相比 ReentrantLock 只是在 state 的数量上有点差别.
3.3 CountDownLatch 源码分析
CountDownLatch 相比 Semaphore 在实现逻辑上要简单的多, 同时他也没有公平和非公平的区别, 因为当计数器达到 0 的时候, 所有等待的线程都会释放, 不为 0 的时候, 所有等待的线程都会阻塞. 直接来看看 CountDownLatch 的两个核心方法 await 和 countDown.
- public void await() throws InterruptedException {
- // 和 Semaphore 的不同在于参数为 1, 其实这个参数对 CountDownLatch 来说没什么意义, 因为后面 CountDownLatch 的 tryAcquireShared 实现是通过 getState() == 0 来判断的
- sync.acquireSharedInterruptibly(1);
- }
- public boolean await(long timeout, TimeUnit unit)
- throws InterruptedException {
- // 这里加入了一个等待超时控制, 超过时间后直接返回 false 执行后面的代码, 不会长时间阻塞
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
- public void countDown() {
- sync.releaseShared(1); // 每次释放 1 个计数
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) < 0) // 尝试获取 arg 个信号量
- doAcquireSharedInterruptibly(arg); // 获取信号量失败时排队挂起
- }
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1; // 奠定了同时获取锁的基础, 无论 State 初始为多少, 只能计数等于 0 时触发
- }
复制代码
和 Semaphore 区别有两个, 一是 State 每次只减少 1, 同时只有为 0 时才释放所有等待线程. 二是提供了一个超时等待方法. acquireSharedInterruptibly 方法跟 Semaphore 一样, 就不细说了, 这里重点说下 tryAcquireSharedNanos 方法.
- public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- return tryAcquireShared(arg)>= 0 ||
- doAcquireSharedNanos(arg, nanosTimeout);
- }
- // 最小自旋时间
- static final long spinForTimeoutThreshold = 1000L;
- private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
- throws InterruptedException {
- if (nanosTimeout <= 0L)
- return false;
- final long deadline = System.nanoTime() + nanosTimeout; // 计算了一个 deadline
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r>= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return true;
- }
- }
- nanosTimeout = deadline - System.nanoTime();
- if (nanosTimeout <= 0L) // 超时后直接返回 false, 继续执行
- return false;
- if (shouldParkAfterFailedAcquire(p, node) &&
- nanosTimeout> spinForTimeoutThreshold) // 大于最小 cas 操作时间则挂起线程
- LockSupport.parkNanos(this, nanosTimeout); // 挂起线程也有超时限制
- if (Thread.interrupted())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
复制代码
重点看标了注释的几行代码, 首先计算了一个超时时间, 当超时后直接退出等待, 继续执行. 如果未超时并且大于最小的 cas 操作时间, 这里定义的是 1000ns, 则挂起, 同时挂起操作也有超时限制. 这样就实现了超时等待.
4. 总结
至此关于 AQS 的共享锁的两个实现 Semaphore 与 CountDownLatch 就分析完了, 他们与非共享最大的区别就在于是否能多个线程同时获取锁. 看完后希望大家能对 Semaphore 与 CountDownLatch 有深刻的理解, 不明白的时候想想旋转寿司店和游乐场的例子, 如果对大家有帮助, 觉得写的好的话, 可以点个赞, 当然更希望大家能积极指出文中的错误和提出积极的改进意见.
来源: https://juejin.im/post/5b8743c5f265da433f2a2757