前言
由于 AQS 的源码太过凝练, 而且有很多分支比如取消排队, 等待条件等, 如果把所有的分支在一篇文章的写完可能会看懵, 所以这篇文章主要是从正常流程先走一遍, 重点不在取消排队等分支, 之后会专门写一篇取消排队和等待条件的分支逻辑. 读源码千万别在每个代码分支中来回游走, 先按一个正常的分支把流程看明白, 之后再去重点关注其他分支, 各个击破. 我相信看完正常流程, 你再去分析其他分支会更加得心应手. 本篇将主要方法名都做了目录索引, 查看时可通过目录快速跳到指定方法的逻辑.
执行流程
AQS 的执行流程大体为当线程获取锁失败时, 会加入到等待队列中, 在等待队列中的线程会按照从头至尾的顺序依次再去尝试获取锁执行.
当线程获取锁后如果还需要等待特定的条件才能执行, 那么线程就加入到条件队列排队, 当等待的条件到来时再从条件队列中按照从头至尾的顺序加入到等待队列中, 然后再按照等待队列的执行流程去获取锁. 所以 AQS 最核心的数据结构其实就两个队列, 等待队列和条件队列, 然后再加上一个获取锁的同步状态.
AQS 数据结构
AQS 最核心的数据结构就三个
等待队列
源码中 head 和 tail 为等待队列的头尾节点, 在通过前后指向则构成了等待队列, 为双向链表, 学名为 CLH 队列.
条件队列
ConditionObject 中的 firstWaiter 和 lastWaiter 为等待队列的头尾节点, 然后通过 next 指向构成了条件队列, 是个单向链表.
同步状态
state 为同步状态, 通过 CAS 操作来实现获取锁的操作.
- public abstract class AbstractQueuedSynchronizer{
- /**
- * 等待队列的头节点
- */
- private transient volatile Node head;
- /**
- * 等待队列的尾节点
- */
- private transient volatile Node tail;
- /**
- * 同步状态
- */
- private volatile int state;
- public class ConditionObject implements Condition, java.io.Serializable {
- /** 条件队列的头节点 */
- private transient Node firstWaiter;
- /** 条件队列的尾节点 */
- private transient Node lastWaiter;
- }
- }
Node 节点
两个队列中的节点都是通过 AQS 中内部类 Node 来实现的. 主要字段:
waitStatus
当前节点的状态, 具体看源码列出的注释. 很重要, 之后会在源码中讲解.
Node prev
等待队列节点指向的前置节点
Node next
待队列节点指向的后置节点
Node nextWaiter
条件队列中节点指向的后置节点
Thread thread
当前节点持有的线程
- static final class Node {
- /** */
- static final Node SHARED = new Node();
- /** */
- static final Node EXCLUSIVE = null;
- /** 标明当前节点线程取消排队 */
- static final int CANCELLED = 1;
- /** 标明该节点的后置节点需要自己去唤醒 */
- static final int SIGNAL = -1;
- /** 标明当前节点在等待某个条件, 此时节点在条件队列中 */
- static final int CONDITION = -2;
- /**
- * waitStatus value to indicate the next acquireShared should
- * unconditionally propagate
- */
- static final int PROPAGATE = -3;
- /**
- * 等待状态, 值对于上面的四个常量
- */
- volatile int waitStatus;
- /**
- * 等待队列节点指向的前置节点
- */
- volatile Node prev;
- /**
- * 等待队列节点指向的后置节点
- */
- volatile Node next;
- /**
- * 当前节点持有的线程
- */
- volatile Thread thread;
- /**
- * 条件队列中节点指向的后置节点
- */
- Node nextWaiter;
加锁
上面说明的数据结构我们先大致有个印象, 现在通过加锁来一步步说明下具体的流程, 上篇文章 JUC 并发编程基石 AQS 之结构篇, 我们知道了 AQS 加锁代码执行的是 acquire 方法, 那么我们从这个方法说起, 从源码中看出执行流程为: tryAcquire-->addWaiter-->acquireQueued
tryAcquire 为自己实现的具体加锁逻辑, 当加锁失败时返回 false, 则会执行 addWaiter, 将线程加入到等待队列中, Node.EXCLUSIVE 为独占锁的模式, 即同时只能有一个线程获取锁去执行.
例子说明
首先假设有四个线程 t0-t4 调用 tryAcquire 获取锁, t0 线程为天选之子获取到了锁, 则 t1-t4 线程接着去执行 addWaiter.
- acquire
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
addWaiter 分支 1
addWaiter 方法, 首先会初始化一个 node 节点, 将当前线程设置到 node 节点中. 然后判断 head 和 tail 节点是否为空, head 和 tail 节点是懒加载的, 当 AQS 初始化时为 null, 则第一次进来时 if (pred != null) 条件不成立, 执行 enq 方法.
例子说明
假如 t1 和 t2 线程同时执行到该方法, head 节点未初始化则执行 enq.
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
- enq
此时可能多个线程会同时调用 enq 方法, 所以该方法中也使用 CAS 操作. for (;;) 是个死循环, 首先会 CAS 操作初始化 head 节点, 且 head 节点是个空节点, 没有设置线程. 然后第二次循环时通过 CAS 操作将该节点设置我尾部节点, 并将前置节点指向 head, 之后会跳出循环, 返回生成的 Node 节点到 addWaiter, 从源码可以看到 addWaiter 方法后面没有逻辑, 之后会调用 acquireQueued.
例子说明
t1 和 t2 线程同时执行, t1 线程上天眷顾 CAS 成功, 则流程为
初始化 head
t1 线程的 node 节点加入等待队列
t2 线程执行, node 节点加入等待队列
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
addWaiter 分支 2
现在在来说 t3 和 t4,t3 和 t4 线程这时终于获取到了 CPU 的执行权, 此时 head 节点已经初始化, 则进入条件中的代码, 其实也是通过 CAS 操作将节点加入到等待队列尾部, 之后会调用 acquireQueued.
例子说明
假如 t3 线程先 CAS 成功, 之后 t4 成功, 此时的数据结构为
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
- acquireQueued
这个方法有两个逻辑, 首先如果该节点的前置节点是 head 会走第一个 if, 再次去尝试获取锁???
获取锁成功, 则将头节点设置为自己, 并返回到 acquire 方法, 此时 acquire 方法执行完, 代表获取锁成功, 线程可以执行自己的逻辑了. 这里有下面几个注意点
p.next = null; // help GC 设置旧的 head 节点的后置节点为 null
setHead 方法 将 t1 节点设置为头节点, 因为头节点是个空节点, 所以设置 t1 线程节点线程为 null, 设置 t1 前置节点为 null, 此时旧的 head 节点已经没有任何指向和关联, 可以被 gc 回收, 所以上面那一步会写个 help GC 的注释.
例子说明
现在 t1 线程的前置节点为头结点, 如果 t1 执行 tryAcquire 成功则结果为
当获取锁失败或者前置节点不是头节点都会走第二个 if 逻辑, 首先会判断当前线程是否需要挂起, 如果需要则执行线程挂起.
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
- private void setHead(Node node) {
- head = node;
- node.thread = null;
- node.prev = null;
- }
- shouldParkAfterFailedAcquire
判断线程是否需要挂起, 首先需要注意的是这个方法的参数是当前节点的前置节点. 当线程需要挂起的时候, 它需要把身后事安排明白, 挂起后让谁来把我唤醒. 这个方法就主要做这个操作. 我们再来看 Node 节点中的 waitStatus 状态, 这个状态有一个 Node.SIGNAL=-1, 代表了当前节点需要将后置节点唤醒. 这个理解可能有点绕. 首先我们要理解一点, 如果我需要被唤醒, 那么我就要设置我们的前置节点的状态为 Node.SIGNAL, 这样当我的前置节点发现 waitStatus=Node.SIGNAL 时, 它才知道, 我执行完后需要去唤醒后置节点让后置节点去执行. 所以这个方法是当前节点去设置自己的前置节点的状态为 Node.SIGNAL.
waitStatus 初始化后是 0,
第一次进入该方法, 发现自己的前置节点不是 Node.SIGNAL, 需要先设置为 Node.SIGNAL 状态
第二次进入时发现前置节点已经是 Node.SIGNAL 状态, 那么我就可以安心的挂起了, 有人会唤醒我的.
所以这个方法其实是两个逻辑, 先设置前置节点状态, 再判断是否可以挂起. 因为前面 acquireQueued 方法中 for (; 也是个循环, 所以会重复进入.
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws> 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus> 0);
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
- parkAndCheckInterrupt
将自己的前置节点设置为可唤醒的状态后进入该方法, 线程挂起.
例子说明
此时 t2-t4 线程都执行到了此方法, 则 t2-t4 线程都已经挂起不再执行, 并且 head-t3 节点的 waitStatus 都为 Node.SIGNAL, 因为 t4 没有后置节点.
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
解锁
release
解锁方法的入口是 AQS 的 release 方法, 首先会调用 tryRelease 方法, 这个是 AQS 实现类自己实现的方法, 去 CAS 改变 state 状态, 如果解锁成功, 则会进入 if 里的代码, 获取 head 节点, 判断 waitStatus!=0, 如果等于 0 代表没有后置节点需要去唤醒. 之后调用 unparkSuccessor 方法.
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
waitStatus>0 时, 代表为 CANCELLED = 1 状态, 即线程取消排队, 这个以后会细讲. 先将头结点的 waitStatus 状态设为初始值 0, 之后查看后置节点的状态, 如果 > 0 代表后置节点取消了排队, 不需要唤醒. 但是当前节点需要去唤醒后续的节点让后续节点再去执行, 所以会从尾结点开始寻找找到离当前线程最近的一个且 waitStatus<0 的去唤醒. 之后会调用 LockSupport.unpark(s.thread); 取消后续节点的挂起, 让后续节点继续执行.
- unparkSuccessor
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- if (ws <0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- if (s == null || s.waitStatus> 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
例子说明
此时等待队列的数据, 当 t0 线程执行完成后执行解锁操作, 此时所有等待的线程都没有取消等待.
则 t0 线程会唤醒 t1 线程
如果 t1 和 t3 线程取消的排队时, t0 线程会唤醒 t2, 从后往前找离 head 最近的一个没有取消派对的节点.
线程执行到 parkAndCheckInterrupt 方法时被挂起, 当被头节点唤醒后会继续执行, 设置 interrupted=true, 表示被中断, 会继续执行 for 循环逻辑, 到现在一个正常的获取锁失败 --> 加入等待队列 --> 挂起 --> 被唤醒继续执行的流程已经整体走了一遍.
本篇文章都是自己根据源码写出的阅读心得, 可能有的地方没有揣摩到 Doug Lea 大神的意图, 如果有理解不对的地方欢迎一起探讨.
如有不实, 还望指正
来源: https://www.cnblogs.com/LuxBai/p/12775298.html