上文介绍了 AQS 的一些基础知识, 包括 CLH 锁的原理和 AQS 的一些数据结构, 这篇文章中我们来分析一下 AQS 的方法. AQS 是一个抽象类, 定义了几个模板方法交给子类去实现, 分别是:
- protected boolean tryAcquire(int arg)
- protected boolean tryRelease(int arg)
- protected int tryAcquireShared(int arg)
- protected boolean tryReleaseShared(int arg)
本文不分析获取共享锁的情况.
先来看一下比较简单的一个子类实现 ReentrantLock 中的内部类 FairSync, 下面就直接分析一下 ReentrantLock 的实现, ReentrantLock 有一个字段
private final Sync sync;
, 而 Sync 正是继承自 AQS. 来看一下 ReentrantLock 的构造函数:
- public ReentrantLock() {
- sync = new NonfairSync();
- }
- public ReentrantLock(boolean fair) {
- sync = fair ? new FairSync() : new NonfairSync();
- }
其中 NonfairSync 和 FairSync 均继承自 Sync 类, 区别只是获取锁的方式是否公平:
- //NonfairSync
- final void lock() {
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
- //FairSync
- final void lock() {
- acquire(1);
- }
可以看出, 非公平锁在获取锁的时候会先尝试一下是否能直接获取锁, 如果可以, 就不必进行排队 (acquire 方法), 而公平锁会直接进入等待队列排队.
看一下 AQS 中 acquire 方法的实现:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
acquire 方法大体的逻辑就是先调用 tryAcquire 尝试获取锁, 如果失败的话就新建一个 Node 对象并添加到 AQS 的等待队列中去.
- private Node addWaiter(Node mode) {
- // 使用当前线程新建一个 Node 对象, 并且设置锁为排它锁
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- // 直接使用 compareAndSetTail 把当前 Node 添加到等待队列的队尾, 由于多线程的问题, 可能导致设置失败, 此时转到 enq 方法去执行
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 添加 Node 失败, 调用 enq 方法无限循环地添加
- enq(node);
- return node;
- }
addWaiter 方法新建一个 Node 对象, 并且设置成 EXCLUSIVE 模式, 也就是排它锁. 在该方法中, 先使用原子操作尝试把新建节点添加到等待队列的队尾, 如果添加成功则直接返回, 否则调用 enq() 方法添加.
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- // 如果等待队列还没有建立, 那么先初始化, 并且设置头, 尾节点都是当前节点
- if (t == null) { // Must initialize
- // 如果是等待队列的初始化, 则创建一个哑节点, 需要注意的是此 if 语句并没有返回, 而是在 enq 的下轮循环中将 node 节点添加到这个哑节点后
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- // 依然是调用原子操作来把当前节点添加到队尾, 不过由于代码处于无限循环当中, 总会有添加成功的时候
- if (compareAndSetTail(t, node)) {
- // 可以看到把原尾节点的 next 字段设置成 node 和把 node 设置成尾节点合并在一起并不是个原子操作
- t.next = node;
- return t;
- }
- }
- }
- }
在此要说明的是, AQS 的等待队列是使用 lazy 模式来初始化的, 也就是说, 如果一直没有线程来竞争锁, 那么等待队列一直都不会建立. 而等待队列的初始化是在 enq() 方法中进行的.
由于在 addWaiter 的过程中可能其他线程已经释放了锁, 所以在 acquireQueued 方法中把新建 Node 阻塞之前可以再尝试一下获取锁, 如果仍然失败, 再阻塞该线程:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 如果前驱节点为头节点, 而且尝试获取锁成功了, 那么把当前节点设置成头节点
- // 这也只是一条快速路径
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
AQS 等待队列中的头节点代表当前持有锁的线程, 所以当一个节点的前驱节点是头节点的时候, 可以尝试一下获取锁 (因为可能锁持有线程已经释放了锁), 如果获取锁成功, 那么把该节点设置成头节点并返回. 如果当前节点的前驱节点并不是头节点, 那么不给它尝试获取锁的机会, 因为 AQS 等待队列是 FIFO 的, 此时没有轮到它获取锁.
如果该节点尝试获取锁失败, 那么调用
shouldParkAfterFailedAcquire
方法判断是否需要阻塞当前节点:
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- // 如果前驱节点的状态是 SIGNAL, 说明前驱节点已经已经做好了后继节点阻塞的准备, 可以直接阻塞当前节点
- if (ws == Node.SIGNAL)
- return true;
- // 只有 CANCELLED 的值大于 0
- // 如果前驱节点的状态是 CANCELLED, 那么需要继续向前遍历找到一个未取消的节点做为当前节点的前驱节点
- if (ws> 0) {
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus> 0);
- pred.next = node;
- } else {
- // 此时前驱节点的状态值为 0 或者 PROPAGATE(用于共享锁, 本文暂不考虑这种状态)
- // 将前驱节点的值设置成 SIGNAL
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
如果
shouldParkAfterFailedAcquire
方法返回 false, 那么在 acquireQueued 方法中进行下一轮循环 (因为
shouldParkAfterFailedAcquire
方法可能修改了当前节点的前驱节点或者前驱节点的状态), 此时当前节点的前驱节点可能变成头节点或者前驱节点的状态变成 SIGNAL. 如果
shouldParkAfterFailedAcquire
方法返回 true, 那么就调用
parkAndCheckInterrupt
方法阻塞当前节点代表的线程:
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
至此, 获取锁的 acquire 方法已经分析完毕. 再来看看释放锁的 release 方法:
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- // 如果某个节点有后继节点
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
release 方法调用 tryRelease(模板方法, 交给子类去实现) 来释放相应数量的信号量, 如果方法调用成功, 那么就唤醒等待队列头节点的后继节点.
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- if (ws <0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- // 即使某个节点的 next 字段为空也不能判定该节点没有后继节点, 因为创建一个节点并且把它的前驱节点的 next 字段指向它这并不是个原子操作, 所以在多线程的环境中可能出现 node 节点的 next 字段为空但是实际上有后继节点的情况, 此时需要从队列的 tail 节点向前遍历来寻找 node 的后继节点
- if (s == null || s.waitStatus> 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
假设没有中断的情况下, 在 unparkSuccessor 方法中 unpark 的线程会继续在方法 acquireQueued 方法的运行, 并且其
parkAndCheckInterrupt
方法会返回 false, 然后就会再次尝试获取锁, 此次获取锁就会成功了, 然后 acquire 方法就会返回, 代表该线程已经可以执行临界区代码了.
来源: https://blog.csdn.net/Q_AN1314/article/details/79873200