此篇博客所有源码均来自 JDK 1.8
J.U.C 之 AQS 传送门
[死磕 Java 并发] --J.U.C 之 AQS(一篇就够了) https://mp.weixin.qq.com/s?__biz=MzU2NjIzNDk5NQ==&mid=2247484289&idx=1&sn=56ad0dd81fb2b4e4f86c5aaaaf3164a6&scene=21#wechat_redirect , 作为同步组件的基础, AQS 做了太多的工作, 自定义同步组件只需要简单地实现自定义方法, 然后加上 AQS 提供的模板方法, 就可以实现强大的自定义同步组件, 理解了 AQS, 其他理解起来就容易很多了.
Condition 介绍
在没有 Lock 之前, 我们使用 synchronized 来控制同步, 配合 Object 的 wait(),notify()系列方法可以实现等待 / 通知模式. 在 Java SE5 后, Java 提供了 Lock 接口, 相对于 Synchronized 而言, Lock 提供了条件 Condition, 对线程的等待, 唤醒操作更加详细和灵活. 下图是 Condition 与 Object 的监视器方法的对比(摘自Java 并发编程的艺术):
Condition 提供了一系列的方法来对阻塞和唤醒线程:
await(): 造成当前线程在接到信号或被中断之前一直处于等待状态.
await(long time, TimeUnit unit): 造成当前线程在接到信号, 被中断或到达指定等待时间之前一直处于等待状态.
awaitNanos(long nanosTimeout): 造成当前线程在接到信号, 被中断或到达指定等待时间之前一直处于等待状态. 返回值表示剩余时间, 如果在 nanosTimesout 之前唤醒, 那么返回值 = nanosTimeout - 消耗时间, 如果返回值 <= 0 , 则可以认定它已经超时了.
awaitUninterruptibly(): 造成当前线程在接到信号之前一直处于等待状态.[注意: 该方法对中断不敏感] .
awaitUntil(Date deadline): 造成当前线程在接到信号, 被中断或到达指定最后期限之前一直处于等待状态. 如果没有到指定时间就被通知, 则返回 true, 否则表示到了指定时间, 返回返回 false.
signal(): 唤醒一个等待线程. 该线程从等待方法返回前必须获得与 Condition 相关的锁.
signal()All: 唤醒所有等待线程. 能够从等待方法返回的线程必须获得与 Condition 相关的锁.
Condition 是一种广义上的条件队列. 他为线程提供了一种更为灵活的等待 / 通知模式, 线程在调用 await 方法后执行挂起操作, 直到线程等待的某个条件为真时才会被唤醒. Condition 必须要配合锁一起使用, 因为对共享状态变量的访问发生在多线程环境下. 一个 Condition 的实例必须与一个 Lock 绑定, 因此 Condition 一般都是作为 Lock 的内部实现.
Condtion 的实现
获取一个 Condition 必须要通过 Lock 的 newCondition()方法. 该方法定义在接口 Lock 下面, 返回的结果是绑定到此 Lock 实例的新 Condition 实例. Condition 为一个接口, 其下仅有一个实现类 ConditionObject, 由于 Condition 的操作需要获取相关的锁, 而 AQS 则是同步锁的实现基础, 所以 ConditionObject 则定义为 AQS 的内部类. 定义如下:
- public class ConditionObject implements Condition, java.io.Serializable {
- }
等待队列
每个 Condition 对象都包含着一个 FIFO 队列, 该队列是 Condition 对象通知 / 等待功能的关键. 在队列中每一个节点都包含着一个线程引用, 该线程就是在该 Condition 对象上等待的线程. 我们看 Condition 的定义就明白了:
- public class ConditionObject implements Condition, java.io.Serializable {
- private static final long serialVersionUID = 1173984872572414699L;
- // 头节点
- private transient Node firstWaiter;
- // 尾节点
- private transient Node lastWaiter;
- public ConditionObject() {
- }
- /** 省略方法 **/
- }
从上面代码可以看出 Condition 拥有首节点 (firstWaiter), 尾节点(lastWaiter). 当前线程调用 await() 方法, 将会以当前线程构造成一个节点(Node), 并将节点加入到该队列的尾部. 结构如下:
Node 里面包含了当前线程的引用. Node 定义与 AQS 的 CLH 同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized.Node 静态内部类).
Condition 的队列结构比 CLH 同步队列的结构简单些, 新增过程较为简单只需要将原尾节点的 nextWaiter 指向新增节点, 然后更新 lastWaiter 即可.
等待
调用 Condition 的 await()方法会使当前线程进入等待状态, 同时会加入到 Condition 等待队列同时释放锁. 当从 await()方法返回时, 当前线程一定是获取了 Condition 相关连的锁.
- public final void await() throws InterruptedException {
- // 当前线程中断
- if (Thread.interrupted())
- throw new InterruptedException();
- // 当前线程加入等待队列
- Node node = addConditionWaiter();
- // 释放锁
- long savedState = fullyRelease(node);
- int interruptMode = 0;
- /**
- * 检测此节点的线程是否在同步队上, 如果不在, 则说明该线程还不具备竞争锁的资格, 则继续等待
- * 直到检测到此节点在同步队列上
- */
- while (!isOnSyncQueue(node)) {
- // 线程挂起
- LockSupport.park(this);
- // 如果已经中断了, 则退出
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- // 竞争同步状态
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- // 清理下条件队列中的不是在等待条件的节点
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
此段代码的逻辑是: 首先将当前线程新建一个节点同时加入到条件队列中, 然后释放当前线程持有的同步状态. 然后则是不断检测该节点代表的线程释放出现在 CLH 同步队列中(收到 signal 信号之后就会在 AQS 队列中检测到), 如果不存在则一直挂起, 否则参与竞争同步状态.
加入条件队列 (addConditionWaiter()) 源码如下:
- private Node addConditionWaiter() {
- Node t = lastWaiter; // 尾节点
- //Node 的节点状态如果不为 CONDITION, 则表示该节点不处于等待状态, 需要清除节点
- if (t != null && t.waitStatus != Node.CONDITION) {
- // 清除条件队列中所有状态不为 Condition 的节点
- unlinkCancelledWaiters();
- t = lastWaiter;
- }
- // 当前线程新建节点, 状态 CONDITION
- Node node = new Node(Thread.currentThread(), Node.CONDITION);
- /**
- * 将该节点加入到条件队列中最后一个位置
- */
- if (t == null)
- firstWaiter = node;
- else
- t.nextWaiter = node;
- lastWaiter = node;
- return node;
- }
该方法主要是将当前线程加入到 Condition 条件队列中. 当然在加入到尾节点之前会清楚所有状态不为 Condition 的节点.
fullyRelease(Node node), 负责释放该线程持有的锁.
- final long fullyRelease(Node node) {
- boolean failed = true;
- try {
- // 节点状态 -- 其实就是持有锁的数量
- long savedState = getState();
- // 释放锁
- if (release(savedState)) {
- failed = false;
- return savedState;
- } else {
- throw new IllegalMonitorStateException();
- }
- } finally {
- if (failed)
- node.waitStatus = Node.CANCELLED;
- }
- }
isOnSyncQueue(Node node): 如果一个节点刚开始在条件队列上, 现在在同步队列上获取锁则返回 true
- final boolean isOnSyncQueue(Node node) {
- // 状态为 Condition, 获取前驱节点为 null, 返回 false
- if (node.waitStatus == Node.CONDITION || node.prev == null)
- return false;
- // 后继节点不为 null, 肯定在 CLH 同步队列中
- if (node.next != null)
- return true;
- return findNodeFromTail(node);
- }
unlinkCancelledWaiters(): 负责将条件队列中状态不为 Condition 的节点删除
- private void unlinkCancelledWaiters() {
- Node t = firstWaiter;
- Node trail = null;
- while (t != null) {
- Node next = t.nextWaiter;
- if (t.waitStatus != Node.CONDITION) {
- t.nextWaiter = null;
- if (trail == null)
- firstWaiter = next;
- else
- trail.nextWaiter = next;
- if (next == null)
- lastWaiter = trail;
- }
- else
- trail = t;
- t = next;
- }
- }
通知
调用 Condition 的 signal()方法, 将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点), 在唤醒节点前, 会将节点移到 CLH 同步队列中.
- public final void signal() {
- // 检测当前线程是否为拥有锁的独
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- // 头节点, 唤醒条件队列中的第一个节点
- Node first = firstWaiter;
- if (first != null)
- doSignal(first); // 唤醒
- }
该方法首先会判断当前线程是否已经获得了锁, 这是前置条件. 然后唤醒条件队列中的头节点.
doSignal(Node first): 唤醒头节点
- private void doSignal(Node first) {
- do {
- // 修改头结点, 完成旧头结点的移出工作
- if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null;
- first.nextWaiter = null;
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
doSignal(Node first)主要是做两件事: 1. 修改头节点, 2. 调用 transferForSignal(Node first) 方法将节点移动到 CLH 同步队列中. transferForSignal(Node first)源码如下:
- final boolean transferForSignal(Node node) {
- // 将该节点从状态 CONDITION 改变为初始状态 0,
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- return false;
- // 将节点加入到 syn 队列中去, 返回的是 syn 队列中 node 节点前面的一个节点
- Node p = enq(node);
- int ws = p.waitStatus;
- // 如果结点 p 的状态为 cancel 或者修改 waitStatus 失败, 则直接唤醒
- if (ws> 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
整个通知的流程如下:
判断当前线程是否已经获取了锁, 如果没有获取则直接抛出异常, 因为获取锁为通知的前置条件.
如果线程已经获取了锁, 则将唤醒条件队列的首节点
唤醒首节点是先将条件队列中的头节点移出, 然后调用 AQS 的 enq(Node node)方法将其安全地移到 CLH 同步队列中
最后判断如果该节点的同步状态是否为 Cancel, 或者修改状态为 Signal 失败时, 则直接调用 LockSupport 唤醒该节点的线程.
总结
一个线程获取锁后, 通过调用 Condition 的 await()方法, 会将当前线程先加入到条件队列中, 然后释放锁, 最后通过 isOnSyncQueue(Node node)方法不断自检看节点是否已经在 CLH 同步队列了, 如果是则尝试获取锁, 否则一直挂起. 当线程调用 signal()方法后, 程序首先检查当前线程是否获取了锁, 然后通过 doSignal(Node first)方法唤醒 CLH 同步队列的首节点. 被唤醒的线程, 将从 await()方法中的 while 循环中退出来, 然后调用 acquireQueued()方法竞争同步状态.
Condition 的应用
只知道原理, 如果不知道使用那就坑爹了, 下面是用 Condition 实现的生产者消费者问题:
- public class ConditionTest {
- private LinkedList<String> buffer; // 容器
- private int maxSize ; // 容器最大
- private Lock lock;
- private Condition fullCondition;
- private Condition notFullCondition;
- ConditionTest(int maxSize){
- this.maxSize = maxSize;
- buffer = new LinkedList<String>();
- lock = new ReentrantLock();
- fullCondition = lock.newCondition();
- notFullCondition = lock.newCondition();
- }
- public void set(String string) throws InterruptedException {
- lock.lock(); // 获取锁
- try {
- while (maxSize == buffer.size()){
- notFullCondition.await(); // 满了, 添加的线程进入等待状态
- }
- buffer.add(string);
- fullCondition.signal();
- } finally {
- lock.unlock(); // 记得释放锁
- }
- }
- public String get() throws InterruptedException {
- String string;
- lock.lock();
- try {
- while (buffer.size() == 0){
- fullCondition.await();
- }
- string = buffer.poll();
- notFullCondition.signal();
- } finally {
- lock.unlock();
- }
- return string;
- }
- }
来源: http://www.tuicool.com/articles/YJnMrm6