提供一个框架, 用于实现依赖先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量, 事件等). 该类被设计为大多数类型的同步器的有用依据, 这些同步器依赖于单个原子 int 值来表示状态. 子类必须定义改变此状态的受保护方法, 以及根据该对象被获取或释放来定义该状态的含义. 给定这些, 这个类中的其他方法执行所有排队和阻塞机制. 子类可以保持其他状态字段, 但只
以原子方式更新 int 使用方法操纵值 getState() , setState(int)和
compareAndSetState(int, int)
被跟踪相对于同步
.
子类应定义为非公共内部助手类, 用于实现其封闭类的同步属性.
AbstractQueuedSynchronizer
类不实现任何同步接口. 相反, 它定义了一些方法, 如
acquireInterruptibly(int)
, 可以通过具体的锁和相关同步器来调用适当履行其公共方法.
此类支持默认独占模式和共享模式.[使用模板模式来定义是独占还是共享模式] 当以独占模式获取时, 尝试通过其他线程获取不能成功. 多线程获取的共享模式可能 (但不需要) 成功. 除了在机械意义上, 这个类不理解这些差异, 当共享模式获取成功时, 下一个等待线程 (如果存在) 也必须确定它是否也可以获取. 在不同模式下等待的线程共享相同的 FIFO 队列. 通常, 实现子类只支持这些模式之一, 但是两者都可以在 ReadWriteLock 中发挥作用 . 仅支持独占或仅共享模式的子类不需要定义支持未使用模式的方法.
这个类定义的嵌套
AbstractQueuedSynchronizer
.ConditionObject 可用于作为一类 Condition 由子类支持独占模式用于该方法的实施 isHeldExclusively()份报告是否同步排他相对于保持在当前线程, 方法 release(int)与当前调用 getState()值完全释放此目的, 和 acquire(int) , 给定此保存的状态值, 最终将此对象恢复到其先前获取的状态.
AbstractQueuedSynchronizer
方法将创建此类条件, 因此如果不能满足此约束, 请勿使用该约束.
AbstractQueuedSynchronizer.ConditionObject
的行为当然取决于其同步器实现的语义.
该类为内部队列提供检查, 检测和监控方法, 以及条件对象的类似方法. 这些可以根据需要导出到类中, 使用
AbstractQueuedSynchronizer
进行同步机制.
此类的序列化仅存储底层原子整数维持状态, 因此反序列化对象具有空线程队列. 需要可序列化的典型子类将定义一个 readObject 方法, 可以将其恢复为 readObject 时的已知初始状态.
用法
使用这个类用作同步的基础上, 重新定义以下方法,[即在同步器中定义内部类重写下面的方法] , 如适用, 通过检查和 / 或修改使用所述同步状态 getState() ,setState(int)或 compareAndSetState(int,int)[通过这三个方法来保证原子性和线程安全性] :
tryAcquire(int)独占模式获取锁
tryRelease(int)独占模式释放锁
tryAcquireShared(int)共享模式获取锁
tryReleaseShared(int)共享模式释放锁
isHeldExclusively()是否是独占式
每个这些方法默认抛出
UnsupportedOperationException
. 这些方法的实现必须是线程安全的, 通常应该是短的而不是阻止的. 定义这些方法是唯一支持使用此类的方法. 所有其他方法都被声明为 final , 因为它们不能独立变化.
看下面的源码分析前可先观看多线程高并发编程(2) -- 可重入锁介绍和自定义
一. ReentrantLock 的 lock 和 unlock 源码解析
lock 流程:
调用同步器 Sync 的抽象方法 lock, 由公平锁 FairSync 实现
调用 AQS 的 acquire 获取锁
尝试获取锁: 公平锁 FairSync 的 tryAcquire
第一次获取锁, 若队列没有前节点和设置状态成功, 存储当前线程, 返回 true, 表示获取成功;
如果是重入获取, 锁持有数量 + 1;
获取锁失败, 把当前线程加入等待队列中, 并对等待队列进行阻塞, 不断竞争获取锁: acquireQueued 遍历等待队列 addWaiter, 若有头节点则从队列中取出来, 若没有则进行中断;
unlock 流程:
调用同步器的 release 方法, 有 AQS 实现;
tryRelease 释放锁, 由 Sync 实现
锁数量 - 1;
当前线程和保存的线程不一致抛出异常;
锁数量为 0 则进行释放锁, 把独占线程设置为 null, 修改状态;
- //===========================ReentrantLock 源码 ===============================
- public class ReentrantLock implements Lock, java.io.Serializable {
- private final Sync sync;// 同步器
- public void lock() {// 获取锁
- sync.lock();// 调用同步器 Sync 的 lock, 由 FairSync 实现
- }
- public void unlock() {// 使用锁
- sync.release(1);// 调用同步器的 release, 由 AQS 实现
- }
- // 内部类, 同步器继承 AQS, 实现 tryRelease 释放锁
- abstract static class Sync extends AbstractQueuedSynchronizer{
- abstract void lock();// 获取锁抽象方法, 由 FairSync 实现
- //=========================== 释放锁 ===============================
- protected final boolean tryRelease(int releases) {
- int c = getState() - releases;// 锁数量 - 1
- // 当前线程和保存的线程不一致
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- if (c == 0) {// 持有的锁数量为 0
- free = true;// 释放锁
- setExclusiveOwnerThread(null);// 当前独占线程为 null
- }
- setState(c);// 设置状态
- return free;
- }
- }
- // 内部类, 公平锁继承同步器, 实现 lock 方法
- static final class FairSync extends Sync {
- //=========================== 获取锁 ===============================
- final void lock() {
- acquire(1);// 调用 AQS 的 acquire
- }
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();// 获得当前线程
- /**getState 是 AQS 的 Node 的 waitStatus, 其值有
- *CANCELLED = 1
- *SIGNAL = -1
- *CONDITION = -2
- *PROPAGATE = -3
- */
- int c = getState();
- //c 初始值为 0,0 表示不是以上的状态; hasQueuedPredecessors 之前是否有节点,
- // 如果是 true 表示这个线程的前面还有节点应该让前面的节点先获取锁, 当前线程获取失败;
- //[非公平锁少了 hasQueuedPredecessors 这个判断]
- //compareAndSetState CAS 比较, 设置当前状态为 1;setExclusiveOwnerThread 当前线程设置为独占线程
- if (c == 0) {
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;// 获取成功
- }
- }
- else if (current == getExclusiveOwnerThread()) {// 如果是当前线程, 表示重入
- int nextc = c + acquires;// 锁数量 + 1
- if (nextc <0)// 小于 0 表示溢出
- throw new Error("Maximum lock count exceeded");
- setState(nextc);// 更新状态
- return true;// 获取成功
- }
- return false;// 获取失败
- }
- }
- }
- //=============AbstractQueuedSynchronizer 源码 ==============
- public abstract class AbstractQueuedSynchronizer
- extends AbstractOwnableSynchronizer
- implements java.io.Serializable{
- //=========================== 获取锁 ===============================
- // 以独占模式获取, 忽略中断. 通过调用至少一次 tryAcquire(int)实现, 成功返回. 否则线程排队,
- // 可能会重复阻塞和解除阻塞, 直到成功才调用 tryAcquire(int).
- public final void acquire(int arg) {//FairSync 的 lock 调用
- //tryAcquire 获取锁; acquireQueued 线程加入到了等待队列中, 进行阻塞等待, 竞争获取锁;
- //addWaiter 其他线程获取锁失败添加到等待队列中; Node.EXCLUSIVE 节点独占, 为 null
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- protected boolean tryAcquire(int arg) {//acquire 调用, 由 FairSync 实现
- throw new UnsupportedOperationException();
- }
- final boolean acquireQueued(final Node node, int arg) {//acquire 调用
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();// 获取前一个节点
- if (p == head && tryAcquire(arg)) {// 如果获取的节点为头节点并且获取到锁
- setHead(node);// 当前节点设置为头节点
- p.next = null;// 头节点下一节点为空, 即把当前节点从队列中移除出来
- failed = false;
- return interrupted;
- }
- // 当前节点不是头节点, parkAndCheckInterrupt 让当前线程处于阻塞等待状态由其他线程唤醒
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//acquireQueued 调用
- int ws = pred.waitStatus;// 获取前一节点的等待状态
- if (ws == Node.SIGNAL)// 如果状态为唤醒状态
- return true;
- if (ws> 0) {// 处于 CANCELLED 状态
- do {
- node.prev = pred = pred.prev;//1. 把所有处于 CANCELLED 状态的节点移除
- } while (pred.waitStatus> 0);
- pred.next = node;//2. 把所有处于 CANCELLED 状态的节点移除
- } else {
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 设置为 SIGNAL 状态
- }
- return false;
- }
- private Node addWaiter(Node mode) {//acquireQueued 的参数, 在 acquire 中调用
- Node node = new Node(Thread.currentThread(), mode);// 创建 Node, 当前线程指向 node
- Node pred = tail;// 前节点指向尾节点[双向链表]
- if (pred != null) {// 尾节点不为空
- node.prev = pred;// 当前线程节点指向尾节点
- if (compareAndSetTail(pred, node)) {//CAS 比较, 把当前线程节点更新为尾节点
- pred.next = node;// 前尾节点的下一节点指向当前尾节点
- return node;
- }
- }
- enq(node);// 如果尾节点为空, 把当前节点放到一个初始化节点或添加到节点中做为尾节点
- return node;
- }
- private Node enq(final Node node) {//addWaiter 调用
- for (;;) {
- Node t = tail;
- if (t == null) { // 尾节点为空
- if (compareAndSetHead(new Node()))// 创建新节点并维护一个头节点
- tail = head;// 把当前节点设置为头节点
- } else {
- node.prev = t;// 当前节点指向尾节点
- if (compareAndSetTail(t, node)) {// 把当前节点更新为尾节点
- t.next = node;// 前尾节点的下一节点指向当前尾节点
- return t;
- }
- }
- }
- }
- static void selfInterrupt() {//acquire 调用
- Thread.currentThread().interrupt();// 当前线程中断
- }
- //=========================== 释放锁 ===============================
- public final boolean release(int arg) {//ReentrantLock 的 unlock 调用
- if (tryRelease(arg)) {// 当前线程锁释放成功, 唤醒其他线程进行资源的竞争
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int arg) {//release 调用, 由 Sync 实现
- throw new UnsupportedOperationException();
- }
- }
备注: 公平锁是针对锁的获取而言, 如果一个锁是公平的, 那么锁的获取顺序就应该符合请求的绝对时间顺序; 非公平锁会进行插队获取锁;
二. AQS 重写锁
流程:
实现 Lock, 重写实现方法 lock,lockInterruptibly,tryLock,unlock,newCondition;
内部类继承 AQS, 重写 tryAcquire 和 tryRelease;
- public class MyAQSLock implements Lock{
- private MyAQS myAQS;
- private class MyAQS extends AbstractQueuedSynchronizer{
- @Override
- protected boolean tryAcquire(int arg) {
- int state = getState();// 获取状态
- Thread thread = Thread.currentThread();
- if(state==0){// 线程第一次进来获取, 状态为 0, 表示可以拿到锁
- if(compareAndSetState(0,arg)){// 更新状态
- setExclusiveOwnerThread(Thread.currentThread());// 设置为独占线程, 其他线程进来进入等待
- return true;// 获取成功
- }
- }else if(getExclusiveOwnerThread()== thread){// 重入, 存储线程等于当前线程
- setState(state+1);// 锁数量 + 1
- return true;
- }
- return false;// 获取失败
- }
- @Override
- protected boolean tryRelease(int arg) {
- // 当前线程不是存储线程
- if(Thread.currentThread() != getExclusiveOwnerThread()){
- throw new RuntimeException();
- }
- int state = getState()-arg;// 锁数量 - 1
- boolean flag = false;
- if(state==0){// 锁数量为 0
- setExclusiveOwnerThread(null);// 独占锁为 null, 表示可以让其他线程进来竞争获取资源了
- flag=true;
- }
- setState(state);// 更新状态
- return flag;
- }
- public ConditionObject newConditonObject(){
- return new ConditionObject();
- }
- }
- @Override
- public void lock() {
- myAQS.acquire(1);
- }
- @Override
- public void lockInterruptibly() throws InterruptedException {
- myAQS.acquireInterruptibly(1);
- }
- @Override
- public boolean tryLock() {
- return myAQS.tryAcquire(1);
- }
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- return myAQS.tryAcquireNanos(1,unit.toNanos(time));
- }
- @Override
- public void unlock() {
- myAQS.release(1);
- }
- @Override
- public Condition newCondition() {
- return myAQS.newConditonObject();
- }
- }
来源: https://www.cnblogs.com/huangrenhui/p/12712475.html