本文是 Java 并发专题 (欢迎大家关注) 的一篇.
以下是完整的目录:
Java 并发之基础知识
Java 并发之 volatile 关键字
Java 并发之 synchronized 关键字
Java 并发之原子类
Java 并发之线程池
Java 并发之并发工具类
一. 总体框架
AQS 是指 AbstractQueuedSynchronizer. 它是一个抽象类, java 并发包里的 ReentrantLock,CountDownLatch 和 Semaphroe 等重要的工具类都是基于 AQS 来实现的.
总体来说, AQS 维护了一个 volatile 的 state 变量代表共享资源, 还有一个 FIFO 的等待队列, 在多线程争夺资源被阻塞时会进入此退了. 等待队列是个双向链表记录则没有获取的执行许可的线程. 等待队列中的结点元素是 AQS 自定义的 static 的内部类 Node.AQS 支持共享和独占两种模式. ReentrantLock 就是独占型的, 只有一个线程可以获得到锁并执行. CountDownLatch 和 Semaphore 就是共享型, 允许多个线程同时执行.
AQS 是一个抽象类, 并不能被直接实例化使用. 它的作用是提供等待队列的管理, 包括如何入队何时唤醒等. 而具体的资源如何获取和释放等由具体的自定义同步器来实现. 也就是说 ReentrantLock 等类自定义了资源 (state) 的获取和释放, 而使用 AQS 的来管理阻塞队列. 不同的自定义资源获取方式实现了 CountDownLatch 和 Semaphore 等类.
自定义同步方法需要实现的方法有:
- isHeldExclusively() // 返回该线程是否正在独占资源, 只有用的 condition 才需要去实现它
- tryAcquire(int); // 独占方式, 尝试获取资源, 成功返回 true, 失败返回 false
- tryRelease(int); // 独占方式, 尝试释放资源, 成功返回 true, 失败返回 false
- tryAcquireShared(int); // 共享方法, 尝试获取资源. 返回负数表示失败, 0 表示成功, 但没有可用资源了, 正数表示成功且有剩余资源
- tryReleaseShared(int);// 共享方式. 尝试释放资源. 如果释放后运行唤醒后续结点返回 true, 否则返回 false
这其中 tryAcquire 和 tryRelease 是一组, 用于实现独占资源的情况, 如 ReentrantLock;tryAcquireShared 和 tryReleaseShared 是一组用于实现共享资源的情况, 如 CountDownLatch.
二. 源码分析
2.1 acquire 方法源码详解
在 AQS 中一个重要的方法是 acquire(int), 这个方法实现请求资源和阻塞线程的功能. 下面先贴一下它的源码:
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
这个方法里只有一个 if 语句. 首先执行 tryAcquire(int)方法, 前面说了这个方法需要子类来自定义, 这里先看一下它的代码:
- protected boolean tryAcquire(int arg) {
- throw new UnsupportedOperationException();
- }
可以看到在 AQS 中 tryAcquire 方法直接抛出了异常. 因为具体的获取资源细节需要子类根据自己要实现的功能来写, AQS 只负责阻塞队列的管理等工作. 同时注意到这个方法并不是一个抽象的方法. 其实前面说的需要子类实现的 5 个方法都不是抽象的, 因为子类并不一定需要实现所有这些方法, 这提供了一定的灵活性.
2.1.1 addWaiter 方法详解
先忙接着看 acquire 方法. 在 if 语句里, 如果 tryAcquire 返回 true, 那么 acquire 就返回了, 说明成功获取到了资源. 如果 tryAcquire 返回 false,if 语句的前半句判断就成立了, 需要继续执行 && 右边的 acquireQueued 方法, 执行它之前先执行了 addWaiter. 先看一下 addWaiter 它的代码:
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
可以看到这个方法有一个 Node 参数. Node 类便是 aqs 维护的 FIFO 队列中的元素的类型. 回顾一下 acquire()方法的代码, 是将 Node.EXCLUSIVE 作为参数传入了 addWaiter. 查看 Node 源码发现有这么一句:
- /** Marker to indicate a node is waiting in exclusive mode */
- static final Node EXCLUSIVE = null;
原来这是一个 null 值, 用来表示独占性线程. 不管如何, 先继续看 addWaiter 的源码吧.
第一句代码: Node node = new Node(Thread.currentThread,mode); 新建了一个表示当前线程的结点. 刚才传入的 null 作为模式传给构造方法. 进入对象构造方法查看:
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
继续看 addWaiter 的后续代码, 发现是获取了当前队列的尾节点, 并将新建结点的 prev 指针执行尾节点, 再使用 cas 尝试替换尾节点, 如果成功, 那么当前结点就成为新的尾节点, 返回.
如果 cas 失败或者当前 tail 为 null, 调用 eng 方法处理. 下面看一下 eng 的代码:
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
熟悉 AtomicInteger 的朋友看到这段代码一定会感动非常熟悉. 这里就是使用了循环尝试的方式来进行 cas 操作, 指导成功为止. 另外当 tail==null 时, 先新建 head 结点再进行操作, 当前这里给 head 变量反之也是使用了 cas 操作.
综上, addWaiter()进行的操作就是安全地更新队列的 tail 指针.
2.1.2 acquireQueue 方法详解
下面继续看 acquire()方法. 再把代码贴一次.
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
给 acqureQueued 方法传入的第一个参数是 addWaiter 方法的返回值, 回想一下刚才的 addWaiter 方法, 发现它的返回值是新创建的表示当前线程的 Node 结点. acquireQueued 方法的另一个参数是 acquire 的形参 arg, 这个一般是获取资源的个数, 像 ReentrantLock 的 lock 方法就是调用了 acquire(1). 下面看一下 acquireQueued 方法的源码吧:
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
这个方法的主体是一个死循环, 不断测试两件事: 1. 是否是头结点的下一个节点, 说明该轮到自己获取资源了. 2: 是否可以休息了. 判断 1 成功后就用 tryAcquire 获取资源, 成功后设置当前结点为头结点, 返回. 如果 1 判断不成功则执行 shouldParkAfterFailedAcquire 方法, 先贴一下代码:
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws> 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus> 0);
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
这个方法的工作是找到当前结点之前的一个未取消的结点, 将其 waitStatue 改为 SIGNAL(-1). 这样在该结点释放资源时就会唤醒当前结点.
当 shouldParkAfterFailedAcquire 返回 true 之后, 当前线程就可以去休息了 -- 调用 parkAndCheckInterrupt 方法:
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
这个方法使用了 LockSupport 的 park 方法, 使线程进入 waiting 状态. 当其它线程调用 unPark 方法, 或此线程被中断后才会返回.
2.1.3 小结
下面来总结一下 acquire 方法.
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
先尝试获取资源, 获取到的情况直接返回. 获取不到将线程加入队列: 首先将 tail 指向表示当前线程的结点, 使用 CAS 操作更新 tail. 之后执行 acquireQueued 方法, 如果是当前队列的第二个则再次尝试获取 tryAcquire, 成功后将自己设置为 head(head 表示已经获取到的资源的结点). 不能获取资源时判断是否可以 park(), 判断依据是其 prev 的结点的 waitState 是否是 signal, 即是否会在释放资源时通知它. 之后当前线程调用 park 进入 waiting 状态. waitting 结束时返回是否中断标志, 并重置标志. 回到 acquire, 如果 waitting 期间中断过, 则调用 selfInterrupt 响应中断.
2.2 release(int)
此方法是独占模式下释放资源的顶层方法.
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
这里可以看出释放资源成功时, 获取到 head 结点 (因为 head 结点表示的线程就是当前获取到资源的线程), 执行 unparkSuccessor() 操作. 这里便和 shouldParkAfterFailedAcquire 中'休息'的代码相呼应. 如果那里设置了 waitStatus 为 signal 就会使用 LockSupport.unpark 方法来唤醒等待的线程.
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws <0)
- compareAndSetWaitStatus(node, ws, 0);
- /*
- * 如果后继结点为 null 或等待状态 > 0(当前结点被取消), 则从后往前找到正在应该被唤醒的结点
- */
- Node s = node.next;
- if (s == null || s.waitStatus> 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
对了, tryRelease 方法也是具体的同步器来实现的.
2.3 其它方法
acquireShared(int)和 releaseShared()方法是共享模式下获取资源和释放资源的方法. 这里不再详细展开了, 请看参考资料里的文章.
参考资料: 1.Java 并发之 AQS 详解 http://www.cnblogs.com/waterystone/p/4920797.html
来源: http://www.jianshu.com/p/12302ce8b12e