要深入了解 java 并发知识, AbstractQueuedSynchronizer(AQS)是必须要拿出来深入学习的, AQS 可以说是贯穿了整个 JUC 并发包, 例如 ReentrantLock,CountDownLatch,CyclicBarrier 等并发类都涉及到了 AQS. 接下来就对 AQS 的实现原理进行分析.
在开始分析之前, 势必先将 CLH 同步队列了解一下
CLH 同步队列
CLH 自旋锁: CLH(Craig, Landin, and Hagersten locks): 是一个自旋锁, 能确保无饥饿性, 提供先来先服务的公平性. CLH 自旋锁是一种基于隐式链表 (节点里面没有 next 指针) 的可扩展, 高性能, 公平的自旋锁, 申请线程只在本地变量上自旋, 它不断轮询前驱的状态, 如果发现前驱释放了锁就结束自旋.
AQS 中的 CLH 同步队列: AQS 中 CLH 同步队列是对 CLH 自旋锁进行了优化, 其主要从两方面进行了改造: 节点的结构与节点等待机制.
在结构上引入了头结点和尾节点, 他们分别指向队列的头和尾, 尝试获取锁, 入队列, 释放锁等实现都与头尾节点相关, 并且每个节点都引入前驱节点和后后续节点的引用;
在等待机制上由原来的自旋改成阻塞唤醒.
源码中 CLH 的简单表示
- * +------+ prev +-----+ +-----+
- * head | | <---- | | <---- | | tail
- * +------+ +-----+ +-----+
Node 就是实现 CLH 同步队列的数据结构, 计算下就了解下该类的相关字段属性
AQS 中重要的内部类 Node
- static final class Node {
- // 共享模式
- static final Node SHARED = new Node();
- // 独占模式
- static final Node EXCLUSIVE = null;
- // 如果属性 waitStatus == Node.CANCELLED, 则表明该节点已经被取消
- static final int CANCELLED = 1;
- // 如果属性 waitStatus == Node.SIGNAL, 则表明后继节点等待被唤醒
- static final int SIGNAL = -1;
- // 如果属性 waitStatus == Node.CONDITION, 则表明是 Condition 模式中的节点等待条件唤醒
- static final int CONDITION = -2;
- // 如果属性 waitStatus == Node.PROPAGATE, 在共享模式下, 传播式唤醒后继节点
- static final int PROPAGATE = -3;
- // 用于标记当前节点的状态, 取值为 1,-1,-2,-3, 分别对应以上 4 个取值
- volatile int waitStatus;
- // 前驱节点
- volatile Node prev;
- // 后继节点
- volatile Node next;
- // 当前节点对应的线程, 阻塞与唤醒的线程
- volatile Thread thread;
- // 使用 Condtion 时 (共享模式下) 的后继节点, 在独占模式中不会使用
- Node nextWaiter;
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
- final Node predecessor() throws NullPointerException {
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
- }
- Node() { // Used to establish initial head or SHARED marker
- }
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
- Node(Thread thread, int waitStatus) { // Used by Condition
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
下面就开始着重对 AQS 中的重要方法进行分析说明
获取锁
1.acquire 开始获取锁
- public final void acquire(int arg) {
- // 如果 tryAcquire 返回 true, 即获取到锁就停止执行, 否则继续向下执行向同步队列尾部添加节点
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
tryAcquire 是用于获取锁的方法, 在 AQS 中默认没有实现具体逻辑, 由子类自定义实现.
如果返回 true 则说明获取到锁, 否则需要将当前线程封装为 Node 节点添加到同步队列尾部.
2. 当前节点入队列
将当前执行的线程封装为 Node 节点并加入到队尾
- private Node addWaiter(Node mode) {// 首先尝试快速添加到队尾, 失败再正常执行添加到队尾
- Node node = new Node(Thread.currentThread(), mode);
- // 快速方式尝试直接添加到队尾
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 如果快速添加到队尾失败则执行 enq(node)添加到队尾
- enq(node);
- return node;
- }
enq 方法循环遍历添加到队尾
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- // 添加到队列尾部
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
addWaiter(Node mode)方法执行完后, 接下来执行 acquireQueued 方法, 返回的是该线程是否需要中断, 该方法也是不停地循环获取锁, 如果前节点是头节点, 则尝试获取锁, 获取锁成功则返回是否需要中断标志, 如果获取锁失败, 则判断是否需要阻塞并阻塞线程
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- // 标记是否需要被中断
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 如果前驱节点是头节点, 并且获取锁成功, 则返回
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 判断获取锁失败后是否需要阻塞当前线程, 如果阻塞线程后再判断是否需要被中断线程
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
shouldParkAfterFailedAcquire(p, node)方法判断是否需要阻塞当前线程
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- // 如果 ws == Node.SIGNAL, 则说明当前线程已经准备好被唤醒, 因此现在可以被阻塞, 之后等待被唤醒
- if (ws == Node.SIGNAL)
- return true;
- if (ws> 0) {
- // 如果 ws> 0, 说明当前节点已经被取消, 因此循环剔除 ws>0 的前驱节点
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus> 0);
- pred.next = node;
- } else {
- // 如果 ws<=0, 则将标志位设置为 Node.SIGNAL, 当还不可被阻塞, 需要的等待下次执行 shouldParkAfterFailedAcquire 判断是否需要阻塞
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
如果 shouldParkAfterFailedAcquire(p, node)方法返回 true, 说明需要阻塞当前线程, 则执行 parkAndCheckInterrupt 方法阻塞线程, 并返回阻塞过程中线程是否被中断
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this); // 阻塞线程, 等待 unpark()或 interrupt()唤醒自己
- // 线程被唤醒后查看是否被中断过.
- return Thread.interrupted();
- }
那么重新回到获取锁的方法 acquire 方法, 如果 acquireQueued(final Node node, int arg)返回 true, 也即是阻塞过程中线程被中断, 则执行中断线程操作 selfInterrupt()
- public final void acquire(int arg) {
- // 如果 tryAcquire 返回 true, 即获取到锁就停止执行, 否则继续向下执行向同步队列尾部添加节点, 然后判断是否被中断过, 是则执行中断
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
中断当前线程
- static void selfInterrupt() {
- Thread.currentThread().interrupt();
- }
小结
AQS 获取锁的过程:
1. 执行 tryAcquire 方法获取锁, 如果获取锁成功则直接返回, 否则执行步骤 2
2. 执行 addWaiter 方法将当前线程封装位 Node 节点并添加到同步队列尾部, 执行步骤 3
3. 执行 acquireQueued 循环尝试获取锁,, 如果获取锁成功, 则判断返回中断标志位, 如果获取锁失败则调用 shouldParkAfterFailedAcquire 方法判断是否需要阻塞当前线程, 如果需要阻塞线程则调用 parkAndCheckInterrupt 阻塞线程, 并在被唤醒后再判断再阻塞过程中是否被中断过.
4. 如果 acquireQueued 返回 true, 说明在阻塞过程中线程被中断过, 则执行 selfInterrupt 中断线程
好了, 以上就是 AQS 的锁获取过程, 关于锁释放的分析会在后续继续输出.
来源: https://www.cnblogs.com/zyg-coding/p/12045194.html