- CountDownLatch
- A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
通常情况下, countDown 如下调用
- CountDownLatch countDownLatch = new CountDownLatch(1);
- countDownLatch.countDown();
- countDownLatch.await();
看一下 countDown 方法:
- public void countDown() {
- sync.releaseShared(1);
- }
AQS 中 releaseShared 方法如下:
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
CountDownLatch 中 tryReleaseShared 方法如下:
- // 方法判断许可如果减 1 之后是否为 0, 如果为 0 的话就执行 doReleaseShared()方法.
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
来看 doReleaseShared()方法:
- private void doReleaseShared() {
- /*
- * Ensure that a release propagates, even if there are other
- * in-progress acquires/releases. This proceeds in the usual
- * way of trying to unparkSuccessor of head if it needs
- * signal. But if it does not, status is set to PROPAGATE to
- * ensure that upon release, propagation continues.
- * Additionally, we must loop in case a new node is added
- * while we are doing this. Also, unlike other uses of
- * unparkSuccessor, we need to know if CAS to reset status
- * fails, if so rechecking.
- */
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
不过尴尬的是, CountDownLatch 这里未做任何事情.
再看一下 await()方法:
await 方法会让当前线程进入 wait 状态, 除非满足下面两个条件:
count 到 0
线程中断
- public void await() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- if (tryAcquireShared(arg) <0)
- doAcquireSharedInterruptibly(arg);
- }
tryAcquireShared 方法如下:
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
所以, 当 state 不是 0 的时候进入 doAcquireSharedInterruptibly 方法.
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- // 只有当 state 为 0 时 r 为 1
- int r = tryAcquireShared(arg);
- if (r>= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- // 如果 state 不为 0, 该线程会进入 wait 状态
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
CountDownLatch 文档中有一句非常重要的话:
Memory consistency effects: Until the count reaches zero, actions in a thread prior to calling countDown() happen-before actions following a successful return from a corresponding await() in another thread
大意是一个线程 countdown()之前的操作 happens-before 另一个线程中 await()之后的操作.
- Semaphore
- Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource.
Semaphore 主要用来限制获取资源的线程数.
Actions in a thread prior to calling a "release" method such as release() happen-before actions following a successful "acquire" method such as acquire() in another thread
内存语义: release() happen-before acquire()之前
启一个 springboot 项目, 写一个方法:
- @RequestMapping("/test/semaphore")
- @ResponseBody
- public void test() throws InterruptedException {
- Semaphore semaphore = new Semaphore(5);
- for (int i = 0; i <7; i++) {
- int finalI = i;
- new Thread(()->{
- try {
- semaphore.acquire();
- System.err.println(Thread.currentThread() + "获取了许可" + semaphore.availablePermits());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }, "线程" + i).start();
- }
- new Thread(()->{
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.err.println(Thread.currentThread() + "要释放许可" + semaphore.availablePermits());
- semaphore.release();
- }, "线程 7").start();
- }
一次输出如下:
Thread[线程 1,5,main]获取了许可 4
Thread[线程 0,5,main]获取了许可 3
Thread[线程 3,5,main]获取了许可 2
Thread[线程 4,5,main]获取了许可 0
Thread[线程 2,5,main]获取了许可 0
Thread[线程 7,5,main]要释放许可 0
Thread[线程 5,5,main]获取了许可 0
会发现, 线程 5 获取许可之前是先等线程 7 释放许可.
至于线程 6 会因为由于许可为 0, 进入等待状态. 直到有线程释放许可, 来调用 unparkSuccessor.
- CyclicBarrier
- A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
- Actions in a thread prior to calling await() happen-before actions that are part of the barrier action, which in turn happen-before actions following a successful return from the corresponding await() in other threads.
内部类 Generation 只有一个属性 broken(默认 false)
我们发现, await()方法如下:
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
进入 dowait 方法:
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- final Generation g = generation;
- if (g.broken)
- throw new BrokenBarrierException();
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // 来一个线程 count 减 1, 如果 index 为 0, 就会翻车
- int index = --count;
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // 没翻车 (broken,interrupted,timed out) 的话就执行下面的逻辑
- // loop until tripped, broken, interrupted, or timed out
- for (;;) {
- try {
- if (!timed)
- trip.await();
- else if (nanos> 0L)
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException IE) {
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw IE;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- Thread.currentThread().interrupt();
- }
- }
- if (g.broken)
- throw new BrokenBarrierException();
- if (g != generation)
- return index;
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
下面进入 trip.await()方法
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 往等待队列加入节点 Node
- Node node = addConditionWaiter();
- // 这里释放 AQS 中的 state, 如果释放失败, 会将 node 的 waitstatus 置为 CANCELLED, 这是传参 node 的唯一用处
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- // 如果 node 有 next 就肯定返回 true
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- // 如果当前线程
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
进入 addConditionWaiter()
- private Node addConditionWaiter() {
- Node t = lastWaiter;
- // If lastWaiter is cancelled, clean out.
- if (t != null && t.waitStatus != Node.CONDITION) {
- unlinkCancelledWaiters();
- t = lastWaiter;
- }
- Node node = new Node(Thread.currentThread(), Node.CONDITION);
- if (t == null)
- firstWaiter = node;
- else
- t.nextWaiter = node;
- lastWaiter = node;
- return node;
- }
假如 5 个线程按顺序进入 await(), 则此时, trip 这个 ConditionObject 上 firstWaiter==lastWaiter==new Node("线程 0 对应的线程", Node.CONDITION)
同时, 因为 dowait 方法中的 lock.lock(),AQS 的同步队列如下:
head 节点 --》线程 1--》线程 2--》线程 3--》线程 4(tail)
等待队列: t0
当释放线程 0 的锁之后, 唤醒线程 1, 将线程 1 加入等待队列, 线程 2/3 也加入等待队列. 此时同步队列还剩下线程 4. 此时队列情况是:
同步队列: head 节点
等待队列: t0->t1->t2->t3
到了最后一个线程 4 执行的时候, index==0, 执行 nextGeneration, 会 signalAll trip 这个 Condition 上的所有等待线程. 所以经过 signalAll 之后, 队列情况变成了:
同步队列: head->t0->t1->t2->t3
等待队列: 空
此时线程 4 运行, 释放锁之后唤醒同步队列上的第一个节点 t0
来源: https://www.cnblogs.com/studentytj/p/11324371.html