在上面一篇分析 ThreadExecutedPool 的文章中我们看到线程池实现源码中大量使用了 ReentrantLock 锁,那么 ReentrantLock 锁的优势是什么?它又是怎么实现的呢?
ReentrantLock 又名可重入锁,为什么称之为可重入锁呢?简单来说因为它允许一个线程多次取获得该锁,不过多次获取该锁之后,也需要执行同样次数的释放锁操作,否则该锁将被当前线程一直持有,导致其它线程无法获取。需要注意的是,释放锁的操作需要我们用代码来控制,它并不会自动取释放锁。在 ReentrantLock 中实现了两种锁 fairSync 和 NonfairSync,即公平锁和非公平锁,今天我们就来聊聊 ReentrantLock 中 nonfairSync 锁的实现。
废话不多说,下面开始分析代码!
首先看一下 lock() 方法,这个方法非常重要,它也是我们获取锁的入口:
- final void lock() {
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
ReentrantLock 锁的初始状态为 0,compareAndSetState 方法将尝试获取锁并将当前锁的状态设置为 1。如果成功获取了锁会调用 setExclusiveOwnerThread()方法设置当前线程拥有该锁的独占访问权。
如果调用 compareAndSetState()获取锁失败,则返回 false 并执行 acquire(1)。
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
我们看到 acquire(1) 的代码中有一条 if 语句,当 tryAcquire(1) 返回 false 以及 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 返回 true 时,才会去执行 selfInterrupt(); 方法。下面我们来看看 tryAcquire(1)和 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 这两个家伙干了什么。
先看一下 tryAcquire()方法。
- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
tryAcquire 方法会去调用 nonfairTryAcquire()方法。
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
1、首先调用 getState()方法获取当前锁状态,如果锁状态为 0。表示当前锁没有被其他线程占用,这里会再次尝试去获取锁。如果成功的拿到了锁,将设置锁的拥有者为当前线程,同时返回 true。如果此时返回 true 的话,表示当前线程成功获取到了锁,lock()方法调用成功。
2、如果当前锁状态不为 0,判断当前线程是否为锁的拥有者,如果是的话,尝试将当前锁的状态值加 acquires。如果当前 neextc 的值小于 0,抛出异常。若不小于 0,将当前锁的值设置为 nextc。为什么说 ReentrantLock 为可重入锁,就体现在这里了,如果当前线程为锁的拥有者,该线程再次调用 lock 方法时,当前锁的状态值会加 1。当然我们释放该锁的时候,也要调用相应的 unlock()方法,以使得锁的 state 值为 0,可被其他线程请求。
3、如果当前锁的值不为 0 且拥有锁的线程也不为当前线程则返回 false。也就是 tryAcquire()再次获取锁并没有成功。
值得注意的是,既然再第一次调用 compareAndSetState()的时候,已经获取失败了为什么还要再调用 tryAcquire()方法再获取一次呢?我们可以理解为这是一种保险机制,如果此时无法获取锁,我们将会将当前线程加入到阻塞队列中挂起等待后面被唤醒重新争夺锁。
回顾一下上面的 if 判断条件
- if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
如果 tryAcquire()的返回值为 false,那么接下来会执行 acquireQueued(addWaiter(Node.EXCLUSIVE),arg) 方法。这个方法看起来比较复杂,它在 acquireQueued()方法又调用了 addWaiter() 方法,我们先来看看 addWaiter()方法:
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
首先我们创建了一个包含了当前线程的 Node 节点,并将 tail(tail 节点即是尾节点)赋值给 pred 节点。如果我们第一次进来,那么 tail 节点肯定为空,将会去执行 enq(node)方法。如果 tail 不为空,那么接下来的三句代码干了什么呢?
先回忆一下,如果我们希望在一个双向链表的尾部新增一个节点,应该如何操作,大致应该有如下三步:
- private final boolean compareAndSetTail(Node expect, Node update) {
- return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
- }
- compareAndSetTail的原理其实就是CAS算法,将期望值和内存地址为tailOffset上的值进行比较,如果两者相同,则更新tailOffset上的值为最新值update。其实也就是如果tailOffset上的值和pred(老的尾节点)的值相同,则将尾节点更新为新的node节点。
接下来看一下 enq(node) 方法
- private Node enq(final Node node) {
- for (;;) {
- Node t = tail;
- if (t == null) { // Must initialize
- if (compareAndSetHead(new Node()))
- tail = head;
- } else {
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
enq 代码中有一个死循环 for(;;),在这个循环中会执行以下操作:
看到现在,我们大致明白了 addWaiter()方法其实就是将当前节点添加到链表尾部的一个方法。
- if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
再跳回到这句代码,现在已经将线程添加到队列中了。那么 acquireQueued 方法到底又是干什么的呢?我们接着分析。
- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
在 acquireQueued 方法中有一个无限循环,这个循环干了什么,它的用处是什么呢?我们接着看
- final Node p = node.predecessor();
获取当前 node 节点的前驱节点,并赋值给 p
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- private void setHead(Node node) {
- head = node;
- node.thread = null;
- node.prev = null;
- }
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- /*
- * This node has already set status asking a release
- * to signal it, so it can safely park.
- */
- return true;
- if (ws > 0) {
- /*
- * Predecessor was cancelled. Skip over predecessors and
- * indicate retry.
- */
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- /*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
上面的代码可以总结为一下三句话:
到此为止,我们已经知道了尝试去获取锁的线程是如何被放入到阻塞队列中并挂起的。接下来我们来看看获取到锁的线程是如何释放锁的。
上面我们已经较为清晰的理了一遍 ReentrantLock 获取锁的思路,下面我们开始分析一下如何释放获取到的锁。
在 ReentrantLock 里面有如下代码,我们跟踪一下 release()方法干了什么。
- public void unlock() {
- sync.release(1);
- }
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
在 release()方法中,首先尝试去执行 tryRelease()方法。看到这个名字我们就知道它的用处是尝试去释放当前获取到的锁。计入 tryRelease()方法看一下它到底干了什么。
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
继续看上面的代码
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
如果当前线程所持有的锁的 state 值为 0,那么此时需要唤醒当前线程的后继节点去争夺该锁。我们看一下这段代码干了什么:
那么 unparkSuccessor 方法到底做了什么,它为什么可以唤醒后继节点呢?我们下面就来看看:
- private void unparkSuccessor(Node node) {
- /*
- * If status is negative (i.e., possibly needing signal) try
- * to clear in anticipation of signalling. It is OK if this
- * fails or if status is changed by waiting thread.
- */
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- /*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
来源: http://www.cnblogs.com/cfyrwang/p/6653971.html