本博客系列是学习并发编程过程中的记录总结. 由于文章比较多, 写的时间也比较散, 所以我整理了个目录贴(传送门), 方便查阅.
并发编程系列博客传送门
本文参考了 [Java 多线程进阶(六)-- J.U.C 之 locks 框架: AQS 综述(1) https://segmentfault.com/a/1190000015562787 ] 和 Java 技术之 AQS 详解 https://www.jianshu.com/p/da9d051dcc3d 两篇文章.
AQS 简介
AbstractQueuedSynchronizer (简称 AQS)类是整个 JUC 包的核心类. JUC 中的 ReentrantLock,ReentrantReadWriteLock,CountDownLatch,Semaphore 和 LimitLatch 等同步工具都是基于 AQS 实现的.
AQS 分离出了构建同步器时的通用关注点, 这些关注点主要包括如下:
资源是可以被同时访问? 还是在同一时间只能被一个线程访问?(共享 / 独占功能)
访问资源的线程如何进行并发管理?(等待队列)
如果线程等不及资源了, 如何从等待队列退出?(超时 / 中断)
这些关注点都是围绕着资源 -- 同步状态 (synchronization state) 来展开的, AQS 将这些通用的关注点封装成了一个个模板方法, 让子类可以直接使用.
AQS 留给用户的只有两个问题:
什么是资源
什么情况下资源是可以被访问的
这样一来, 定义同步器的难度就大大降低了. 用户只要解决好上面两个问题, 就能构建出一个性能优秀的同步器.
下面是几个常见的同步器对资源的定义:
同步器 | 资源的定义 |
---|---|
ReentrantLock | 资源表示独占锁。State 为 0 表示锁可用;为 1 表示被占用;为 N 表示重入的次数 |
ReentrantReadWriteLock | 资源表示共享的读锁和独占的写锁。state 逻辑上被分成两个 16 位的 unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。 |
CountDownLatch | 资源表示倒数计数器。State 为 0 表示计数器归零,所有线程都可以访问资源;为 N 表示计数器未归零,所有线程都需要阻塞。 |
Semaphore | 资源表示信号量或者令牌。State≤0 表示没有令牌可用,所有线程都需要阻塞;大于 0 表示由令牌可用,线程每获取一个令牌,State 减 1,线程没释放一个令牌,State 加 1。 |
AQS 原理
上面一节中介绍到 AQS 抽象出了三个关注点, 下面就具体看下 AQS 是如果解决这三个问题的.
同步状态的管理
同步状态, 其实就是资源. AQS 使用单个 int(32 位)来保存同步状态, 并暴露出 getState,setState 以及 compareAndSetState 操作来读取和更新这个状态.
- private volatile int state;
- protected final int getState() {
- return state;
- }
- protected final void setState(int newState) {
- state = newState;
- }
- protected final boolean compareAndSetState(int expect, int update) {
- // See below for intrinsics setup to support this
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
线程的阻塞和唤醒
在 JDK1.5 之前, 除了内置的监视器机制外, 没有其它方法可以安全且便捷得阻塞和唤醒当前线程.
JDK1.5 以后, java.util.concurrent.locks 包提供了 LockSupport https://segmentfault.com/a/1190000015562456 类来作为线程阻塞和唤醒的工具.
等待队列
等待队列, 是 AQS 框架的核心, 整个框架的关键其实就是如何在并发状态下管理被阻塞的线程.
等待队列是严格的 FIFO 队列, 是 Craig,Landin 和 Hagersten 锁 (CLH 锁) 的一种变种, 采用双向循环链表实现, 因此也叫 CLH 队列.
1. 节点定义
CLH 队列中的结点是对线程的包装, 结点一共有两种类型: 独占 (EXCLUSIVE) 和共享(SHARED).
每种类型的结点都有一些状态, 其中独占结点使用其中的 CANCELLED(1),SIGNAL(-1),CONDITION(-2), 共享结点使用其中的 CANCELLED(1),SIGNAL(-1),PROPAGATE(-3).
结点状态 | 值 | 描述 |
---|---|---|
CANCELLED | 1 | 取消。表示后驱结点被中断或超时,需要移出队列 |
SIGNAL | -1 | 发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其 prev 结点类型改为 SIGNAL,以便 prev 结点取消或释放时将当前结点唤醒。) |
CONDITION | -2 | Condition 专用。表示当前结点在 Condition 队列中,因为等待某个条件而被阻塞了 |
PROPAGATE | -3 | 传播。适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为 PROPAGATE 有助于实现这种迭代操作。) |
INITIAL | 0 | 默认。新结点会处于这种状态 |
AQS 使用 CLH 队列实现线程的结构管理, 而 CLH 结构正是用前一结点某一属性表示当前结点的状态, 之所以这种做是因为在双向链表的结构下, 这样更容易实现取消和超时功能.
next 指针: 用于维护队列顺序, 当临界区的资源被释放时, 头结点通过 next 指针找到队首结点.
prev 指针: 用于在结点 (线程) 被取消时, 让当前结点的前驱直接指向当前结点的后驱完成出队动作.
- static final class Node {
- // 共享模式结点
- static final Node SHARED = new Node();
- // 独占模式结点
- static final Node EXCLUSIVE = null;
- static final int CANCELLED = 1;
- static final int SIGNAL = -1;
- static final int CONDITION = -2;
- static final int PROPAGATE = -3;
- /**
- * INITAL: 0 - 默认, 新结点会处于这种状态.
- * CANCELLED: 1 - 取消, 表示后续结点被中断或超时, 需要移出队列;
- * SIGNAL: -1- 发信号, 表示后续结点被阻塞了;(当前结点在入队后, 阻塞前, 应确保将其 prev 结点类型改为 SIGNAL, 以便 prev 结点取消或释放时将当前结点唤醒.)
- * CONDITION: -2- Condition 专用, 表示当前结点在 Condition 队列中, 因为等待某个条件而被阻塞了;
- * PROPAGATE: -3- 传播, 适用于共享模式.(比如连续的读操作结点可以依次进入临界区, 设为 PROPAGATE 有助于实现这种迭代操作.)
- *
- * waitStatus 表示的是后续结点状态, 这是因为 AQS 中使用 CLH 队列实现线程的结构管理, 而 CLH 结构正是用前一结点某一属性表示当前结点的状态, 这样更容易实现取消和超时功能.
- */
- volatile int waitStatus;
- // 前驱指针
- volatile Node prev;
- // 后驱指针
- volatile Node next;
- // 结点所包装的线程
- volatile Thread thread;
- // Condition 队列使用, 存储 condition 队列中的后继节点
- Node nextWaiter;
- Node() {
- }
- Node(Thread thread, Node mode) {
- this.nextWaiter = mode;
- this.thread = thread;
- }
- }
2. 队列定义
对于 CLH 队列, 当线程请求资源时, 如果请求不到, 会将线程包装成结点, 将其挂载在队列尾部.
CLH 队列的示意图如下:
初始状态, 队列 head 和 tail 都指向空
首个线程入队, 先创建一个空的头结点, 然后以自旋的方式不断尝试插入一个包含当前线程的新结点
再次加入新节点, 新加入的节点会被放置到队列的尾部.(PS: 看下了代码, AQS 的线程管理队列好像是一个双向循环队列, 这边这个图是不是有点问题???)
AQS 的方法介绍
用户需要自己重写的方法
上面介绍到 AQS 已经帮用户解决了同步器定义过程中的大部分问题, 只将下面两个问题丢给用户解决:
什么是资源
什么情况下资源是可以被访问的
具体的, AQS 是通过暴露以下 API 来让用户解决上面的问题的.
钩子方法 | 描述 |
---|---|
tryAcquire | 独占方式。尝试获取资源,成功则返回 true,失败则返回 false。 |
tryRelease | 独占方式。尝试释放资源,成功则返回 true,失败则返回 false。 |
tryAcquireShared | 共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
tryReleaseShared | 共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。 |
isHeldExclusively | 该线程是否正在独占资源。只有用到 condition 才需要去实现它。 |
如果你需要实现一个自己的同步器, 一般情况下只要继承 AQS , 并重写 AQS 中的这个几个方法就行了. 至于具体线程等待队列的维护(如获取资源失败入队 / 唤醒出队等),AQS 已经在顶层实现好了. 要不怎么说 Doug Lea http://gee.cs.oswego.edu/dl/papers/aqs.pdf 贴心呢.
需要注意的是: 如果你没在子类中重写这几个方法就直接调用了, 会直接抛出异常. 所以, 在你调用这些方法之前必须重写他们. 不使用的话可以不重写.
AQS 提供的一系列模板方法
查看 AQS 的源码我们就可以发现这个类提供了很多方法, 看起来让人 "眼花缭乱" 的. 但是最主要的两类方法就是获取资源的方法和释放资源的方法. 因此我们抓住主要矛盾就行了:
- public final void acquire(int arg) // 独占模式的获取资源
- public final boolean release(int arg) // 独占模式的释放资源
- public final void acquireShared(int arg) // 共享模式的获取资源
- public final boolean releaseShared(int arg) // 共享模式的释放资源
acquire(int)方法
该方法以独占方式获取资源, 如果获取到资源, 线程继续往下执行, 否则进入等待队列, 直到获取到资源为止, 且整个过程忽略中断的影响. 该方法是独占模式下线程获取共享资源的顶层入口.
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
下面分析下这个 acquire 方法的具体执行流程:
step1: 首先这个方法调用了用户自己实现的方法 tryAcquire 方法尝试获取资源, 如果这个方法返回 true, 也就是表示获取资源成功, 那么整个 acquire 方法就执行结束了, 线程继续往下执行;
step2: 如果 tryAcquir 方法返回 false, 也就表示尝试获取资源失败. 这时 acquire 方法会先调用 addWaiter 方法将当前线程封装成 Node 类并加入一个 FIFO 的双向队列的尾部.
step3: 再看 acquireQueued 这个关键方法. 首先要注意的是这个方法中哪个无条件的 for 循环, 这个 for 循环说明 acquireQueued 方法一直在自旋尝试获取资源. 进入 for 循环后, 首先判断了当前节点的前继节点是不是头节点, 如果是的话就再次尝试获取资源, 获取资源成功的话就直接返回 false(表示未被中断过)
假如还是没有获取资源成功, 判断是否需要让当前节点进入 waiting 状态, 经过 shouldParkAfterFailedAcquire 这个方法判断, 如果需要让线程进入 waiting 状态的话, 就调用 LockSupport 的 park 方法让线程进入 waiting 状态. 进入 waiting 状态后, 这线程等待被 interupt 或者 unpark(在 release 操作中会进行这样的操作, 可以参见后面的代码). 这个线程被唤醒后继续执行 for 循环来尝试获取资源.
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 首先判断了当前节点的前继节点是不是头节点, 如果是的话就再次尝试获取资源,
- // 获取资源成功的话就直接返回 false(表示未被中断过)
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- // 判断是否需要让当前节点进入 waiting 状态
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- // 如果在整个等待过程中被中断过, 则返回 true, 否则返回 false.
- // 如果线程在等待过程中被中断过, 它是不响应的. 只是获取资源后才再进行自我中断 selfInterrupt(), 将中断补上.
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
以上就是 acquire 方法的简单分析.
单独看这个方法的话可能会不太清晰, 结合 ReentrantLock,ReentrantReadWriteLock,CountDownLatch,Semaphore 和 LimitLatch 等同步工具看这个代码的话就会好理解很多.
release(int)方法
release(int)方法是独占模式下线程释放共享资源的顶层入口. 它会释放指定量的资源, 如果彻底释放了(即 state=0), 它会唤醒等待队列里的其他线程来获取资源.
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- // 上面已经讲过了, 需要用户自定义实现
- protected boolean tryRelease(int arg) {
- throw new UnsupportedOperationException();
- }
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws <0)
- compareAndSetWaitStatus(node, ws, 0);
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next;
- if (s == null || s.waitStatus> 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
与 acquire()方法中的 tryAcquire()类似, tryRelease()方法也是需要独占模式的自定义同步器去实现的. 正常来说, tryRelease()都会成功的, 因为这是独占模式, 该线程来释放资源, 那么它肯定已经拿到独占资源了, 直接减掉相应量的资源即可(state-=arg), 也不需要考虑线程安全的问题.
但要注意它的返回值, 上面已经提到了, release()是根据 tryRelease()的返回值来判断该线程是否已经完成释放掉资源了! 所以自义定同步器在实现时, 如果已经彻底释放资源(state=0), 要返回 true, 否则返回 false.
unparkSuccessor(Node)方法用于唤醒等待队列中下一个线程. 这里要注意的是, 下一个线程并不一定是当前节点的 next 节点, 而是下一个可以用来唤醒的线程, 如果这个节点存在, 调用 unpark()方法唤醒.
总之, release()是独占模式下线程释放共享资源的顶层入口. 它会释放指定量的资源, 如果彻底释放了(即 state=0), 它会唤醒等待队列里的其他线程来获取资源.
acquireShared(int)方法
acquireShared(int)方法是共享模式下线程获取共享资源的顶层入口. 它会获取指定量的资源, 获取成功则直接返回, 获取失败则进入等待队列, 直到获取到资源为止, 整个过程忽略中断.
- public final void acquireShared(int arg) {
- //tryAcquireShared 需要用户自定义实现
- if (tryAcquireShared(arg) <0)
- doAcquireShared(arg);
- }
可以发现, 这个方法的关键实现其实是获取资源失败后, 怎么管理线程. 也就是 doAcquireShared 的逻辑.
- // 不响应中断
- private void doAcquireShared(int arg) {
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r>= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- if (interrupted)
- selfInterrupt();
- failed = false;
- return;
- }
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
可以看出, doAcquireShared 的逻辑和 acquireQueued 的逻辑差不多. 将当前线程加入等待队列尾部休息, 直到其他线程释放资源唤醒自己, 自己成功拿到相应量的资源后才返回.
简单总结下 acquireShared 的流程:
step1:tryAcquireShared()尝试获取资源, 成功则直接返回;
step2: 失败则通过 doAcquireShared()进入等待队列 park(), 直到被 unpark()/interrupt()并成功获取到资源才返回. 整个等待过程也是忽略中断的.
releaseShared(int)方法
releaseShared(int)方法是共享模式下线程释放共享资源的顶层入口. 它会释放指定量的资源, 如果成功释放且允许唤醒等待线程, 它会唤醒等待队列里的其他线程来获取资源.
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
释放掉资源后, 唤醒后继. 跟独占模式下的 release()相似, 但有一点稍微需要注意: 独占模式下的 tryRelease()在完全释放掉资源 (state=0) 后, 才会返回 true 去唤醒其他线程, 这主要是基于独占下可重入的考量; 而共享模式下的 releaseShared()则没有这种要求, 共享模式实质就是控制一定量的线程并发执行, 那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点.
参考
- https://segmentfault.com/a/1190000015562787
- https://www.jianshu.com/p/da9d051dcc3d
来源: https://www.cnblogs.com/54chensongxia/p/11970870.html