谈到并发,不得不谈 ReentrantLock;而谈到 ReentrantLock,不得不谈 AbstractQueuedSynchronized(AQS)!, 类如其名,抽象的队列式的同步器,AQS 定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的 ReentrantLock/Semaphore/CountDownLatch...。我们以 ReentrantLock 作为讲解切入点。
ReentrantLock 把所有 Lock 接口的操作都委派到一个 Sync 类上,该类继承了 AbstractQueuedSynchronizer:
- static abstract class Sync extends AbstractQueuedSynchronizer
Sync 又有两个子类:
- final static class NonfairSync extends Sync
- final static class FairSync extends Sync
显然是为了支持公平锁和非公平锁而定义,默认情况下为非公平锁。 先理一下 Reentrant.lock() 方法的调用过程(默认非公平锁):
0_13119022769n5R.gif
简单说来,AbstractQueuedSynchronizer 会把所有的请求线程构成一个 CLH 队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态.
线程的显式阻塞是通过调用 LockSupport.park() 完成,而 LockSupport.park() 则调用 sun.misc.Unsafe.park() 本地方法,再进一步,HotSpot 在 Linux 中中通过调用 pthread_mutex_lock 函数把线程交给系统内核进行阻塞。该队列如图:
0_1311847049xnXb.gif
与 synchronized 相同的是,这也是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。令人疑惑的是为什么采用 CLH 队列呢?原生的 CLH 队列是用于自旋锁,但 Doug Lea 把其改造为阻塞锁。
当有线程竞争锁时,该线程会首先尝试获得锁,这对于那些已经在队列中排队的线程来说显得不公平,这是非公平锁的由来之一,与 synchronized 实现类似,这样会极大提高吞吐量。 如果已经存在 Running 线程,则新的竞争线程会被追加到队尾,具体是采用基于 CAS 的 Lock-Free 算法,因为线程并发对 Tail 调用 CAS 可能会导致其他线程 CAS 失败,解决办法是循环 CAS 直至成功。
AbstractQueuedSynchronizer 的实现非常精巧,令人叹为观止,不入细节难以完全领会其精髓,下面详细说明实现过程:
2.1 Sync.nonfairTryAcquire nonfairTryAcquire 方法将是 lock 方法间接调用的第一个方法,每次请求锁时都会首先调用该方法。
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- } else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
2.2 AbstractQueuedSynchronizer.addWaiter1. 该方法会首先判断当前状态,如果 c==0 说明没有线程正在竞争该锁,如果不 c !=0 说明有线程正拥有了该锁。 2. 如果发现 c==0,则通过 CAS 设置该状态值为 acquires,acquires 的初始调用值为 1,每次线程重入该锁都会 + 1,每次 unlock 都会 - 1,但为 0 时释放锁。如果 CAS 设置成功,则可以预计其他任何线程调用 CAS 都不会再成功,也就认为当前线程得到了该锁,也作为 Running 线程,很显然这个 Running 线程并未进入等待队列。 3. 如果 c !=0 但发现自己已经拥有锁,只是简单地 ++acquires,并修改 status 值,但因为没有竞争,所以通过 setStatus 修改,而非 CAS,也就是说这段代码实现了偏向锁的功能,并且实现的非常漂亮。
addWaiter 方法负责把当前无法获得锁的线程包装为一个 Node 添加到队尾:
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
下面是 enq 方法:其中参数 mode 是独占锁还是共享锁,默认为 null,独占锁。追加到队尾的动作分两步: 1. 如果当前队尾已经存在 (tail!=null),则使用 CAS 把当前线程更新为 Tail 2. 如果当前 Tail 为 null 或则线程调用 CAS 设置队尾失败,则通过 enq 方法继续设置 Tail
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- Node h = new Node(); // Dummy header
- h.next = node;
- node.prev = h;
- if (compareAndSetHead(h)) {
- tail = node;
- return h;
- }
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
该方法就是循环调用 CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter 的目的就是通过 CAS 把当前现在追加到队尾,并返回包装后的 Node 实例。 把线程要包装为 Node 对象的主要原因,除了用 Node 构造供虚拟队列外,还用 Node 包装了各种线程状态,这些状态被精心设计为一些数字值:
- SIGNAL(-1) :线程的后继线程正 / 已被阻塞,当该线程 release 或 cancel 时要重新这个后继线程 (unpark)
- CANCELLED(1):因为超时或中断,该线程已经被取消
- CONDITION(-2):表明该线程被处于条件队列,就是因为调用了 >- Condition.await 而被阻塞
- PROPAGATE(-3):传播共享锁
- 0:0 代表无状态
2.3 AbstractQueuedSynchronizer.acquireQueued acquireQueued 的主要作用是把已经追加到队列的线程节点(addWaiter 方法返回值)进行阻塞,但阻塞前又通过 tryAccquire 重试是否能获得锁,如果重试成功能则无需阻塞,这里是非公平锁的由来之二
- final boolean acquireQueued(final Node node, int arg) {
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
- }
- }
仔细看看这个方法是个无限循环,感觉如果 p == head && tryAcquire(arg) 条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于第 12 行的 parkAndCheckInterrupt 会把当前线程挂起,从而阻塞住线程的调用栈。
- private final boolean parkAndCheckInterrupt() {
- LockSupport.park(this);
- return Thread.interrupted();
- }
如前面所述,LockSupport.park 最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于 Cancel 状态则没有必要,具体的检查在 shouldParkAfterFailedAcquire 中:
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park
- */
- return true;
- if (ws > 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while ( pred . waitStatus > 0 );
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
检查原则在于:
至此,锁住线程的逻辑已经完成,下面讨论解锁的过程。规则 1:如果前继的节点状态为 SIGNAL,表明当前节点需要 unpark,则返回成功,此时 acquireQueued 方法的第 12 行(parkAndCheckInterrupt)将导致线程阻塞 规则 2:如果前继节点状态为 CANCELLED(ws>0),说明前置节点已经被放弃,则回溯到一个非取消的前继节点,返回 false,acquireQueued 方法的无限循环将递归调用该方法,直至规则 1 返回 true,导致线程阻塞 规则 3:如果前继节点状态为非 SIGNAL、非 CANCELLED,则设置前继的状态为 SIGNAL,返回 false 后进入 acquireQueued 的无限循环,与规则 2 同 总体看来,shouldParkAfterFailedAcquire 就是靠前继节点判断当前线程是否应该被阻塞,如果前继节点处于 CANCELLED 状态,则顺便删除这些节点重新构造队列。
请求锁不成功的线程会被挂起在 acquireQueued 方法的第 12 行,12 行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第 13 行,即设置 interrupted = true,之后又进入无限循环。
从无限循环的代码可以看出,并不是得到解锁的线程一定能获得锁,必须在第 6 行中调用 tryAccquire 重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了 "非公平" 的精髓。通过之后将要介绍的解锁机制会看到,第一个被解锁的线程就是 Head,因此 p == head 的判断基本都会成功。 至此可以看到,把 tryAcquire 方法延迟到子类中实现的做法非常精妙并具有极强的可扩展性,令人叹为观止!当然精妙的不是这个 Templae 设计模式,而是 Doug Lea 对锁结构的精心布局。
解锁代码相对简单,主要体现在 AbstractQueuedSynchronizer.release 和 Sync.tryRelease 方法中: class AbstractQueuedSynchronizer
class Sync
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
tryRelease 与 tryAcquire 语义相同,把如何释放的逻辑延迟到子类中。tryRelease 语义很明确:如果线程多次锁定,则进行多次释放,直至 status==0 则真正释放锁,所谓释放锁即设置 status 为 0,因为无竞争所以没有使用 CAS。
release 的语义在于:如果可以释放锁,则唤醒队列第一个线程(Head),具体唤醒代码如下:
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
这段代码的意思在于找出第一个可以 unpark 的线程,一般说来 head.next == head,Head 就是第一个线程,但 Head.next 可能被取消或被置为 null,因此比较稳妥的办法是从后往前找第一个可用线程。貌似回溯会导致性能降低,其实这个发生的几率很小,所以不会有性能影响。之后便是通知系统内核继续该线程,在 Linux 下是通过 pthread_mutex_unlock 完成。之后,被解锁的线程进入上面所说的重新竞争状态。
AbstractQueuedSynchronizer 通过构造一个基于阻塞的 CLH 队列容纳所有的阻塞线程,而对该队列的操作均通过 Lock-Free(CAS)操作,但对已经获得锁的线程而言,ReentrantLock 实现了偏向锁的功能。
synchronized 的底层也是一个基于 CAS 操作的等待队列,但 JVM 实现的更精细,把等待队列分为 ContentionList 和 EntryList,目的是为了降低线程的出列速度;当然也实现了偏向锁,从数据结构来说二者设计没有本质区别。但 synchronized 还实现了自旋锁,并针对不同的系统和硬件体系进行了优化,而 Lock 则完全依靠系统阻塞挂起等待线程。
当然 Lock 比 synchronized 更适合在应用层扩展,可以继承 AbstractQueuedSynchronizer 定义各种实现,比如实现读写锁(ReadWriteLock),公平或不公平锁;同时,Lock 对应的 Condition 也比 wait/notify 要方便的多、灵活的多。
来源: http://www.jianshu.com/p/279baac48960