- /**
- * Atomically update Java variable to x if it is currently
- * holding expected.
- * @return true if successful
- */
- public final native boolean compareAndSwapObject(Object o,longoffset, Object expected, Object x);/**
- * Atomically update Java variable to x if it is currently
- * holding expected.
- * @return true if successful
- */
- public final native boolean compareAndSwapInt(Object o,longoffset,intexpected,intx);/**
- * Atomically update Java variable to x if it is currently
- * holding expected.
- * @return true if successful
- */
- public final native boolean compareAndSwapLong(Object o,longoffset,longexpected,longx);
上面三个函数定义在 sun.misc.Unsafe 类中,使用该类可以进行一些底层的操作,例如直接操作原生内存,更多关于 Unsafe 类的文章可以参考 这篇。以 compareAndSwapInt 为例,我们看下如何使用 CAS 函数:
- import sun.misc.Unsafe;
- import java.lang.reflect.Field;
- /**
- * Created by Jikai Zhang on 2017/4/8.
- */
- public classCASIntTest {private volatile intcount =0;private static finalUnsafe unsafe =getUnsafe();private static final longoffset;// 获得 count 属性在 CASIntTest 中的偏移量(内存地址偏移)
- static{try{
- offset = unsafe.objectFieldOffset(CASIntTest.class.getDeclaredField("count"));
- }catch(NoSuchFieldException e) {throw newError(e);
- }
- }// 通过反射的方式获得 Unsafe 类
- public staticUnsafegetUnsafe() {
- Unsafe unsafe =null;try{
- Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
- theUnsafe.setAccessible(true);
- unsafe = (Unsafe) theUnsafe.get(null);
- }catch(NoSuchFieldException | IllegalAccessException e) {
- e.printStackTrace();
- }returnunsafe;
- }public void increment() {intprevious = count;
- unsafe.compareAndSwapInt(this, offset, previous, previous +1);
- }public static void main(String[] args) {
- CASIntTest casIntTest =new CASIntTest();
- casIntTest.increment();
- System.out.println(casIntTest.count);
- }
- }
在 CASIntTest 类中,我们定义一个 count 变量,其中 increment 方法是将 count 的值加 1。下面是 increase 方法的代码:
- intprevious = count;
- unsafe.compareAndSwapInt(this, offset, previous, previous +1);
在没有线程竞争的条件下,该代码执行的结果是将 count 变量的值加 1(多个线程竞争可能会有线程执行失败),但是在 compareAndSwapInt 函数中,我们并没有传入 count 变量,那么函数是如何修改的 count 变量值?其实我们往 compareAndSwapInt 函数中传入了 count 变量在堆内存中的地址,函数直接修改了 count 变量所在内存区域。count 属性在堆内存中的地址是由 CASIntTest 实例的起始内存地址和 count 属性相对于起始内存的偏移量决定的。其中对象属性在对象中的偏移量通过
函数获得,函数原型如下所示。该函数接受一个 Filed 类型的参数,返回该 Filed 属性在对象中的偏移量。
- objectFieldOffset
- /**
- * Report the location of a given static field, in conjunction with {@link
- * #staticFieldBase}.
- * Do not expect to perform any sort of arithmetic on this offset;
- * it is just a cookie which is passed to the unsafe heap memory accessors.
- *
- * Any given field will always have the same offset, and no two distinct
- * fields of the same class will ever have the same offset.
- *
- * As of 1.4.1, offsets for fields are represented as long values,
- * although the Sun JVM does not use the most significant 32 bits.
- * It is hard to imagine a JVM technology which needs more than
- * a few bits to encode an offset within a non-array object,
- * However, for consistency with other methods in this class,
- * this method reports its result as a long value.
- */
- public native long objectFieldOffset(Field f);
下面我们再看一下 compareAndSwapInt 的函数原型。我们知道 CAS 操作需要知道 3 个信息:内存中的值,期望的旧值以及要修改的新值。通过前面的分析,我们知道通过 o 和 offset 我们可以确定属性在内存中的地址,也就是知道了属性在内存中的值。expected 对应期望的旧址,而 x 就是要修改的新值。
- public final native boolean compareAndSwapInt(Object o,longoffset,intexpected,intx);
compareAndSwapInt 函数首先比较一下 expected 是否和内存中的值相同,如果不同证明其他线程修改了属性值,那么就不会执行更新操作,但是程序如果就此返回了,似乎不太符合我们的期望,我们是希望程序可以执行更新操作的,如果其他线程先进行了更新,那么就在更新后的值的基础上进行修改,所以我们一般使用循环配合 CAS 函数,使程序在更新操作完成之后再返回,如下所示:
- longbefore = counter;while(!unsafe.compareAndSwapLong(this, offset, before, before +1)) {
- before = counter;
- }
下面是使用 CAS 函数实现计数器的一个实例:
- import sun.misc.Unsafe;
- import java.lang.reflect.Field;
- /**
- * Created by Jikai Zhang on 2017/4/8.
- */
- public classCASCounter {// 通过反射的方式获得 Unsafe 类
- public staticUnsafegetUnsafe() {
- Unsafe unsafe =null;try{
- Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
- theUnsafe.setAccessible(true);
- unsafe = (Unsafe) theUnsafe.get(null);
- }catch(NoSuchFieldException | IllegalAccessException e) {
- e.printStackTrace();
- }returnunsafe;
- }private volatile longcounter =0;private static final longoffset;private static finalUnsafe unsafe =getUnsafe();static{try{
- offset = unsafe.objectFieldOffset(CASCounter.class.getDeclaredField("counter"));
- }catch(NoSuchFieldException e) {throw newError(e);
- }
- }public void increment() {longbefore = counter;while(!unsafe.compareAndSwapLong(this, offset, before, before +1)) {
- before = counter;
- }
- }public long getCounter() {returncounter;
- }private static longintCounter =0;public static void main(String[] args)throwsInterruptedException {intthreadCount =10;
- Thread threads[] =newThread[threadCount];finalCASCounter casCounter =new CASCounter();for(inti =0; i < threadCount; i++) {
- threads[i] =newThread(newRunnable() {@Override
- public void run() {for(inti =0; i <10000; i++) {
- casCounter.increment();
- intCounter++;
- }
- }
- });
- threads[i].start();
- }for(inti =0; i < threadCount; i++) {
- threads[i].join();
- }
- System.out.printf("CASCounter is %d \nintCounter is %d\n", casCounter.getCounter(), intCounter);
- }
- }
在 AQS 中,对原始的 CAS 函数封装了一下,省去了获得变量地址的步骤,如下面的形式:
- private static final longwaitStatusOffset;static{try{
- waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
- }catch(Exception ex) {throw newError(ex);
- }
- }private static final boolean compareAndSetWaitStatus(Node node,intexpect,intupdate) {returnunsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
- }
AQS 依赖内部的同步队列(一个 FIFO 的双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把队列中第一个等待节点线程唤醒(下图中的 Node1),使其再次尝试获取同步状态。同步队列的结构如下所示:
图片来自 http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
Head 节点本身不保存等待线程的信息,它通过 next 变量指向第一个保存线程等待信息的节点(Node1)。当线程被唤醒之后,会删除 Head 节点,而唤醒线程所在的节点会设置为 Head 节点(Node1 被唤醒之后,Node1 会被置为 Head 节点)。下面我们看下 JDK 中同步队列的实现。
首先看在节点所对应的 Node 类:
- static final classNode {/**
- * 标志是独占式模式还是共享模式
- */
- static finalNode SHARED =newNode();static finalNode EXCLUSIVE =null;/**
- * 线程等待状态的有效值
- */
- static final intCANCELLED =1;static final intSIGNAL = -1;static final intCONDITION = -2;static final intPROPAGATE = -3;/**
- * 线程状态,合法值为上面 4 个值中的一个
- */
- volatile intwaitStatus;/**
- * 当前节点的前置节点
- */
- volatileNode prev;/**
- * 当前节点的后置节点
- */
- volatileNode next;/**
- * 当前节点所关联的线程
- */
- volatileThread thread;/**
- * 指向下一个在某个条件上等待的节点,或者指向 SHARE 节点,表明当前处于共享模式
- */Node nextWaiter;final boolean isShared() {returnnextWaiter == SHARED;
- }finalNodepredecessor()throwsNullPointerException {
- Node p = prev;if(p ==null)throw newNullPointerException();else
- returnp;
- }
- Node() {// Used to establish initial head or SHARED marker}
- Node(Thread thread, Node mode) {// Used by addWaiter
- this.nextWaiter= mode;this.thread= thread;
- }
- Node(Thread thread,intwaitStatus) {// Used by Condition
- this.waitStatus= waitStatus;this.thread= thread;
- }
- }
在 Node 类中定义了四种等待状态:
我们首先看下独占锁的获取和释放过程
独占锁获取
下面是获取独占锁的流程图:
我们通过 acquire 方法来获取独占锁,下面是方法定义
- public final void acquire(intarg) {// 首先尝试获取锁,如果获取失败,会先调用 addWaiter 方法创建节点并追加到队列尾部
- // 然后调用 acquireQueued 阻塞或者循环尝试获取锁
- if(!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){// 在 acquireQueued 中,如果线程是因为中断而退出的阻塞状态会返回 true
- // 这里的 selfInterrupt 主要是为了恢复线程的中断状态
- selfInterrupt();
- }
- }
acquire 会首先调用 tryAcquire 方法来获得锁,该方法需要我们来实现,这个在前面已经提过了。如果没有获取锁,会调用 addWaiter 方法创建一个和当前线程关联的节点追加到同步队列的尾部,我们调用 addWaiter 时传入的是 Node.EXCLUSIVE,表明当前是独占模式。下面是 addWaiter 的具体实现
- privateNodeaddWaiter(Node mode) {
- Node node =newNode(Thread.currentThread(), mode);// tail 指向同步队列的尾节点Node pred = tail;// Try the fast path of enq; backup to full enq on failure
- if(pred !=null) {
- node.prev= pred;if(compareAndSetTail(pred, node)) {
- pred.next= node;returnnode;
- }
- }enq(node);returnnode;
- }
addWaiter 方法会首先调用 if 方法,来判断能否成功将节点添加到队列尾部,如果添加失败,再调用 enq 方法(使用循环不断重试)进行添加,下面是 enq 方法的实现:
- privateNodeenq(finalNode node) {for(;;) {
- Node t = tail;// 同步队列采用的懒初始化(lazily initialized)的方式,
- // 初始时 head 和 tail 都会被设置为 null,当一次被访问时
- // 才会创建 head 对象,并把尾指针指向 head。
- if(t ==null) {// Must initialize
- if(compareAndSetHead(newNode()))
- tail = head;
- }else{
- node.prev= t;if(compareAndSetTail(t, node)) {
- t.next= node;returnt;
- }
- }
- }
- }
addWaiter 仅仅是将节点加到了同步队列的末尾,并没有阻塞线程,线程阻塞的操作是在 acquireQueued 方法中完成的,下面是 acquireQueued 的实现:
- final boolean acquireQueued(finalNode node,intarg) {booleanfailed =true;try{booleaninterrupted =false;for(;;) {finalNode p = node.predecessor();// 如果当前节点的前继节点是 head,就使用自旋(循环)的方式不断请求锁
- if(p == head &&tryAcquire(arg)) {// 成功获得锁,将当前节点置为 head 节点,同时删除原 head 节点
- setHead(node);
- p.next=null;// help GCfailed =false;returninterrupted;
- }// shouldParkAfterFailedAcquire 检查是否可以挂起线程,
- // 如果可以挂起进程,会调用 parkAndCheckInterrupt 挂起线程,
- // 如果 parkAndCheckInterrupt 返回 true,表明当前线程是因为中断而退出挂起状态的,
- // 所以要将 interrupted 设为 true,表明当前线程被中断过
- if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
- interrupted =true;
- }
- }finally{if(failed)cancelAcquire(node);
- }
- }
acquireQueued 会首先检查当前节点的前继节点是否为 head,如果为 head,将使用自旋的方式不断的请求锁,如果不是 head,则调用 shouldParkAfterFailedAcquire 查看是否应该挂起当前节点关联的线程,下面是 shouldParkAfterFailedAcquire 的实现:
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {// 当前节点的前继节点的等待状态
- intws = pred.waitStatus;// 如果前继节点的等待状态为 SIGNAL 我们就可以将当前节点对应的线程挂起
- if(ws == Node.SIGNAL)return true;if(ws >0) {// ws 大于 0,表明当前线程的前继节点处于 CANCELED 的状态,
- // 所以我们需要从当前节点开始往前查找,直到找到第一个不为
- // CAECELED 状态的节点
- do{
- node.prev= pred = pred.prev;
- }while(pred.waitStatus>0);
- pred.next= node;
- }else{/*
- * waitStatus must be 0 or PROPAGATE. Indicate that we
- * need a signal, but don't park yet. Caller will need to
- * retry to make sure it cannot acquire before parking.
- */
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }return false;
- }
shouldParkAfterFailedAcquire 会检查前继节点的等待状态,如果前继节点状态为 SIGNAL,则可以将当前节点关联的线程挂起,如果不是 SIGNAL,会做一些其他的操作,在当前循环中不会挂起线程。如果确定了可以挂起线程,就调用 parkAndCheckInterrupt 方法对线程进行阻塞:
- private final boolean parkAndCheckInterrupt() {// 挂起当前线程LockSupport.park(this);// 可以通过调用 interrupt 方法使线程退出 park 状态,
- // 为了使线程在后面的循环中还可以响应中断,会重置线程的中断状态。
- // 这里使用 interrupted 会先返回线程当前的中断状态,然后将中断状态重置为 false,
- // 线程的中断状态会返回给上层调用函数,在线程获得锁后,
- // 如果发现线程曾被中断过,会将中断状态重新设为 true
- returnThread.interrupted();
- }
独占锁释放
下面是释放独占锁的流程:
通过 release 方法,我们可以释放互斥锁。下面是 release 方法的实现:
- public final boolean release(intarg) {if(tryRelease(arg)) {
- Node h = head;// waitStatus 为 0,证明是初始化的空队列或者后继结点已经被唤醒了
- if(h !=null&& h.waitStatus!=0)unparkSuccessor(h);return true;
- }return false;
- }
在独占模式下释放锁时,是没有其他线程竞争的,所以处理会简单一些。首先尝试释放锁,如果失败就直接返回(失败不是因为多线程竞争,而是线程本身就不拥有锁)。如果成功的话,会检查 h 的状态,然后调用 unparkSuccessor 方法来唤醒后续线程。下面是 unparkSuccessor 的实现:
- private void unparkSuccessor(Node node) {intws = node.waitStatus;// 将 head 节点的状态置为 0,表明当前节点的后续节点已经被唤醒了,
- // 不需要再次唤醒,修改 ws 状态主要作用于 release 的判断
- if(ws <0)compareAndSetWaitStatus(node, ws,0);/*
- * Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently null,
- * traverse backwards from tail to find the actual
- * non-cancelled successor.
- */Node s = node.next;if(s ==null|| s.waitStatus>0) {
- s =null;for(Node t = tail; t !=null&& t != node; t = t.prev)if(t.waitStatus<=0)
- s = t;
- }if(s !=null)
- LockSupport.unpark(s.thread);
- }
在 unparkSuccessor 方法中,如果发现头节点的后继结点为 null 或者处于 CANCELED 状态,会从尾部往前找(在节点存在的前提下,这样一定能找到)离头节点最近的需要唤醒的节点,然后唤醒该节点。
独占锁的流程和原理比较容易理解,因为只有一个锁,但是共享锁的处理就相对复杂一些了。在独占锁中,只有在释放锁之后,才能唤醒等待的线程,而在共享模式中,获取锁和释放锁之后,都有可能唤醒等待的线程。如果想要理清共享锁的工作过程,必须将共享锁的获取和释放结合起来看。这里我们先看一下共享锁的释放过程,只有明白了释放过程做了哪些工作,才能更好的理解获取锁的过程。
共享锁释放
下面是释放共享锁的流程:
通过 releaseShared 方法会释放共享锁,下面是具体的实现:
- public final boolean releaseShared(intreleases) {if(tryReleaseShared(arg)) {doReleaseShared();return true;
- }return false;
- }
releases 是要释放的共享资源数量,其中 tryReleaseShared 的方法由我们自己重写,该方法的主要功能就是修改共享资源的数量(state + releases),因为可能会有多个线程同时释放资源,所以实现的时候,一般采用循环加 CAS 操作的方式,如下面的形式:
- protected boolean tryReleaseShared(intreleases) {// 释放共享资源,因为可能有多个线程同时执行,所以需要使用 CAS 操作来修改资源总数。
- for(;;) {intlastCount =getState();intnewCount = lastCount + releases;if(compareAndSetState(lastCount, newCount)) {return true;
- }
- }
- }
当共享资源数量修改了之后,会调用 doReleaseShared 方法,该方法主要唤醒同步队列中的第一个等待节点(head.next),下面是具体实现:
- private void doReleaseShared() {/*
- * Ensure that a release propagates, even if there are other
- * in-progress acquires/releases. This proceeds in the usual
- * way of trying to unparkSuccessor of head if it needs
- * signal. But if it does not, status is set to PROPAGATE to
- * ensure that upon release, propagation continues.
- * Additionally, we must loop in case a new node is added
- * while we are doing this. Also, unlike other uses of
- * unparkSuccessor, we need to know if CAS to reset status
- * fails, if so rechecking.
- */
- for(;;) {
- Node h = head;// head = null 说明没有初始化,head = tail 说明同步队列中没有等待节点
- if(h !=null&& h != tail) {// 查看当前节点的等待状态
- intws = h.waitStatus;// 我们在前面说过,SIGNAL说明有后续节点需要唤醒
- if(ws == Node.SIGNAL) {/*
- * 将当前节点的值设为 0,表明已经唤醒了后继节点
- * 可能会有多个线程同时执行到这一步,所以使用 CAS 保证只有一个线程能修改成功,
- * 从而执行 unparkSuccessor,其他的线程会执行 continue 操作
- */
- if(!compareAndSetWaitStatus(h, Node.SIGNAL,0))continue;// loop to recheck cases
- unparkSuccessor(h);
- }else if(ws ==0&& !compareAndSetWaitStatus(h,0, Node.PROPAGATE)) {/*
- * ws 等于 0,说明无需唤醒后继结点(后续节点已经被唤醒或者当前节点没有被阻塞的后继结点),
- * 也就是这一次的调用其实并没有执行唤醒后继结点的操作。就类似于我只需要一张优惠券,
- * 但是我的两个朋友,他们分别给我了一张,因此我就剩余了一张。然后我就将这张剩余的优惠券
- * 送(传播)给其他人使用,因此这里将节点置为可传播的状态(PROPAGATE)
- */
- continue;// loop on failed CAS}
- }if(h == head)// loop if head changed
- break;
- }
- }
从上面的实现中,doReleaseShared 的主要作用是用来唤醒阻塞的节点并且一次只唤醒一个,让该节点关联的线程去重新竞争锁,它既不修改同步队列,也不修改共享资源。
当多个线程同时释放资源时,可以确保两件事:
所以释放锁做的主要工作还是修改共享资源的数量。而有了多个共享资源后,如何确保同步队列中的多个节点可以获取锁,是由获取锁的逻辑完成的。下面看下共享锁的获取。
共享锁的获取
下面是获取共享锁的流程
通过 acquireShared 方法,我们可以申请共享锁,下面是具体的实现:
- public final void acquireShared(intarg) {// 如果返回结果小于 0,证明没有获取到共享资源
- if(tryAcquireShared(arg) <0)doAcquireShared(arg);
- }
如果没有获取到共享资源,就会执行 doAcquireShared 方法,下面是该方法的具体实现:
- private void doAcquireShared(intarg) {finalNode node =addWaiter(Node.SHARED);booleanfailed =true;try{booleaninterrupted =false;for(;;) {finalNode p = node.predecessor();if(p == head) {intr =tryAcquireShared(arg);if(r >=0) {setHeadAndPropagate(node, r);
- p.next=null;// help GC
- if(interrupted)selfInterrupt();
- failed =false;return;
- }
- }if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
- interrupted =true;
- }
- }finally{if(failed)cancelAcquire(node);
- }
- }
从上面的代码中可以看到,只有前置节点为 head 的节点才有可能去竞争锁,这点和独占模式的处理是一样的,所以即便唤醒了多个线程,也只有一个线程能进入竞争锁的逻辑,其余线程会再次进入 park 状态,当线程获取到共享锁之后,会执行 setHeadAndPropagate 方法,下面是具体的实现:
- private void setHeadAndPropagate(Node node,longpropagate) {// 备份一下头节点Node h = head;// Record old head for check below
- /*
- * 移除头节点,并将当前节点置为头节点
- * 当执行完这一步之后,其实队列的头节点已经发生改变,
- * 其他被唤醒的线程就有机会去获取锁,从而并发的执行该方法,
- * 所以上面备份头节点,以便下面的代码可以正确运行
- */
- setHead(node);/*
- * Try to signal next queued node if:
- * Propagation was indicated by caller,
- * or was recorded (as h.waitStatus either before
- * or after setHead) by a previous operation
- * (note: this uses sign-check of waitStatus because
- * PROPAGATE status may transition to SIGNAL.)
- * and
- * The next node is waiting in shared mode,
- * or we don't know, because it appears null
- *
- * The conservatism in both of these checks may cause
- * unnecessary wake-ups, but only when there are multiple
- * racing acquires/releases, so most need signals now or soon
- * anyway.
- */
- /*
- * 判断是否需要唤醒后继结点,propagate > 0 说明共享资源有剩余,
- * h.waitStatus < 0,表明当前节点状态可能为 SIGNAL,CONDITION,PROPAGATE
- */
- if(propagate >0|| h ==null|| h.waitStatus<0||
- (h = head) ==null|| h.waitStatus<0) {
- Node s = node.next;// 只有 s 不处于独占模式时,才去唤醒后继结点
- if(s ==null|| s.isShared())doReleaseShared();
- }
- }
判断后继结点是否需要唤醒的条件是十分宽松的,也就是一定包含必要的唤醒,但是也有可能会包含不必要的唤醒。从前面我们可以知道 doReleaseShared 函数的主要作用是唤醒后继结点,它既不修改共享资源,也不修改同步队列,所以即便有不必要的唤醒也是不影响程序正确性的。如果没有共享资源,节点会再次进入等待状态。
到了这里,脉络就比较清晰了,当一个节点获取到共享锁之后,它除了将自身设为 head 节点之外,还会判断一下是否满足唤醒后继结点的条件,如果满足,就唤醒后继结点,后继结点获取到锁之后,会重复这个过程,直到判断条件不成立。就类似于考试时从第一排往最后一排传卷子,第一排先留下一份,然后将剩余的传给后一排,后一排会重复这个过程。如果传到某一排卷子没了,那么位于这排的人就要等待,直到老师又给了他新的卷子。
在获取锁时还可以设置响应中断,独占锁和共享锁的处理逻辑类似,这里我们以独占锁为例。使用 acquireInterruptibly 方法,在获取独占锁时可以响应中断,下面是具体的实现:
- public final void acquireInterruptibly(intarg)throwsInterruptedException {if(Thread.interrupted())throw newInterruptedException();if(!tryAcquire(arg))doAcquireInterruptibly(arg);
- }private void doAcquireInterruptibly(intarg)throwsInterruptedException {finalNode node =addWaiter(Node.EXCLUSIVE);booleanfailed =true;try{for(;;) {finalNode p = node.predecessor();if(p == head &&tryAcquire(arg)) {setHead(node);
- p.next=null;// help GCfailed =false;return;
- }if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 这里会抛出异常
- throw newInterruptedException();
- }
- }
- }finally{if(failed)cancelAcquire(node);
- }
- }
从上面的代码中我们可以看出,acquireInterruptibly 和 acquire 的逻辑类似,只是在下面的代码处有所不同:当线程因为中断而退出阻塞状态时,会直接抛出 InterruptedException 异常。
- if(shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()) {// 这里会抛出异常
- throw newInterruptedException();
- }
我们知道,不管是抛出异常还是方法返回,程序都会执行 finally 代码,而 failed 肯定为 true,所以抛出异常之后会执行 cancelAcquire 方法,cancelAcquire 方法主要将节点从同步队列中移除。下面是具体的实现:
- private void cancelAcquire(Node node) {// Ignore if node doesn't exist
- if(node ==null)return;
- node.thread=null;// 跳过前面的已经取消的节点Node pred = node.prev;while(pred.waitStatus>0)
- node.prev= pred = pred.prev;// 保存下 pred 的后继结点,以便 CAS 操作使用
- // 因为可能存在已经取消的节点,所以 pred.next 不一等于 nodeNode predNext = pred.next;// Can use unconditional write instead of CAS here.
- // After this atomic step, other Nodes can skip past us.
- // Before, we are free of interference from other threads.
- //
来源: http://www.cnblogs.com/zhangjk1993/p/6715653.html