并发的学习与使用系列 第八篇
AQS 是 AbstractQueuedSynchronizer 的简称,是并发组件包 java.util.concurrent 也就是 JUC(Java Util Concurrency) 的核心,以及很多并发组件如前面几篇中介绍到的 ReentrantLock,Condition,BlockingQueue 以及线程池里使用的 worker 等都是基于其实现的,将很多复杂的,繁琐的并发控制过程封装起来,便于其他并发工具类来实现更多,方便的功能,其主要通过 volatile 和 UnSafe 类的原子操作(Atomic 相关)来实现阻塞和同步,这里通过 ReentrantLock 来分析下 AQS 的实现原理。
之前的文章并发的学习与使用系列提到里 Lock 的实现类 ReentrantLock 是一个可重入的,可实现公平的锁。下面通过 ReentrantLock 的源码来看看其是怎么实现的。
AbstractQueuedSynchronizer 是一个抽象类,其他类主要通过重载其 tryAcquire(int arg) 来获取锁通过 tryRealese() 来释放锁。
ReentrantLock 的默认构造函数
- publicReentrantLock(){
- sync = new NonfairSync();
- }
- static final classNonfairSyncextendsSync{
- ...
- }
- abstract static classSyncextendsAbstractQueuedSynchronizer{
- ...
- }
Sync 是 ReentrantLock 实现公平与非公平锁的主要实现,所以默认请况下 ReentrantLock 是个非公平锁。一般通过 ReentrantLock.lock() 来获取锁,其实现是在 Sync 中完成的。下面是先以非公平锁的实现方式来分析。
- NoFairSync类
- finalvoidlock(){
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
- }
- AQS类
- protectedfinalbooleancompareAndSetState(intexpect,intupdate){
- // See below for intrinsics setup to support this
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
compareAndSetState(0, 1) 是一个原子操作,其代表的是如果原来的值是 0 那就将其设为 1,并且返回 ture。那这个原来的值是指的谁的值呢?从 compareAndSetState 中并看不出来。那就从整体来看,在 AQS 中有个表示当前锁的状态的 int 值 state, 当 state 等于 0 时,表示锁可用,否则表示锁定状态,是否可用还需考虑其他情况如可重入性。
- private volatile int state;
可以想到 compareAndSetState(0, 1) 应该就是设置这个 state 的状态,其实现原理是通过 Unsafe 类可直接操作内存的特性来实现的。
- private static final Unsafe unsafe =Unsafe.getUnsafe();
- private static final long stateOffset;
- stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class
- .getDeclaredField("state"));
所以这样就很好理解了,通过 Unsafe 直接得到 state 的内存地址然后直接错作内存,因为直接可以控制内存值,这也是 Unsafe 类名字的来源,后面还会介绍其改进。
在回来分析 lock(),如果设置状态成功,也就是成功获取了锁,接下来是
- setExclusiveOwnerThread(Thread.currentThread());
- AQS类中
- exclusiveOwnerThread = thread;
表示当前 exclusiveOwnerThread 占据着该锁,可重入性的实现就与其有关,后面介绍。这时就可以直接执行 lock() 后面的程序了。
如果获取锁失败进入 AQS 的 acquire(int arg);
- publicfinalvoidacquire(intarg){
- if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
tryAcquire 实在 NonfairSync 中实现的。
- NonfairSync:
- protectedfinalbooleantryAcquire(intacquires){
- return nonfairTryAcquire(acquires);
- }
- finalbooleannonfairTryAcquire(intacquires){
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
这个过程是先去判断锁的状态是否为可用,如果锁已被持有,则再判断持有锁的线程是否未当前线程,如果是则将锁的持有递增,这也是 java 层实现可重入性的原理。如果再次失败,则进入等待队列。
通过类 Node 来实现
- static final classNode{
- static final Node SHARED = new Node();
- static final Node EXCLUSIVE = null;
- static final int CANCELLED = 1;
- /** waitStatus value to indicate successor's thread needs unparking */
- static final int SIGNAL = -1;
- static final int CONDITION = -2;
- static final int PROPAGATE = -3;
- volatile int waitStatus;
- volatile Node prev;
- volatile Node next;
- volatile Thread thread;
- Node nextWaiter;
- finalbooleanisShared(){
- return nextWaiter == SHARED;
- }
- finalNodepredecessor()throwsNullPointerException{
- Node p = prev;
- if (p == null)
- throw new NullPointerException();
- else
- return p;
- }
- Node() { // Used to establish initial head or SHARED marker
- }
- Node(Thread thread, Node mode) { // Used by addWaiter
- this.nextWaiter = mode;
- this.thread = thread;
- }
- Node(Thread thread, int waitStatus) { // Used by Condition
- this.waitStatus = waitStatus;
- this.thread = thread;
- }
- }
AQS 的线程阻塞队列是个双向队列,提供了 FIFO 先来先服务的公平性,用 head 节点表示对首,tail 表示队尾。
接着看下获取锁失败后进入队列的实现
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
- 将新的节点加入到队尾
- privateNodeaddWaiter(Node mode){
- Node node = new Node(Thread.currentThread(), mode);
- // Try the fast path of enq; backup to full enq on failure
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
未获取到锁将新建一个 Node 节点,然后将其加入到队尾,此时并未有将其阻塞,在 acquireQueued 中将再次尝试获取锁。
- finalbooleanacquireQueued(finalNode node,intarg){
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- //判断前一个节点是否为头节点并且成功过去了锁,如果是将当前节点设为头结点,也就是说明队列的头结点就是当前获取锁的线程,可以看出一个节点是否能获取锁只和他前面的节点有关
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
- privatestaticbooleanshouldParkAfterFailedAcquire(Node pred, Node node){
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- //如果其前节点当前属于等待被唤醒的状态,返回值代表可以将当前节点阻塞。
- return true;
- if (ws > 0) {
- //如果其前节点已取消,则向前继续找知道找到状态不是CANCELLED的作为新的前节点。
- do {
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- pred.next = node;
- } else {
- //前节点状态是0(不要和AQS的锁的状态state弄混),或者其他状态,先将其前节点置为Node.SIGNAL,此时不阻塞,待下次循环中确认
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- return false;
- }
- 如果shouldParkAfterFailedAcquire()返回false,及当且只有其前节点pred状态为Node.SIGNAL时,将当前节点node阻塞
- privatefinalbooleanparkAndCheckInterrupt(){
- LockSupport.park(this);
- return Thread.interrupted();
- }
- LockSupport.java类:
- publicstaticvoidpark(Object blocker){
- Thread t = Thread.currentThread();
- //这里记录线程阻塞在的对象,便于调试用
- setBlocker(t, blocker);
- //此步骤为将当前线程阻塞
- unsafe.park(false, 0L);
- setBlocker(t, null);
- }
jdk 中 unsafe.park 调用 native 方法将线程阻塞,而在 Android sdk 中有不同的实现方式,后续介绍。
以上就是 ReentrantLock 的非公平锁调用 lock() 过程,首先去尝试改变 AQS 设为 state 的状态,改变成功就获取了锁,失败后再次通过判断当前的 state 是否为 0,即未锁定状态,再次尝试改变 state 状态获取锁,如果 state 不为 0,即锁已经被其他线程持有,则判断当前线程是不是已经持有该锁,如果是,则获取锁成功,且锁的次数增加。否则加入到 Node 队列,加入队列后在在 for 循环中通过判断其前节点的状态来决定是否需要阻塞, 可以看出在加入队列前及阻塞前多次尝试去获取锁,而避免进入线程阻塞,这是因为阻塞、唤醒都需要 cpu 的调度,以及上下文切换,这是个重量级的操作,应尽量避免 。
- FairSync类:
- finalvoidlock(){
- //先去判断锁的状态,而不是直接去获取
- acquire(1);
- }
- AQS类:
- publicfinalvoidacquire(intarg){
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- FairSync类:
- protectedfinalbooleantryAcquire(intacquires){
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- //hasQueuedPredecessors判断是否有前节点,如果有就不会尝试去获取锁
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
看下实现过程,和非公平锁很相似,主要差别 lock() 的时候不是直接去获取锁,而是先看锁是否可用并且没有前节点,有前节点的话,即使锁是空闲也不会获取锁。
公平锁和非公平锁的释放过程是一样的,其实现都是在 Sync 父类中
- publicvoidunlock(){
- sync.release(1);
- }
- publicfinalbooleanrelease(intarg){
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- protectedfinalbooleantryRelease(intreleases){
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- setState(c);
- return free;
- }
代码很好理解,就是先去改变 AQS 的中代表锁状态的 state 值,改变后如果 state 为 0,说明没有线程持有该锁,因为是可重入的,所以如果之前一个线程多次获取该锁,也需要释放多次。
锁释放后并没有唤醒之前阻塞的线程,所以还需要后续的唤醒操作。
- privatevoidunparkSuccessor(Node node){
- //改变头结点的值,对唤醒后续节点没影响.
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- if (s != null)
- LockSupport.unpark(s.thread);
- }
- LockSupport类:
- publicstaticvoidunpark(Thread thread){
- if (thread != null)
- unsafe.unpark(thread);
- }
从 lock() 的过程可知,Head 节点就是当前持有锁的线程节点,当释放锁时,从头结点的 next 来看,头结点的下一个节点如果不为 null, 且 waitStatus 不大于 0,则跳过判断,否则从队尾向前找到最前的一个 waitStatus 的节点,然后通过 LockSupport.unpark(s.thread) 唤醒该节点线程。可以看出 ReentrantLock 的非公平锁只是在获取锁的时候是非公平的,如果进入到等待队列后,在 head 节点的线程 unlock() 时,会按照进入的顺序来得到唤醒,保证了队列的 FIFO 的特性。
可以看出无论是获取锁还是释放锁的时候锁最多只有两个线程在竞争,而其他情况下,阻塞的线程不会被唤醒参与竞争,所以性能较高,因为阻塞和唤醒都是需要上下文切换,同时多个线的程竞争都会使 CPU 占用率升高,降低吞吐量。内置锁在最初的 jdk 版本中会有这个情况,但后续逐渐优化,所以选择 ReentrantLock 和 synchronized 的条件不应该是主要原因,而是应该考虑锁是否需要公平性,是否需要可中断,可共享等来作为选择依据。
为什么叫 Unsafe 类呢?也就是不安全的,因为 Unsafe 类可以直接操作内存,这对 java 安全性是一个隐患, 据说后续会逐渐改变其实现方式,而在 Android 的 sdk-23 中,Unsafe 类已经有些改变了,Thread 也和 jdk 的不同,之前通过 unsafe.park() 的 native 方法来阻塞一个线程,unsafe.unpark() 来唤醒,而在 Android 中 unsafe.park() 会调用 Thread 的 park 方法,而 Thread 的 park 其实又会调用其 wait(),这下就清晰了,就和普通的对象的 wait() 一样,然后 unsafe.unpark() 最终会调用到 notify() 方法。
所以源码一直实在变化的,尤其 Android sdk 和 jdk 也有一些区别,但掌握其思想就能很好的理解并掌握新的变化。这也是这一年来在做的,放下浮躁的心态,扎实基础,掌握原理。
来源: