Hello, 大家好,在上一篇文章中,作者简单的把 Lock 接口和 AQS 的 API,以及关系大致说了一下,本文还是围绕 AQS 为话题 (AQS 是重中之重,这个搞明白了。后面 JUC 自己看源码都很 Easy 可以看懂),先具体到 API 大致说下 API 的对应关系,然后作者自己写俩自定义 Lock 说明问题,最后再详细讲解 AQS 提供的一些模板 API 具体怎么实现的. 文章结构:
在上文的讲解 Lock 和 AQS 关系时,有一张图大致讲解了这些 API 的调用关系,本文作者决定再画一张,具体到 AQS 中具体 API 的对应关系:
图中可以看到绿色箭头表示独占性锁的实现逻辑,红色箭头表示共享式锁实现的逻辑,所谓的独占性锁意思就是,只要有一个线程拿到锁,其他线程全部 T 出去到队列等待。共享性锁就好理解了,一部分个性化 (根据 tryAcquire 返回值决定) 的线程可以拿到锁,没拿到的到队列。
好了,根据上面的知识,结合上一节讲解的各个模块 API 调用关系,作者不废话了,来,自定义一个独占性锁: 任何情况下,只允许一个线程拿到锁!即使是自己也只能拿到一次!(不可重入!)
- /**
- * Created by zdy on 17/12/22.
- */
- public
- class
- OnlyOneLock
- implements
- Lock
- {
- private static
- class
- OnlyOneLockAQS
- extends
- AbstractQueuedSynchronizer
- {
- //在该类内部调用State相关API,维护是否获取到锁的逻辑
- @Override
- protected
- boolean
- tryAcquire
- (int arg)
- {
- //状态为0时,设置为1,表示当前线程获取到锁
- if(compareAndSetState(0,1)){
- //设置当前线程为获取到锁的线程
- setExclusiveOwnerThread(Thread.currentThread());
- return true ;
- }
- return false;
- }
- @Override
- protected
- boolean
- tryRelease
- (int arg)
- {
- //如果状态为0,表示还没有线程获取到锁,你释放什么释放
- if(getState()==0) {throw new IllegalMonitorStateException();}
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- @Override
- protected
- boolean
- isHeldExclusively
- ()
- {
- //判断该线程是否被占有
- return getState()==1;
- }
- }
- //讲所有锁的语音,直接调用aqs的api来实现
- private final OnlyOneLockAQS aqs=new OnlyOneLockAQS();
- @Override
- public
- void
- lock
- ()
- {
- aqs.acquire(1);
- }
- @Override
- public
- void
- lockInterruptibly
- ()
- throws
- InterruptedException
- {
- aqs.acquireInterruptibly(1);
- }
- @Override
- public
- boolean
- tryLock
- ()
- {
- return aqs.tryAcquire(1);
- }
- @Override
- public
- boolean
- tryLock
- (long time, TimeUnit unit)
- throws
- InterruptedException
- {
- return aqs.tryAcquireNanos(1,unit.toNanos(time));
- }
- @Override
- public
- void
- unlock
- ()
- {
- //记住一定不能直接调用tyrRelease那一套API,因为release方法帮我们维护释放后的通知逻辑.
- aqs.release(1);
- }
- @Override
- public Condition newCondition() {
- return null;
- }
- }
好了,大家好好理解理解,其实也没那么难。可以看到。Lock 中的实现 API 都是调用了同步器 AQS 的模板方法和我们实现的方法来实现锁的逻辑的。我们实现的 try 开头的那几个 API 根本不用管什么队列,什么通知逻辑。只需要管是否获取到锁的逻辑。是不是很神奇?这里强调一下,其实在 Lock 接口中只有 tryLock()这个 API 会直接调用 tryAcquire() 这个我们实现的 API 之外, 其他的 API 其实都是调用的 AQS 的模板方法,应为模板方法封装了很多复杂的队列通知等逻辑。
OK,废话少说,上面可以说是自己写了一个独占性的锁,永远只有一个线程可以获取到锁。下面再定义一个共享性的锁。来个简单点的。大家应该都知道限流,现在假如有个需求,一个应用级的限流 (业务接口层面的), 要求一个接口最后只能被 5 个线程并发访问,后续的线程再访问,直接返回,不做业务逻辑处理.(常用语秒杀业务中某个商品只有 5 个,那么有必要放很多请求进来吗?)
- package com.zdy;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.AbstractQueuedSynchronizer;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- /**
- * Created by zdy on 17/12/22.
- */
- public
- class
- ShareLock
- implements
- Lock
- {
- private
- class
- ShareLockAQS
- extends
- AbstractQueuedSynchronizer
- {
- protected ShareLockAQS(Integer count) {
- super();
- setState(count);
- }
- @Override
- protected
- int
- tryAcquireShared
- (int arg)
- {
- for (; ; ) {
- Integer state = getState();
- Integer newCount = state - arg;
- if (newCount < 0 || compareAndSetState(state, newCount)) {
- return newCount;
- }
- }
- }
- @Override
- protected
- boolean
- tryReleaseShared
- (int arg)
- {
- for (; ; ) {
- //注意这里不能直接setState了,因为可能多个线程同时release
- Integer state = getState();
- Integer newCount = state + arg;
- if (compareAndSetState(state,newCount)) {
- return true;
- }
- }
- }
- @Override
- protected
- boolean
- isHeldExclusively
- ()
- {
- return getState()==0;
- }
- }
- private ShareLockAQS aqs=new ShareLockAQS(5);
- @Override
- public
- void
- lock
- ()
- {
- aqs.acquireShared(1);
- }
- @Override
- public
- void
- lockInterruptibly
- ()
- throws
- InterruptedException
- {
- aqs.acquireInterruptibly(1);
- }
- @Override
- public
- boolean
- tryLock
- ()
- {
- return aqs.tryAcquireShared(1)>=0;
- }
- @Override
- public
- boolean
- tryLock
- (long time, TimeUnit unit)
- throws
- InterruptedException
- {
- return aqs.tryAcquireSharedNanos(1,unit.toNanos(time));
- }
- @Override
- public
- void
- unlock
- ()
- {
- aqs.releaseShared(1);
- }
- @Override
- public Condition newCondition() {
- return null;
- }
- }
大致逻辑: 内部维护了一个 state,初始化为 5,获取一个锁,减 1,释放一个锁,+1。注意获取和释放的并发性!尤其是在释放时,释放失败了,一定要 for(;;), 切记! 一句话说,就是释放必须是成功的.
下面来验证一下这个并发 Lock:
其实思路还是比较清晰的。在 web 请求接口中每进来就去取锁。由于我们的锁最多只能取 5 次,所有当第 6 次请求进来后直接 return "活动已经结束...", 结果我就不演示了,这就是我们的限流 Lock 了。看,限流没想的那么高大上。当然了,我这里说的只是应用限流的一种,其实粗暴一点,直接用个 Integer 变量,加锁每次减一就可以了。后面 JUC 里面还有许多内置的工具类来提供给我们使用。说到限流这一块,其实应用层面接口的限流是最 Low 的,效果最不好旳,为什么呢?因为接口已经进入到容器了,对资源的消耗也是存在的。其实最标准的限流都是在网关或者接入层,比如 Nginx 层面的限流,效果比较显著。好了好了,又扯这么多。回到主题...
通过上面的例子可以知道,其实我们想要实现自己的同步组件还是比较简单的。只需要写好 AQS,而 AQS 中需要我们覆盖的几个方法只需要处理好拿到锁和没有拿到锁的逻辑,至于线程怎么维护,我们还是不清楚的,这一小节就带大家揭开这个神秘的面纱。先说简单独占 Lock 的,比如在我们重写了 tryAcquire 时,AQS 的模板方法 acquire 内部会调用这个方法,然后维护队列逻辑。废话不多说直接上源码:(代码全为源码,注释为小弟所加)
- public
- final
- void
- acquire
- (int arg)
- {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
先调用 tryAcquire(); 如果拿到,代码直接跑完,不阻塞。 如果没有拿到。调用 addWaiter(); 把当前线程构建成一个 Node 加入到队列尾部。
- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- //Cas算法设置进去。
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- enq(node);
- return node;
- }
把当前线程加入到队列 (FIFO) 尾部后,调用 acquireQueued();
- final
- boolean
- acquireQueued
- (final Node node, int arg)
- {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- //parkAndCheckInterrupt这个API会Park当前线程
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
这个 API 贼有意思,先检查当前线程在队列中的前一个线程是否为头结点,如果是就在尝试拿一次,没拿到的话会在 parkAndCheckInterrupt 时把自己给 Park 掉,Park 大致可以理解为休眠 (Sleep),这个时候,线程就老老实实的在队列里面等着,等什么呢?等获取锁的线程释放锁后通知它,它好继续在 for 循环里面获取锁。。 我贴一下流程图。然后大家根据源码对着看一看。因为源码细节比较多。不可能一一讲解。
大家要明白,这个流程是独占式的队列流程。首节点永远为获取到锁的那个节点。
然后说下共享式的队列如何维护,还拿上面那个例子,支持并发 5 个线程获取到锁:
这一块的逻辑和独占 Lock 还是很大差别的,我就不和大家 Show 源码了。还是比较麻烦的。大致和独占 Lock 一样。只是队列的维护不一样,大家感兴趣自己看一看。
顺便提一个小知识点:
Sleep() 和 Object.wait() 遇到 Interrupt 出异常。
LockSupport.park() 遇到则会唤醒继续运行。
结语
好了,其实 AQS 这一块源码翻起来远不止这么多,我这里只是大致说了个主线,队列如何入队,如何唤醒,稍微说了下,感兴趣的同学可以再仔细琢磨,因为文字确实不太好跟源码。Have a good day .
来源: https://juejin.im/post/5a3c6aa551882538d3101d5f