AQS 是并发编程中非常重要的概念, 它是 juc 包下的许多并发工具类, 如 CountdownLatch,CyclicBarrier,Semaphore 和锁, 如 ReentrantLock, ReaderWriterLock 的实现基础, 提供了一个基于 int 状态码和队列来实现的并发框架. 本文将对 AQS 框架的几个重要组成进行简要介绍, 读完本文你将 get 到以下几个点:
AQS 进行并发控制的机制是什么
AQS 独占和共享模式是如何实现的
同步队列和条件等待队列的区别, 和数据出入队原则
一, AQS 基本概念
AQS(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架, 它使用了一个 int 成员变量来表示状态, 通过内置的 FIFO(first in,first out)队列来完成资源获取线程的排队工作.
队列可分为两种, 一种是同步队列, 是程序执行入口出处的等待队列; 而另一种则是条件等待队列, 队列中的元素是在程序执行时在某个条件上发生等待.
1.1 独占 or 共享模式
AQS 支持两种获取同步状态的模式既独占式和共享式. 顾名思义, 独占式模式同一时刻只允许一个线程获取同步状态, 而共享模式则允许多个线程同时获取.
1.2 同步队列
当一个线程尝试获取同步状态失败时, 同步器会将这个线程以及等待状态等信息构造成一个节点加入到等待队列中, 同时会阻塞当前线程, 当同步状态释放时, 会把首节点中的线程唤醒, 使其再次尝试重复获取同步队列.
1.3 条件队列
AQS 内部类 ConditionObject 来实现的条件队列, 当一个线程获取到同步状态, 但是却通过 Condition 调用了 await 相关的方法时, 会将该线程封装成 Node 节点并加入到条件队列中, 它的结构和同步队列相同.
二, 独占 or 共享模式
AQS 框架中, 通过维护一个 int 类型的状态, 来进行并发控制, 线程通常通过修改此状态信息来表明当前线程持有此同步状态. AQS 则是通过保存修改状态线程的引用来实现独占和共享模式的.
- /**
- * 获取同步状态
- */
- public final void acquire(int arg) {
- // 尝试获取同步状态, 如果尝试获取到同步状态失败, 则加入到同步队列中
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- /**
- * 尝试获取同步状态[子类中实现] , 因为 aqs 基于模板模式, 仅提供基于状态和同步队列的实
- * 现思路, 具体的实现由子类决定
- */
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- // 如果当前状态值为 0, 并且等待队列中没有元素, 执行修改状态值操作
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- // 修改状态值成功, 记录当前持有同步状态的线程信息
- setExclusiveOwnerThread(current);
- return true;
- }
- // 如果当前线程已经持有同步状态, 继续修改同步状态[重入锁实现原理]
- } else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc <0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
- /**
- * 根据传入的模式以及当前线程信息创建一个队列的节点并加入到同步队列尾部
- */
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
- /**
- * 同步队列中节点, 尝试获取同步状态
- */
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- // 自旋(死循环)
- for (;;) {
- // 只有当前节点的前驱节点是头节点时才会尝试执行获取同步状态操作
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
独占式是如何控制得?
当修改状态信息成功后, 如果执行的是独占式操作, AQS 的具体实现类中会保存当前线程的信息来声明同步状态已被当前线程占用, 此时其他线程再尝试获取同步状态会返回 false.
三, 同步队列
3.1 队列中保存那些信息?
同步队列节点中主要保存着线程的信息以及模式(共享 or 独占).
3.2 何时执行入队操作?
- /**
- * 获取同步状态
- */
- public final void acquire(int arg) {
- // 尝试获取同步状态, 如果尝试获取到同步状态失败, 则加入到同步队列中
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
复用上文中的代码, 不难看出再获取同步状态失败后, 会执行入队操作.
3.3 何时执行出队操作?
当线程获取同步状态失败时, 会被封装成 Node 节点加入到等待队列中, 此时所有节点都回进入自旋过程, 首先判断自己 prev 是否时头节点, 如果是则尝试获取同步状态.
被阻塞线程的唤醒主要以靠前驱节点的出队或阻塞线程被中断来实现.
- /**
- * 同步队列中节点, 尝试获取同步状态
- *
- * 1. 当一个线程获取到同步状态时, 会将当前线程构造程 Node 并设置为头节点
- * 2. 并将原始的 head 节点设置为 null, 以便于垃圾回收
- */
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
四, 条件等待队列
条件变量 (ConidtionObject) 是 AQS 中的一个内部类, 用来实现同步队列机制. 同步队列复用了等待队列中 Node 节点, 所以同步队列到等待队列中不需要进行额外的转换.
4.1 什么时候执行入队操作?
当线程获取到同步状态, 但是在临界区中调用了 await()方法, 此时该线程会被加入到对应的条件队列汇总.
ps: 临界区, 加锁和释放锁之间的代码区域
- /**
- * ConditionObject 中的 await 方法, 调用后使得当前执行线程加入条件等待队列
- */
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- Node node = addConditionWaiter();
- // ----- 省略代码 ------
- }
- /**
- * 添加等待线程
- */
- private Node addConditionWaiter() {
- Node t = lastWaiter;
- // ----- 省略代码 ------
- // 将当前线程构造程条件队列节点, 并加入到队列中
- Node node = new Node(Thread.currentThread(), Node.CONDITION);
- if (t == null)
- firstWaiter = node;
- else
- t.nextWaiter = node;
- lastWaiter = node;
- return node;
- }
4.2 什么时候执行出队操作?
当对应的 Conditioni 调用 signial/signalAll()方法时回选择从条件队列中出队列, 同步队列是通过自旋的方式获取同步状态, 而条件队列中的节点则通过通知的方式出队. 条件队列中的节点被唤醒后会加入到入口等待队列中.
- /**
- * 唤醒当前条件等到队列中的所有等待线程
- */
- public final void signalAll() {
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- Node first = firstWaiter;
- if (first != null)
- doSignalAll(first);
- }
- /**
- * 遍历队列, 将元素从条件队列 加入到 同步队列
- */
- private void doSignalAll(Node first) {
- lastWaiter = firstWaiter = null;
- do {
- Node next = first.nextWaiter;
- first.nextWaiter = null;
- transferForSignal(first);
- first = next;
- } while (first != null);
- }
- final boolean transferForSignal(Node node) {
- // ----- 省略代码 ------
- // 执行入队操作, 将 node 添加到同步队列中
- Node p = enq(node);
- int ws = p.waitStatus;
- if (ws> 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
五, 总结
使用 Node 实现的 FIFO 队列, 可以用于构建锁或者其他同步装置的基础框架
利用一个 int 类型的属性表示状态
使用模板方法模式, 子类可以通过继承它来管理状态实现各种并发工具
可以同时实现独占和共享模式
本文对 AQS 的基本原理进行的简要的描述, 对于子类的公平性和非公平行实现, 中断, 队列中节点的等待状态, cas 等操作没有进行探讨, 感兴趣的小伙伴可以进行源码阅读或者查阅相关资料.
六, Q&A
Question1: 在 java 中通常使用 synchronized 来实现方法同步, AQS 中通过 CAS 保证了修改同步状态的一致性问题, 那么对比 synchronized,cas 有什么优势不同与优势呢? 你还知道其他无锁并发的策略吗?
笔者的个人博客网站 http://www.mycookies.cn/
来源: https://www.cnblogs.com/liqiangchn/p/11895430.html