一, 什么是 AQS ?
AQS 即 AbstractQueuedSynchronizer 的缩写, 是并发编程中实现同步器的一个框架. 框架, 框架, 重要的事情说三遍, 框架就是说它帮你处理了很大一部分的逻辑, 其它功能需要你来扩展. 想想你使用 Spring 框架的场景, Spring 帮助开发者实现 IOC 容器的 bean 依赖管理, 标签解析等, 我们只需要对 bean 进行配置即可, 其他不用管.
AQS 基于一个 FIFO 双向队列实现, 被设计给那些依赖一个代表状态的原子 int 值的同步器使用. 我们都知道, 既然叫同步器, 那个肯定有个代表同步状态 (临界资源) 的东西, 在 AQS 中即为一个叫 state 的 int 值, 该值通过 CAS 进行原子修改.
在 AQS 中存在一个 FIFO 队列, 队列中的节点表示被阻塞的线程, 队列节点元素有 4 种类型, 每种类型表示线程被阻塞的原因, 这四种类型分别是:
CANCELLED : 表示该线程是因为超时或者中断原因而被放到队列中
CONDITION : 表示该线程是因为某个条件不满足而被放到队列中, 需要等待一个条件, 直到条件成立后才会出队
SIGNAL : 表示该线程需要被唤醒
PROPAGATE : 表示在共享模式下, 当前节点执行释放 release 操作后, 当前结点需要传播通知给后面所有节点
由于一个共享资源同一时间只能由一条线程持有, 也可以被多个线程持有, 因此 AQS 中存在两种模式, 如下:
1, 独占模式
独占模式表示共享状态值 state 每次只能由一条线程持有, 其他线程如果需要获取, 则需要阻塞, 如 JUC 中的 ReentrantLock
2, 共享模式
共享模式表示共享状态值 state 每次可以由多个线程持有, 如 JUC 中的 CountDownLatch
二, AQS 中的核心数据结构和方法
1, 既然 AQS 是基于一个 FIFO 队列的框架, 那么我们先来看下队列的元素节点 Node 的数据结构, 源码如下:
- static final class Node {
- /** 共享模式 */
- static final Node SHARED = new Node();
- /** 独占模式 */
- static final Node EXCLUSIVE = null;
- /** 标记线程由于中断或超时, 需要被取消, 即踢出队列 */
- static final int CANCELLED = 1;
- /** 线程需要被唤醒 */
- static final int SIGNAL = -1;
- /** 线程正在等待一个条件 */
- static final int CONDITION = -2;
- /**
- * 传播
- */
- static final int PROPAGATE = -3;
- // waitStatus 只取上面 CANCELLED,SIGNAL,CONDITION,PROPAGATE 四种取值之一
- volatile int waitStatus;
- // 表示前驱节点
- volatile Node prev;
- // 表示后继节点
- volatile Node next;
- // 队列元素需要关联一个线程对象
- volatile Thread thread;
- // 表示下一个 waitStatus 值为 CONDITION 的节点
- Node nextWaiter;
- /**
- * 是否当前结点是处于共享模式
- */
- final boolean isShared() {
- return nextWaiter == SHARED;
- }
- /**
- * 返回前一个节点, 如果没有前一个节点, 则抛出空指针异常
- */
- final Node predecessor() throws NullPointerException {
- // 获取前一个节点的指针
- Node p = prev;
- // 如果前一个节点不存在
- if (p == null)
- throw new NullPointerException();
- else
- // 否则返回
- return p;
- }
- // 初始化头节点使用
- Node() {}
- /**
- * 当有线程需要入队时, 那么就创建一个新节点, 然后关联该线程对象, 由 addWaiter()方法调用
- */
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
- /**
- * 一个线程需要等待一个条件阻塞了, 那么就创建一个新节点, 关联线程对象
- */
- Node(Thread thread, int waitStatus) { // Used by Condition
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
总结下 Node 节点数据结构设计, 队列中的元素, 肯定是为了保存由于某种原因导致无法获取共享资源 state 而被入队的线程, 因此 Node 中使用了 waitStatus 表示节点入队的原因, 使用 Thread 对象来表示节点所关联的线程. 至于 prev,next, 则是一般双向队列数据结构必须提供的指针, 用于对队列进行相关操作.
2,AQS 中的共享状态值
之前提到, AQS 是基于一个共享的 int 类型的 state 值来实现同步器同步的, 其声明如下:
- /**
- * 同步状态值
- */
- private volatile int state;
- /**
- * 获取同步状态值
- */
- protected final int getState() {
- return state;
- }
- /**
- * 修改同步状态值
- */
- protected final void setState(int newState) {
- state = newState;
- }
由源码我们可以看出, AQS 声明了一个 int 类型的 state 值, 为了达到多线程同步的功能, 必然对该值的修改必须多线程可见, 因此, state 采用 volatile 修饰, 而且 getState()和 setState()方法采用 final 进行修饰, 目的是限制 AQS 的子类只能调用这两个方法对 state 的值进行设置和获取, 而不能对其进行重写自定义设置 / 获取逻辑.
AQS 中提供对 state 值修改的方法不仅仅只有 setState()和 getState(), 还有诸如采用 CAS 机制进行设置的
compareAndSetState()
方法, 同样, 该方法也是采用 final 修饰的, 不允许子类重写, 只能调用.
3,AQS 中的 tryXXX 方法
一般基于 AQS 实现的同步器, 如 ReentrantLock,CountDownLatch 等, 对于 state 的获取操作, 子类只需重写其 tryAcquire()和 tryAcquireShared()方法即可, 这两个方法分别对应独占模式和共享模式下对 state 的获取操作; 而对于释放操作, 子类只需重写 tryRelease()和 tryReleaseShared()方法即可.
至于如何维护队列的出队, 入队操作, 子类不用管, AQS 已经帮你做好了.
三, AQS 设计妙处
优秀的项目总会有亮点可挖, AQS 也是. 小编在看了 AQS 的源码之后, 结合其他作者相关博客, 总结了以下两点感觉很优秀的设计点, 这是我们应该学习的, 前辈总是那么优秀.
1, 自旋锁
当我们执行一个有确定结果的操作, 同时又需要并发正确执行, 通常可以采用自旋锁实现. 在 AQS 中, 自旋锁采用 死循环 + CAS 实现. 针对 AQS 中的 enq()进行讲解:
- private Node enq(final Node node) {
- // 死循环 + CAS , 解决入队并发问题
- /**
- * 假设有三个线程同时都需要入队操作, 那么使用死循环和 CAS 可保证并发安全, 同一时间只有一个节点安全入队, 入队失败的线程则循环重试
- *
- * 1, 如果不要死循环可以吗? 只用 CAS.
- * 不可以, 因为如果其他线程修改了 tail 的值, 导致 1 处代码返回 false, 那么方法 enq 方法将推出, 导致该入队的节点却没能入队
- *
- * 2, 如果只用死循环, 不需要 CAS 可以吗?
- * 不可以, 首先不需要使用 CAS, 那就没必要再使用死循环了, 再者, 如果不使用 CAS, 那么当执行 1 处代码时, 将会改变队列的结构
- */
- for (;;) {
- // 获取尾部节点
- Node t = tail;
- // 如果还没有初始化, 那么就初始化
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- // 刚开始肯定是头指针和尾指针相等
- tail = head;
- } else {
- // 当前结点的前驱节点等于尾部节点
- node.prev = t;
- // 如果当前尾结点仍然是 t, 那么执行入队并返回 true, 否则返回 false, 然后重试
- if (compareAndSetTail(t, node)) { // 1
- t.next = node;
- return t;
- }
- }
- }
- }
首先入队操作要求的最终结果必须是一个节点插入到队列中去, 只能成功, 不能失败! 然而这个入队的操作是需要并发执行的, 有可能同时有很多的线程需要执行入队操作, 因此我们需要采取相关的线程同步机制. 自旋锁采取乐观策略, 即使用了 CAS 中的 compareAndSet()操作, 如果某次执行返回 fasle, 那么当前操作必须重试, 因此, 采用 for 死循环直到成功为止, 成功, 则 break 跳出 for 循环或者直接 return 操作退出方法.
2, 模板方法
在 AQS 中, 模板方法设计模式体现在其
acquire(),release()
方法上, 我们先来看下源码:
- public final void acquire(int arg) {
- // 首先尝试获取共享状态, 如果获取成功, 则 tryAcquire()返回 true
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
其中调用 tryAcquire()方法的默认实现是抛出一个异常, 也就是说 tryAcquire()方法留给子类去实现, acquire()方法定义了一个模板, 一套处理逻辑, 相关具体执行方法留给子类去实现.
关于更多模板方法设计模式, 可以查阅谈一谈我对'模板方法'设计模式的理解(Template)
四, 自定义自己的并发同步器
下边以 JDK 文档的一个实例进行介绍:
- class Mutex implements Lock, java.io.Serializable {
- // 自定义同步器
- private static class Sync extends AbstractQueuedSynchronizer {
- // 判断是否锁定状态
- protected boolean isHeldExclusively() {
- return getState() == 1;
- }
- // 尝试获取资源, 立即返回. 成功则返回 true, 否则 false.
- public boolean tryAcquire(int acquires) {
- assert acquires == 1; // 这里限定只能为 1 个量
- if (compareAndSetState(0, 1)) {//state 为 0 才设置为 1, 不可重入!
- setExclusiveOwnerThread(Thread.currentThread());// 设置为当前线程独占资源
- return true;
- }
- return false;
- }
- // 尝试释放资源, 立即返回. 成功则为 true, 否则 false.
- protected boolean tryRelease(int releases) {
- assert releases == 1; // 限定为 1 个量
- if (getState() == 0)// 既然来释放, 那肯定就是已占有状态了. 只是为了保险, 多层判断!
- throw new IllegalMonitorStateException();
- setExclusiveOwnerThread(null);
- setState(0);// 释放资源, 放弃占有状态
- return true;
- }
- }
- // 真正同步类的实现都依赖继承于 AQS 的自定义同步器!
- private final Sync sync = new Sync();
- //lock<-->acquire. 两者语义一样: 获取资源, 即便等待, 直到成功才返回.
- public void lock() {
- sync.acquire(1);
- }
- //tryLock<-->tryAcquire. 两者语义一样: 尝试获取资源, 要求立即返回. 成功则为 true, 失败则为 false.
- public boolean tryLock() {
- return sync.tryAcquire(1);
- }
- //unlock<-->release. 两者语文一样: 释放资源.
- public void unlock() {
- sync.release(1);
- }
- // 锁是否占有状态
- public boolean isLocked() {
- return sync.isHeldExclusively();
- }
- }
实现自己的同步类一般都会自定义同步器 (sync), 并且将该类定义为内部类, 供自己使用; 而同步类自己(Mutex) 则实现某个接口, 对外服务. 当然, 接口的实现要直接依赖 sync, 它们在语义上也存在某种对应关系!! 而 sync 只用实现资源 state 的获取 - 释放方式 tryAcquire-tryRelelase, 至于线程的排队, 等待, 唤醒等, 上层的 AQS 都已经实现好了, 我们不用关心.
除了 Mutex,ReentrantLock/CountDownLatch/Semphore 这些同步类的实现方式都差不多, 不同的地方就在获取 - 释放资源的方式 tryAcquire-tryRelelase. 掌握了这点, AQS 的核心便被攻破了!
来源: https://juejin.im/post/5afb9ab3f265da0b736dd1e1