针对 Object 的 notify/notifyAll 方法
voidsignal()// 唤醒一个等待在 condition 上的线程, 将该线程从等待队列中转移到同步队列中, 如果在同步队列中能够竞争到 Lock 则可以从等待方法中返回. voidsignalAll()// 与 1 的区别在于能够唤醒所有等待在 condition 上的线程
Condition 实现原理分析 等待队列
创建一个 Condition 对象是通过 lock.newCondition(), 而这个方法实际上是会创建 ConditionObject 对象, 该类是 AQS 的一个内部类. Condition 是要和 Lock 配合使用的也就是 Condition 和 Lock 是绑定在一起的, 而 lock 的实现原理又依赖于 AQS, 自然而然 ConditionObject 作为 AQS 的一个内部类无可厚非. 我们知道在锁机制的实现上, AQS 内部维护了一个同步队列, 如果是独占式锁的话, 所有获取锁失败的线程的尾插入到同步队列, 同样的, Condition 内部也是使用同样的方式, 内部维护了一个等待队列, 所有调用 condition.await 方法的线程会加入到等待队列中, 并且线程状态转换为等待状态. 另外注意到 ConditionObject 中有两个成员变量:
/**Firstnodeofconditionqueue.*/privatetransientNodefirstWaiter;/**Lastnodeofconditionqueue.*/privatetransientNodelastWaiter;
ConditionObject 通过持有等待队列的头尾指针来管理等待队列. 注意 Node 类复用了在 AQS 中的 Node 类, Node 类有这样一个属性:
// 后继节点 NodenextWaiter;
等待队列是一个单向队列, 而在之前说 AQS 时知道同步队列是一个双向队列.
等待队列示意图:
注意: 我们可以多次调用 lock.newCondition()方法创建多个 Condition 对象, 也就是一个 lLock 可以持有多个等待队列. 利用 Object 的方式实际上是指在对象 Object 对象监视器上只能拥有一个同步队列和一个等待队列; 并发包中的 Lock 拥有一个同步队列和多个等待队列. 示意图如下:
ConditionObject 是 AQS 的内部类, 因此每个 ConditionObject 能够访问到 AQS 提供的方法, 相当于每个 Condition 都拥有所属同步器的引用.
await 实现原理
当调用 condition.await()方法后会使得当前获取 lock 的线程进入到等待队列, 如果该线程能够从 await()方法返回的话一定是该线程获取了与 condition 相关联的 lock. await()方法源码如下:
- public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 1. 将当前线程包装成 Node, 尾插法插入到等待队列中
- Node node = addConditionWaiter(); // 2. 释放当前线程所占用的 lock, 在释放的过程中会唤醒同步队列中的下一个节点
- int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态
- LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
- } // 4. 自旋等待获取到同步状态(即获取到 lock)
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters(); // 5. 处理被中断的情况
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
当前线程调用 condition.await()方法后, 会使得当前线程释放 lock 然后加入到等待队列中, 直至被 signal/signalAll 后会使得当前线程从等待队列中移至到同步队列中去, 直到获得了 lock 后才会从 await 方法返回, 或者在等待时被中断会做中断处理.
addConditionWaiter()将当前线程添加到等待队列中, 其源码如下:
- private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out.
- if (t != null && t.waitStatus != Node.CONDITION) {
- unlinkCancelledWaiters();
- t = lastWaiter;
- } // 将当前线程包装成 Node
- Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //t==null, 同步队列为空的情况
- firstWaiter = node; else
- // 尾插法
- t.nextWaiter = node; // 更新 lastWaiter
- lastWaiter = node; return node;
- }
这里通过尾插法将当前线程封装的 Node 插入到等待队列中, 同时可以看出等待队列是一个不带头结点的链式队列, 之前我们学习 AQS 时知道同步队列是一个带头结点的链式队列.
将当前节点插入到等待对列之后, 使用 fullyRelease(0)方法释放当前线程释放 lock, 源码如下:
- final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { // 成功释放同步状态
- failed = false; return savedState;
- } else { // 不成功释放同步状态抛出异常
- throw new IllegalMonitorStateException();
- }
- } finally { if (failed)
- node.waitStatus = Node.CANCELLED;
- }
- }
调用 AQS 的模板方法 release()方法释放 AQS 的同步状态并且唤醒在同步队列中头结点的后继节点引用的线程, 如果释放成功则正常返回, 若失败的话就抛出异常.
如何从 await()方法中退出? 再看 await()方法有这样一段代码:
- while (!isOnSyncQueue(node)) { // 3. 当前线程进入到等待状态
- LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
- }
当线程第一次调用 condition.await()方法时, 会进入到这个 while()循环中, 然后通过 LockSupport.park(this)方法使得当前线程进入等待状态, 那么要想退出这个 await 方法就要先退出这个 while 循环, 退出 while 循环的出口有 2 个:
break 退出 while 循环
while 循环中的逻辑判断为 false
第 1 种情况的条件是当前等待的线程被中断后会走到 break 退出,
第 2 种情况是当前节点被移动到了同步队列中,(即另外线程调用的 condition 的 signal 或者 signalAll 方法), while 中逻辑判断为 false 后结束 while 循环.
当退出 while 循环后就会调用 acquireQueued(node, savedState), 该方法的作用是 在自旋过程中线程不断尝试获取同步状态, 直至成功(线程获取到 lock).
这样就说明了退出 await 方法必须是已经获得了 Condition 引用 (关联) 的 Lock.
await 方法示意图如下:
调用 condition.await 方法的线程必须是已经获得了 lock, 也就是当前线程是同步队列中的头结点. 调用该方法后会使得当前线程所封装的 Node 尾插入到等待队列中.
超时机制的支持
condition 还额外支持了超时机制, 使用者可调用方法 awaitNanos,awaitUtil. 这两个方法的实现原理, 基本上与 AQS 中的 tryAcquire 方法如出一辙.
不响应中断的支持
调用 condition.awaitUninterruptibly()方法, 该方法的源码为:
- public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted())
- interrupted = true;
- } if (acquireQueued(node, savedState) || interrupted)
- selfInterrupt();
- }
与上面的
await 方法基本一致, 只不过减少了对中断的处理, 并省略了 reportInterruptAfterWait 方法抛被中断的异常.
signal 和 signalAll 实现原理
调用 Condition 的 signal 或者 signalAll 方法可以将等待队列中等待时间最长的节点移动到同步队列中, 使得该节点能够有机会获得 lock. 按照等待队列是先进先出 (FIFO) 的, 所以等待队列的头节点必然会是等待时间最长的节点, 也就是每次调用 condition 的 signal 方法是将头节点移动到同步队列中. signal()源码如下:
- public final void signal() { //1. 先检测当前线程是否已经获取 lock
- if (!isHeldExclusively()) throw new IllegalMonitorStateException(); //2. 获取等待队列中第一个节点, 之后的操作都是针对这个节点
- Node first = firstWaiter; if (first != null)
- doSignal(first);
- }
signal 方法首先会检测当前线程是否已经获取 lock, 如果没有获取 lock 会直接抛出异常, 如果获取的话再得到等待队列的头指针引用的节点, doSignal 方法也是基于该节点. doSignal 方法源码如下:
- private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null; //1. 将头结点从等待队列中移除
- first.nextWaiter = null; //2. while 中 transferForSignal 方法对头结点做真正的处理
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
真正对头节点做处理的是 transferForSignal(), 该方法源码如下:
- final boolean transferForSignal(Node node) { //1. 更新状态为 0
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //2. 将该节点移入到同步队列中去
- Node p = enq(node); int ws = p.waitStatus; if (ws> 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true;
- }
这段代码主要做了两件事情:
1. 将头结点的状态更改为 CONDITION
2. 调用 enq 方法, 将该节点尾插入到同步队列中
调用 condition 的 signal 的前提条件是当前线程已经获取了 lock, 该方法会使得等待队列中的头节点 (等待时间最长的那个节点) 移入到同步队列, 而移入到同步队列后才有机会使得等待线程被唤醒, 即从 await 方法中的 LockSupport.park(this)方法中返回, 从而才有机会使得调用 await 方法的线程成功退出.
signal 方法示意图如下:
signalAll
sigllAll 与 sigal 方法的区别体现在 doSignalAll 方法上. doSignalAll()的源码如下:
- private void doSignalAll(Node first) {
- lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter;
- first.nextWaiter = null;
- transferForSignal(first);
- first = next;
- } while (first != null);
- }
doSignal 方法只会对等待队列的头节点进行操作, 而 doSignalAll 方法将等待队列中的每一个节点都移入到同步队列中, 即 "通知" 当前调用 condition.await()方法的每一个线程.
await 与 signal 和 signalAll 的结合
await 和 signal 和 signalAll 方法就像一个开关控制着线程 A(等待方)和线程 B(通知方). 它们之间的关系可以用下面一个图来表现得更加贴切:
线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列, 而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法, 使得线程 awaitThread 能够有机会移入到同步队列中, 当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock, 从而使得线程 awaitThread 能够从 await 方法中退出, 然后执行后续操作. 如果 awaitThread 获取 lock 失败会直接进入到同步队列.
免费 Java 高级资料需要自己领取, 涵盖了 Java,Redis,MongoDB,MySQL,Zookeeper,Spring Cloud,Dubbo 高并发分布式等教程, 一共 30G.
传送门: https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q
来源: https://www.2cto.com/kf/201905/809597.html