序言
AQS 可以说是 JAVA 源码中必读源码之一. 同时它也是 JAVA 大厂面试的高频知识点之一. 认识并了解它, JAVA 初中升高级工程师必备知识点之一. AQS 是 AbstractQueuedSynchronizer 的简称, 它也是 JUC 包下众多非原生锁实现的核心.
一: AQS 基础概况
AQS 是基于 CLH 队列算法改进实现的锁机制. 大体逻辑是 AQS 内部有一个链型队列, 队列结点类是 AQS 的一个内部类 Node, 形成一个类似如下 Sync Queue(记住这个名词)
可以看出, 一个 Node 除了前后结点的索引外, 还维护了一个 Thread 对象, 一个 int 的 waitStatus. Thread 对象就是处于竞争队列中的线程对象本身. waitStatus 表示当前竞争结点的状态, 这里暂且忽略掉.
处于队首的, 即 Head 所指向的结点, 即为获取到锁的结点. 释放锁即为出队, 后续结点则成为队首, 即获取到锁
Tips: 这里帮大家理解一个事情, 每一个 ReentrantLock 实例都有且只有一个 AQS 实例, 一个 AQS 实例维护一个 Sync Queue. 所以说, 当我们的业务代码中的多个线程对同一个 ReentrantLock 实例进行锁竞争操作时, 其实际就是对同一个 Sync Queue 的队列进行入队, 出队操作.
二: AQS 调用入口 ----ReentrantLock
我们在用 ReentrantLock 时, 代码通常如下:
- ReentrantLock lock = new ReentrantLock();
- Runnable runnable = new Runnable() {
- public void run() {
- lock.lock();
- Sys.out(Thread.currentThread().name() + "抢到锁");
- lock.unlock();
- }
- };
- Thread t1 = new Thread(runnable);
- Thread t2 = new Thread(runnable);
- t2.start();
- t1.start();
可见线程 t1,t2 竞争 lock 这个锁. lock.lock() 抢锁 lock.unlock() 释放锁 来看入口函数 lock
- public void lock() {
- sync.lock();
- }
sync 是 ReentrantLock 内的一个继承了 AQS 的抽象类
- abstract static class Sync extends AbstractQueuedSynchronizer {
- }
抽象类的具体实现是另两个内部类 NonFairSync FairSync, 分别代表非公平锁, 公平锁.
我们系统来看下继承图
ReentrantLock 中的 sync 实例, 默认是在构造函数中初始化的,
- public ReentrantLock() {
- sync = new NonfairSync();
- }
那我们就看默认的 NonFairSync 的实现逻辑.
NonFairSync
当我们调用 ReentrantLock.lock() 时, 直接调用到的是 NonFairSync.lock()
- /**
- * Performs lock. Try immediate barge, backing up to normal
- * acquire on failure.
- */
- final void lock() {
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
compareAndSetState(0, 1) 通过 CAS(CAS 是啥, 这里不讲, 参考乐观锁. 具体 Google 吧) 方式, 设置 AQS 的 int state 字段为 1.AQS 内部就是通过这个 state 是否为 0 来判断当前锁是否已经被线程获取到. 返回 true, 则说明获取锁成功, 设置当前锁的独占线程 setExclusiveOwnerThreaThread.currentThread()); 否则, acquire(1) 尝试获 d(取锁. 这个方法会自旋, 阻塞, 一直到获取锁成功为止. 这里, 传进去的参数 1, 就参考乐观锁的版本字段, 同时, 这里, 它还记录了可重入锁重复获取到锁的次数. 只有释放同样次数才能最终释放锁.
具体看 AQS 的 acquire() 的逻辑
- AbstractQueuedSynchronizer.acquire()
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
要注意, 这个方法是忽略线程中断的. 先看 tryAcquire(arg), 这个方法是 AQS 留给子实现类的口子, 具体实现看 NonFairSync, 它的实现里直接调用了 Sync.nonfairTryAcquire(),
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc <0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
可以看到方法内, 先是尝试获取 state = 0 时的初始锁, 如果失败, 判断当前锁是否是被当前线程获取, 是的话, 将 acquire 时传入的参数累加到 state 字段上. 在这里, 这个字段就是用来记录重复获取锁的次数. 获取失败则返回 false
回到 acquire 在获取失败, 返回 false 后, 才会继续调用 && 右边的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法. 先看 addWaiter
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
利用传进来的 Node.EXCLUSIVE 表示的排斥锁参数以及当前线程实例初始化新 Node, 第 4-10 行代码是在 Sync Queue 队列内有竞争线程时进入. 为空时会走到 enq(node), 这里是在竞争为空时将竞争线程入队的操作. 然后返回当前竞争的线程 node 此时 Sync Queue 如图
我们假设 node_1 是已经获取到锁的结点, node_2 即为我们当前操作的结点.
返回 acquire 接着看 addWaiter 返回的 node 直接作为参数给了 acquireQueued, 这个方法就是主要的 node 竞争锁方法.
- final boolean acquireQueued(final Node node, int arg) {
- // 获取锁成功失败标记
- boolean failed = true;
- try {
- // 当前竞争线程的中断标记
- boolean interrupted = false;
- // 自旋竞争锁, 竞争不到锁的话, 线程又没有中断
- // 则一直在这儿循环
- for (;;) {
- // 获取当前线程的前驱结点
- final Node p = node.predecessor();
- // 如果前驱结点是头结点, 则尝试去 tryAcquire
- // 这个逻辑我们之前看过, 当前线程未获取到锁的情况下
- // 在 AQS 的 state 字段不为 0 时, 则返回 false
- if (p == head && tryAcquire(arg)) {
- // 进入到这里, 说明要不就是当前线程在重复获取
- // 要不就是前边的结点释放锁, state 归 0, 这里获取到
- setHead(node);
- p.next = null; // help GC
- // 标识竞争锁成功
- failed = false;
- // 这个方法不响应线程中断, 但是会返回线程在竞争锁过程中
- // 中断标记返回
- return interrupted;
- }
- // 若获取锁失败, 则到这里. 这里的逻辑主要在 If 判断
- // 的两个方法中, 用来将当前线程挂起的, 具体逻辑
- // 看下面
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
park 就是停下的意思. 所以这个方法从名字上也比较好理解, 就是 挂起线程并且检查线程的中断状态. 这里要注意, LockSupport.part(this) 方法是会在线程中断时自动唤醒的
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
这个方法传入了当前竞争结点及其前驱结点
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- // 前驱结点的等待状态. 这里只需要记住, 我们这里考虑的是
- // 非共享锁, 非公平锁的 AQS. 所以, 只需要确保当前竞争
- // 结点的前驱结点状态为 SIGNAL 就好. 剩下的状态,
- // 与我们此时研究的情况而言没有用
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws> 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus> 0);
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- // 如果前驱结点的 status 为 0, 则将其改为 SIGNAL
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
从 shouldParkAfterFailedAcquire 可以看出来, 在确保前驱结点 status 为 SIGNAL 时, 就可以放心的去 unsafe.park() 了. 之所以要为 SIGNAL, 是因为这个状态含义为: 当前结点 OVER 时要唤醒后继结点.
所以不难推出, 我们的结点现在就 park 在那了. 等他前驱结点释放锁, 或者自己 interrupt 来唤醒, 但因为这个方法是无视中断的, 所以即使 interrupt 了, 只是设置了一个标记位, 但仍然在循环中.
这里假设前驱结点获取锁后释放, 则当前结点在 parkAndCheckInterrupt() 方法中被唤醒, 而后再次循环 for(;;), 这次会在第一个 if 中就进入, 当前结点获取到锁, 然后重置 Head 指向的结点等, 返回当前线程的中断标记.
返回 acquire
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
if 中的 selfInterrupt() 方法只是去重新设置当前线程的中断标记位. 这是因为获取线程中断状态的方法, 在返回状态字段的同时, 也会重置字段, 所以需要标记后重新设置相应的值.
下面我们看下 AQS 释放锁的接口方法
- ReentrantLock.unlock
- public void unlock() {
- sync.release(1);
- }
追进去
- ReentrantLock.release
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
tryRelease 是在 NonFairLock 中的实现的, 如果是释放成功, 则在 Head 存在并且状态不为 0(其实可以理解为值为 SIGNAL 时) 去唤醒 Head 的后继结点.
NonFailLock.tryRelease
下面看下 NonFairLock 的 tryRelease 方法的实现
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
可以看出, 这个 tryRelease 其实就是去判断下是不是当前线程拥有锁, 是的话, 判断下当前的释放锁是否完全释放, 因为锁可以重复获取, 完全释放的话, 就设置 state 为 0, 代表 AQS 的锁已经被释放了.
来源: https://juejin.im/post/5c45bde551882523f0261f6f