基于 AQS 的前世今生, 来学习并发工具类 ReentrantReadWriteLock. 本文将从 ReentrantReadWriteLock 的产生背景, 源码原理解析和应用来学习这个并发工具类.
1, 产生背景
前面我们学习的重入锁 ReentrantLock 本质上还是互斥锁, 每次最多只能有一个线程持有 ReentrantLock. 对于维护数据完整性来说, 互斥通常是一种过于强硬的规则, 因此也就不必要的限制了并发性. 互斥是一种保守的加锁策略, 虽然可以避免 "写 / 写" 冲突和 "写 / 读" 冲突, 但也同样避免了 "读 / 读" 冲突. 和互联网的 "二八法则" 一样, 大部分数据都是读数据, 可以存放在缓存中, 数据结构的操作其实很多也是读操作, 可以考虑适当的放宽加锁需求, 允许多个读操作线程同时访问数据结构以提升程序的性能. 在这样的需求背景下, 就产生了读写锁 ReadWriteLock, 一个资源可以同时被多个读操作访问, 或者被一个写操作访问, 但是不能读写操作同时访问. ReadWriteLock 定义了接口规范, 实际实现读写锁控制的类是 ReentrantReadWriteLock, 该类为读写锁提供了可重入的加锁语义.
2, 源码原理解析
2.1 读写锁原理
既然是读写锁, 那就是有两把锁, 可以用 AQS 的同步状态表示其中的一把锁, 再引入一个新的属性表示另外一把锁, 但是这么做就变成了二元并发安全问题, 使问题变得更加复杂. ReentrantReadWriteLock 选择了用一个属性, 即 AQS 的同步状态来表示读写锁, 怎样用一个属性来表示读写锁呢? 那就是位运算, 对位运算不熟悉的可以先看下此文.
ReentantReadWriteLock 采用 "按位切割" 的方式, 就是将这个 32 位的 int 型 state 变量分为高 16 位和低 16 位来使用, 高 16 位代表读状态, 低 16 位代表写状态. 读锁是可以共享的, 而写锁是互斥的, 对于写锁而言, 用低 16 位表示线程的重入次数, 但是读锁因为可以同时有多个线程, 所以重入次数需要通过其他的方式来记录, 那就是 ThreadLocal 变量. 从这也可以总结出来和 ReentrantLock 相比, 写锁的重入次数会减少, 最多不能超过 65535 次. 读锁的线程数也有限制, 最对不能超过 65535 个.
假设状态变量是 c, 则读状态就是 c>>>16(无符号右移 16 位), 其实就是通过无符号右移运算抹掉低的 16 位, 剩下的就是 c 的高 16 位. 写状态是 c&((1 <<16) - 1), 其实就是 c&00000000000000001111111111111111, 与运算之后, 高的 16 位被抹掉, 剩下的就是 c 的低 16 位. 如果读线程申请读锁, 当前写锁重入次数不为 0 时, 则等待, 否则可以马上分配; 如果是写线程申请写锁, 当前状态为 0 则可以马上分配, 否则等待.
2.2 读锁的获取和释放
读锁的获取方法如下:
- protected final int tryAcquireShared(int unused) {
- Thread current = Thread.currentThread();// 当前线程
- int c = getState();
- // 持有写锁的线程可以获取读锁
- if (exclusiveCount(c) != 0 && // 已经分配了写锁
- getExclusiveOwnerThread() != current) // 当前线程不是持有写锁的线程
- return -1;
- int r = sharedCount(c); // 读锁获取次数
- if (!readerShouldBlock() && // 由子类根据公平策略实现决定是否可获取读锁
- r < MAX_COUNT && // 读锁获取次数小于最大值
- compareAndSetState(c, c + SHARED_UNIT)) {// 更新读锁状态
- if (r == 0) {// 读锁的第一个线程 此时可以不用记录到 ThreadLocal
- firstReader = current;
- firstReaderHoldCount = 1; // 避免查找 ThreadLocal 提升效率
- } else if (firstReader == current) {// 读锁的第一个线程重入
- firstReaderHoldCount++;
- } else {// 非读锁的第一个线程
- HoldCounter rh = cachedHoldCounter; // 下面为重入次数更新
- if (rh == null || rh.tid != getThreadId(current))
- cachedHoldCounter = rh = readHolds.get();
- else if (rh.count == 0)
- readHolds.set(rh);
- rh.count++;
- }
- return 1;
- }
- return fullTryAcquireShared(current); // 获取读锁失败 循环重试
- }
- final int fullTryAcquireShared(Thread current) {
- HoldCounter rh = null;
- for (;;) {
- int c = getState();
- if (exclusiveCount(c) != 0) {// 获取到写锁
- if (getExclusiveOwnerThread() != current)
- return -1; // 非写锁线程获取失败
- // else we hold the exclusive lock; blocking here
- // would cause deadlock.
- } else if (readerShouldBlock()) {
- // Make sure we're not acquiring read lock reentrantly
- if (firstReader == current) {
- // assert firstReaderHoldCount> 0;
- } else {
- if (rh == null) {
- rh = cachedHoldCounter;
- if (rh == null || rh.tid != getThreadId(current)) {
- rh = readHolds.get();
- if (rh.count == 0)
- readHolds.remove();
- }
- }
- if (rh.count == 0)
- return -1;
- }
- }
- if (sharedCount(c) == MAX_COUNT) // 读锁数量达到最大
- throw new Error("Maximum lock count exceeded");
- if (compareAndSetState(c, c + SHARED_UNIT)) {// 读锁获取成功 处理方式和之前类似
- if (sharedCount(c) == 0) {
- firstReader = current;
- firstReaderHoldCount = 1;
- } else if (firstReader == current) {
- firstReaderHoldCount++;
- } else {
- if (rh == null)
- rh = cachedHoldCounter;
- if (rh == null || rh.tid != getThreadId(current))
- rh = readHolds.get();
- else if (rh.count == 0)
- readHolds.set(rh);
- rh.count++;
- cachedHoldCounter = rh; // cache for release
- }
- return 1;
- }
- }
- }
读锁的释放方法如下:
- protected final boolean tryReleaseShared(int unused) {
- Thread current = Thread.currentThread();
- if (firstReader == current) {// 当前线程是读锁的第一个线程
- // assert firstReaderHoldCount> 0;
- if (firstReaderHoldCount == 1) // 第一次占有读锁 直接清除该线程
- firstReader = null;
- else
- firstReaderHoldCount--;// 读锁的第一个线程重入次数减少
- } else {
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != getThreadId(current))
- rh = readHolds.get();
- int count = rh.count;
- if (count <= 1) {
- readHolds.remove();// 读锁释放
- if (count <= 0)
- throw unmatchedUnlockException();
- }
- --rh.count; // 重入次数减少
- }
- for (;;) {
- int c = getState();
- int nextc = c - SHARED_UNIT;
- // 减少读锁的线程数量
- if (compareAndSetState(c, nextc))
- // Releasing the read lock has no effect on readers,
- // but it may allow waiting writers to proceed if
- // both read and write locks are now free.
- return nextc == 0;
- }
- }
2.3 写锁的获取和释放
写锁的获取方法如下:
- protected final boolean tryAcquire(int acquires) {
- Thread current = Thread.currentThread();
- int c = getState();
- int w = exclusiveCount(c);// 写锁状态
- if (c != 0) {// 表示锁已经被分配出去了 if c != 0 and w == 0 表示获取读锁
- // (Note: if c != 0 and w == 0 then shared count != 0)
- // 其他线程获取到了写锁
- if (w == 0 || current != getExclusiveOwnerThread())
- return false;
- // 写锁重入次数超过最大值
- if (w + exclusiveCount(acquires)> MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- // Reentrant acquire 更新写锁重入次数
- setState(c + acquires);
- return true;
- }
- if (writerShouldBlock() ||// 子类实现写锁是否公平获取
- !compareAndSetState(c, c + acquires))
- return false;//cas 获取写锁失败
- setExclusiveOwnerThread(current);// 获取写锁成功 独占
- return true;
- }
写锁的释放方法如下:
- protected final boolean tryRelease(int releases) {
- if (!isHeldExclusively())// 当前线程不持有写锁
- throw new IllegalMonitorStateException();
- int nextc = getState() - releases; // 重入次数减少
- boolean free = exclusiveCount(nextc) == 0; // 减少到 0 写锁释放
- if (free)
- setExclusiveOwnerThread(null); // 写锁释放
- setState(nextc);
- return free;
- }
2.4 锁降级
锁降级指的是写锁降级为读锁, 首先持有当前写锁, 然后获取到读锁, 在 tryAcquireShared 方法中已经体现了该过程, 随后再释放该写锁的过程. 锁降级主要是为了保持数据的可见性, 如果当前线程不获取读锁而是直接释放写锁, 假设此时有另外的线程获取到了写锁并修改了数据, 那么当前线程是无法知晓数据已经更新了. 如果当前线程遵循锁降级的过程, 则其他线程会被阻塞, 直到当前线程操作完成其他线程才可以获取写锁进行数据更新. RentrantReadWriteLock 不支持锁升级 (把持读锁, 获取写锁, 最后释放读锁的过程). 目的也是保证数据可见性, 如果读锁已被多个线程获取, 其中任意线程成功获取了写锁并更新了数据, 则其更新对其他获取到读锁的线程是不可见的.
3, 应用
概况性的总结 RentrantReadWriteLock 的应用, 就是 ReentrantLock 能使用的地方, RentrantReadWriteLock 都能使用, 而且能提供更好的吞吐率.
参考资料:
https://github.com/lingjiango/ConcurrentProgramPractice
来源: https://www.cnblogs.com/iou123lg/p/9637637.html