此篇博客所有源码均来自 JDK 1.8
J.U.C 之 AQS 传送门
[死磕 Java 并发] --J.U.C 之 AQS(一篇就够了) https://mp.weixin.qq.com/s?__biz=MzU2NjIzNDk5NQ==&mid=2247484289&idx=1&sn=56ad0dd81fb2b4e4f86c5aaaaf3164a6&scene=21#wechat_redirect , 作为同步组件的基础, AQS 做了太多的工作, 自定义同步组件只需要简单地实现自定义方法, 然后加上 AQS 提供的模板方法, 就可以实现强大的自定义同步组件, 理解了 AQS, 其他理解起来就容易很多了.
Condition 介绍
在没有 Lock 之前, 我们使用 synchronized 来控制同步, 配合 Object 的 wait(),notify()系列方法可以实现等待 / 通知模式. 在 Java SE5 后, Java 提供了 Lock 接口, 相对于 Synchronized 而言, Lock 提供了条件 Condition, 对线程的等待, 唤醒操作更加详细和灵活. 下图是 Condition 与 Object 的监视器方法的对比(摘自Java 并发编程的艺术):
Condition 提供了一系列的方法来对阻塞和唤醒线程:
await() : 造成当前线程在接到信号或被中断之前一直处于等待状态.
await(long time, TimeUnit unit) : 造成当前线程在接到信号, 被中断或到达指定等待时间之前一直处于等待状态.
awaitNanos(long nanosTimeout) : 造成当前线程在接到信号, 被中断或到达指定等待时间之前一直处于等待状态. 返回值表示剩余时间, 如果在 nanosTimesout 之前唤醒, 那么返回值 = nanosTimeout - 消耗时间, 如果返回值 <= 0 , 则可以认定它已经超时了.
awaitUninterruptibly() : 造成当前线程在接到信号之前一直处于等待状态.[注意: 该方法对中断不敏感] .
awaitUntil(Date deadline) : 造成当前线程在接到信号, 被中断或到达指定最后期限之前一直处于等待状态. 如果没有到指定时间就被通知, 则返回 true, 否则表示到了指定时间, 返回返回 false.
signal() : 唤醒一个等待线程. 该线程从等待方法返回前必须获得与 Condition 相关的锁.
signal()All : 唤醒所有等待线程. 能够从等待方法返回的线程必须获得与 Condition 相关的锁.
Condition 是一种广义上的条件队列. 他为线程提供了一种更为灵活的等待 / 通知模式, 线程在调用 await 方法后执行挂起操作, 直到线程等待的某个条件为真时才会被唤醒. Condition 必须要配合锁一起使用, 因为对共享状态变量的访问发生在多线程环境下. 一个 Condition 的实例必须与一个 Lock 绑定, 因此 Condition 一般都是作为 Lock 的内部实现.
Condtion 的实现
获取一个 Condition 必须要通过 Lock 的 newCondition()方法. 该方法定义在接口 Lock 下面, 返回的结果是绑定到此 Lock 实例的新 Condition 实例. Condition 为一个接口, 其下仅有一个实现类 ConditionObject, 由于 Condition 的操作需要获取相关的锁, 而 AQS 则是同步锁的实现基础, 所以 ConditionObject 则定义为 AQS 的内部类. 定义如下:
public class ConditionObject implements Condition, java.io.Serializable {}
等待队列
每个 Condition 对象都包含着一个 FIFO 队列, 该队列是 Condition 对象通知 / 等待功能的关键. 在队列中每一个节点都包含着一个线程引用, 该线程就是在该 Condition 对象上等待的线程. 我们看 Condition 的定义就明白了:
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; // 头节点 private transient Node firstWaiter; // 尾节点 private transient Node lastWaiter; public ConditionObject() { } /** 省略方法 **/}
从上面代码可以看出 Condition 拥有首节点 (firstWaiter), 尾节点(lastWaiter). 当前线程调用 await() 方法, 将会以当前线程构造成一个节点(Node), 并将节点加入到该队列的尾部. 结构如下:
Node 里面包含了当前线程的引用. Node 定义与 AQS 的 CLH 同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node 静态内部类).
Condition 的队列结构比 CLH 同步队列的结构简单些, 新增过程较为简单只需要将原尾节点的 nextWaiter 指向新增节点, 然后更新 lastWaiter 即可.
等待
调用 Condition 的 await()方法会使当前线程进入等待状态, 同时会加入到 Condition 等待队列同时释放锁. 当从 await()方法返回时, 当前线程一定是获取了 Condition 相关连的锁.
public final void await() throws InterruptedException { // 当前线程中断 if (Thread.interrupted()) throw new InterruptedException(); // 当前线程加入等待队列 Node node = addConditionWaiter(); // 释放锁 long savedState = fullyRelease(node); int interruptMode = 0; /** * 检测此节点的线程是否在同步队上, 如果不在, 则说明该线程还不具备竞争锁的资格, 则继续等待 * 直到检测到此节点在同步队列上 */ while (!isOnSyncQueue(node)) { // 线程挂起 LockSupport.park(this); // 如果已经中断了, 则退出 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 竞争同步状态 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // 清理下条件队列中的不是在等待条件的节点 if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
此段代码的逻辑是: 首先将当前线程新建一个节点同时加入到条件队列中, 然后释放当前线程持有的同步状态. 然后则是不断检测该节点代表的线程释放出现在 CLH 同步队列中(收到 signal 信号之后就会在 AQS 队列中检测到), 如果不存在则一直挂起, 否则参与竞争同步状态.
加入条件队列 (addConditionWaiter()) 源码如下:
private Node addConditionWaiter() { Node t = lastWaiter; // 尾节点 //Node 的节点状态如果不为 CONDITION, 则表示该节点不处于等待状态, 需要清除节点 if (t != null && t.waitStatus != Node.CONDITION) { // 清除条件队列中所有状态不为 Condition 的节点 unlinkCancelledWaiters(); t = lastWaiter; } // 当前线程新建节点, 状态 CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); /** * 将该节点加入到条件队列中最后一个位置 */ if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
该方法主要是将当前线程加入到 Condition 条件队列中. 当然在加入到尾节点之前会清楚所有状态不为 Condition 的节点.
fullyRelease(Node node), 负责释放该线程持有的锁.
final long fullyRelease(Node node) { boolean failed = true; try { // 节点状态 -- 其实就是持有锁的数量 long savedState = getState(); // 释放锁 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
isOnSyncQueue(Node node): 如果一个节点刚开始在条件队列上, 现在在同步队列上获取锁则返回 true
final boolean isOnSyncQueue(Node node) { // 状态为 Condition, 获取前驱节点为 null, 返回 false if (node.waitStatus == Node.CONDITION || node.prev == null) return false; // 后继节点不为 null, 肯定在 CLH 同步队列中 if (node.next != null) return true; return findNodeFromTail(node); }
unlinkCancelledWaiters(): 负责将条件队列中状态不为 Condition 的节点删除
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
通知
调用 Condition 的 signal()方法, 将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点), 在唤醒节点前, 会将节点移到 CLH 同步队列中.
public final void signal() { // 检测当前线程是否为拥有锁的独 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 头节点, 唤醒条件队列中的第一个节点 Node first = firstWaiter; if (first != null) doSignal(first); // 唤醒 }
该方法首先会判断当前线程是否已经获得了锁, 这是前置条件. 然后唤醒条件队列中的头节点.
doSignal(Node first): 唤醒头节点
private void doSignal(Node first) { do { // 修改头结点, 完成旧头结点的移出工作 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal(Node first)主要是做两件事: 1. 修改头节点, 2. 调用 transferForSignal(Node first) 方法将节点移动到 CLH 同步队列中. transferForSignal(Node first)源码如下:
final boolean transferForSignal(Node node) { // 将该节点从状态 CONDITION 改变为初始状态 0, if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 将节点加入到 syn 队列中去, 返回的是 syn 队列中 node 节点前面的一个节点 Node p = enq(node); int ws = p.waitStatus; // 如果结点 p 的状态为 cancel 或者修改 waitStatus 失败, 则直接唤醒 if (ws> 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
整个通知的流程如下:
判断当前线程是否已经获取了锁, 如果没有获取则直接抛出异常, 因为获取锁为通知的前置条件.
如果线程已经获取了锁, 则将唤醒条件队列的首节点
唤醒首节点是先将条件队列中的头节点移出, 然后调用 AQS 的 enq(Node node)方法将其安全地移到 CLH 同步队列中
最后判断如果该节点的同步状态是否为 Cancel, 或者修改状态为 Signal 失败时, 则直接调用 LockSupport 唤醒该节点的线程.
总结
一个线程获取锁后, 通过调用 Condition 的 await()方法, 会将当前线程先加入到条件队列中, 然后释放锁, 最后通过 isOnSyncQueue(Node node)方法不断自检看节点是否已经在 CLH 同步队列了, 如果是则尝试获取锁, 否则一直挂起. 当线程调用 signal()方法后, 程序首先检查当前线程是否获取了锁, 然后通过 doSignal(Node first)方法唤醒 CLH 同步队列的首节点. 被唤醒的线程, 将从 await()方法中的 while 循环中退出来, 然后调用 acquireQueued()方法竞争同步状态.
Condition 的应用
只知道原理, 如果不知道使用那就坑爹了, 下面是用 Condition 实现的生产者消费者问题:
public class ConditionTest { private LinkedList<String> buffer; // 容器 private int maxSize ; // 容器最大 private Lock lock; private Condition fullCondition; private Condition notFullCondition; ConditionTest(int maxSize){ this.maxSize = maxSize; buffer = new LinkedList<String>(); lock = new ReentrantLock(); fullCondition = lock.newCondition(); notFullCondition = lock.newCondition(); } public void set(String string) throws InterruptedException { lock.lock(); // 获取锁 try { while (maxSize == buffer.size()){ notFullCondition.await(); // 满了, 添加的线程进入等待状态 } buffer.add(string); fullCondition.signal(); } finally { lock.unlock(); // 记得释放锁 } } public String get() throws InterruptedException { String string; lock.lock(); try { while (buffer.size() == 0){ fullCondition.await(); } string = buffer.poll(); notFullCondition.signal(); } finally { lock.unlock(); } return string; }}
来源: https://juejin.im/entry/5ae351b1f265da0b886d28a3