目录
简介
读写状态
读锁计数器
共享锁的获取
- tryAcquireShared(int unused)
- doAcquireShared(int arg)
共享锁的释放
- tryReleaseShared(int unused)
- doReleaseShared()
写锁获取
tryAcquire(int acquires)
写锁释放
tryRelease(int releases)
锁降级
总结
简介
在前一篇博客多线程学习笔记三之 ReentrantLock 与 AQS 实现分析分析了基于同步器 AQS 实现的独占锁 ReentrantLock,AQS 同步器作为 JUC 组件实现锁的框架, 基于 AQS 除了可以实现独占锁, 还可以实现共享锁.
ReentrantReadWriteLock 是基于 AQS 实现的读写锁, 内部维护了一个读锁 (共享锁) 和写锁 (独占锁). 如果我们要在程序中提供共享的缓存数据结构, 缓存肯定是读操作(数据查询) 多而写操作 (数据更新) 少, 只要保证写操作对后续的读操作是可见的就行了, 这种情况下使用独占锁就不如读写锁的吞吐量大, 读写锁中的读锁允许多个线程获得读锁对资源进行读操作, 写锁是传统的独占锁, 只允许单个线程获得写锁对资源进行更新. 以下是 JDK 提供基于 ReentrantReadWriteLock 简单实现缓存结构的 Demo:
- class CachedData {
- Object data;
- volatile boolean cacheValid;
- final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- void processCachedData() {
- rwl.readLock().lock();
- if (!cacheValid) {
- // 必须先释放读锁再获取写锁
- rwl.readLock().unlock();
- rwl.writeLock().lock();
- try {
- // 再次检查 cacheValid 防止其他线程获得写锁改变 cacheValid 值
- if (!cacheValid) {
- data = ...
- cacheValid = true;
- }
- // 写锁降级为读锁
- rwl.readLock().lock();
- } finally {
- // 释放写锁
- rwl.writeLock().unlock();
- }
- }
- try {
- use(data);
- } finally {
- rwl.readLock().unlock();
- }
- }
- }
ReentranReadWriteLock 的关系图:
ReentrantReadWriteLock 没有实现 Lock 接口, 实现了 ReadWriteLock 接口. 内部类 ReadLock 和 WriteLock 实现 Lock 接口, ReadLock 和 WriteLock 包含了继承了 AQS 的 Sync 对象, 从而提供了共享锁和独占锁特性的实现. 读写锁 ReentrantReadWriteLock 具有以下特性:
可重入, 不管是读锁还是写锁, 都是可重入锁
公平锁和非公平锁, 支持以公平方式或非公平方式 (默认方式) 获取读锁和写锁.
支持锁降级, 线程获得写锁之后可以降级为读锁, 具体是先获取写锁, 再获得读锁, 再释放写锁. 但读锁不可升级为写锁.
读写状态
在实现 ReentrantLock 时, 当一个线程去尝试获取锁时, 线程会去检查同步器 AQS 中维护的 int 型变量 state 是否为 0, 同步状态加一表示当前线程成功获取锁. 而读写锁 ReentrantReadWriteLock 维护了读锁和写锁, 那么一个线程获得了锁, 怎么通过 state 表明到底是读锁还是写锁呢? 答案是把 int 型变量切位两部分, 高 16 位表示读状态, 低 16 位表示写状态. ReentrantReadWriteLock 在内部类 Sync 定义了以下常量用以区分读写状态:
- // 偏移量
- static final int SHARED_SHIFT = 16;
- // 线程获得读锁, state 加 SHARED_UNIT,state 高 16 位 SHARED_UNIT 个数代表了有多少个共享锁
- static final int SHARED_UNIT = (1 <<SHARED_SHIFT);
- // 读写锁重入最多不超过 65535
- static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
- static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
- /** Returns the number of shared holds represented in count */
- static int sharedCount(int c) { return c>>> SHARED_SHIFT; }
- /** Returns the number of exclusive holds represented in count */
- static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
通过把 32 位 int 型变量 state 按位切割成两部分维护读写两种状态, 具体划分如图:
从图中可以看到, 当前线程获取了写锁, 重进入了 3 次, 连续获得了两次读锁, 每次获得写锁, 就把 state 加 1, 而低 16 位总共最大是 65535, 就是 MAX_COUNT 的值. 每获得一次读锁, 就把 state 加 SHARED_COUNT. 那么如何获取读写状态呢? 只要通过位运算取出高 16 位或低 16 位就行了, 对于读状态, state>>>SHARED_SHIFT(无符号补 0 右移 16 位)就可以得到加了多少次 SHARED_UNIT 从而获得读状态; 对于写状态, state & EXCLUSIVE_MASK(0X0000FFFF, 高 16 位都变为 0, 低 16 位不变)就可以获得写状态.
读锁计数器
由于 ReentrantReadWriteLock 支持读写锁的重入, 而写锁是独占锁, 只要取出同步状态 state 低 16 位对应的数值就是获得写锁的重入次数; 而读锁是共享锁, 每个线程获得读锁就会把 state 加上 SHARED_UNIT(包括读锁重入), 取出 state 高 16 位的对应的数值表示是所有线程获得读锁的次数, 但是如何获得单个线程获得共享锁的次数呢? 内部类 Sync 为同步器维护了一个读锁计数器, 专门统计每个线程获得读锁的次数. Sync 内部有两个内部类分别为 HoldCounter 和 ThreadLocalHoldCounter:
- abstract static class Sync extends AbstractQueuedSynchronizer {
- static final class HoldCounter {
- // 计数器, 用于统计线程重入读锁次数
- int count = 0;
- // Use id, not reference, to avoid garbage retention
- // 线程 TID, 区分线程, 可以唯一标识一个线程
- final long tid = getThreadId(Thread.currentThread());
- }
- static final class ThreadLocalHoldCounter
- extends ThreadLocal<HoldCounter> {
- // 重写初始化方法, 在没有进行 set 的情况下, 获取的都是该 HoldCounter 值
- public HoldCounter initialValue() {
- return new HoldCounter();
- }
- }
- private transient ThreadLocalHoldCounter readHolds;
- private transient HoldCounter cachedHoldCounter;
- private transient Thread firstReader = null;
- private transient int firstReaderHoldCount;
- Sync() {
- // 本地线程读锁计数器
- readHolds = new ThreadLocalHoldCounter();
- setState(getState()); // ensures visibility of readHolds
- }
- }
firstReader 和 firstReaderHoldCount
如果只有一个线程获取了读锁, 就不需要使用本地线程变量 readHolds, 当前线程就是第一个获得读锁的线程 firstReader, 使用 firstReaderHoldCount 存储线程重入次数.
readHolds
第一个获得读锁的线程使用 firstReaderHoldCount 存储读锁重入次数, 后面的线程就要使用 ThreadLocal 类型变量 readHolds 了, 每个线程拥有自己的副本, 用来保存自己的重入数.
cachedHoldCounter
缓存计数器, 是最后一个获取到读锁的线程计数器, 每当有新的线程获取到读锁, 这个变量都会更新. 如果当前线程不是第一个获得读锁的线程, 先到缓存计数器 cachedHoldCounter 查看缓存计数器是否指向当前线程, 不是再去 readHolds 查找, 通过缓存提高效率.
共享锁的获取
获取读锁, 由内部类 ReadLock 提供 lock 方法, 调用了 Sync 父类 AQS 的方法:
- // 获取读锁
- public void lock() {
- sync.acquireShared(1);
- }
- // 获取共享锁
- public final void acquireShared(int arg) {
- if (tryAcquireShared(arg) <0)
- doAcquireShared(arg);
- }
- tryAcquireShared(int unused)
尝试获取共享锁:
- protected final int tryAcquireShared(int unused) {
- // 当前线程
- Thread current = Thread.currentThread();
- // 同步状态 state
- int c = getState();
- // 检查独占锁是否被占据, 如果被占据, 是否是当前线程获取了独占锁
- // 如果是当前线程获取了写锁, 可以继续获取读锁, 如果都不是返回 - 1 表示获取失败
- if (exclusiveCount(c) != 0 &&
- getExclusiveOwnerThread() != current)
- return -1;
- // 读锁数量
- int r = sharedCount(c);
- //!readerShouldBlock() 根据公平与否策略和队列是否含有等待节点决定当前线程是否继续获取锁
- // 不能大于 65535 且 CAS 修改成功
- if (!readerShouldBlock() &&
- r < MAX_COUNT &&
- compareAndSetState(c, c + SHARED_UNIT)) {
- // 如果没有线程获取过读锁
- if (r == 0) {
- // 将当前线程设置为第一个读锁线程
- firstReader = current;
- // 计数器为一
- firstReaderHoldCount = 1;
- // 读锁重入
- } else if (firstReader == current) {
- // 计数器加一
- firstReaderHoldCount++;
- } else {
- // 如果不是第一个线程, 获取锁成功
- // cachedHoldCounter 代表的是最后一个获取读锁的线程的计数器
- HoldCounter rh = cachedHoldCounter;
- // 如果计数器是 null 或者不指向当前线程, 那么就新建一个 HoldCounter 对象
- if (rh == null || rh.tid != getThreadId(current))
- cachedHoldCounter = rh = readHolds.get();
- // 计数器为 0, 保存到 readHolds 中
- else if (rh.count == 0)
- readHolds.set(rh);
- // 计数器加一
- rh.count++;
- }
- return 1;
- }
- return fullTryAcquireShared(current);
- }
- fullTryAcquireShared(Thread current)
当已有线程占据独占锁, 读锁数量超过 MAX_COUNT, 不满足公平策略或者 CAS 设置 state 失败, 就会调用这个方法. 与 tryAcquireShared 方法逻辑大体相似.
- final int fullTryAcquireShared(Thread current) {
- HoldCounter rh = null;
- // 死循环
- for (;;) {
- // 同步状态
- int c = getState();
- // 检查写锁获取情况
- if (exclusiveCount(c) != 0) {
- if (getExclusiveOwnerThread() != current)
- return -1;
- // 进入到这里, 说明没有其他线程获取写锁
- // 公平锁策略检查
- } else if (readerShouldBlock()) {
- //readerShouldBlock()返回 true, 应该堵塞, 检查是否获取过读锁
- // 第一个获取读锁线程是当前线程, 重入
- if (firstReader == current) {
- } else {
- // 循环中, 若计数器为 null
- if (rh == null) {
- rh = cachedHoldCounter;
- if (rh == null || rh.tid != getThreadId(current)) {
- rh = readHolds.get();
- if (rh.count == 0)
- readHolds.remove();
- }
- }
- // 需要阻塞且是非重入(还未获取读锁的), 获取失败.
- if (rh.count == 0)
- return -1;
- }
- }
- // 检查读锁总数量是否超过最大值
- if (sharedCount(c) == MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- //CAS 设置同步状态 state
- if (compareAndSetState(c, c + SHARED_UNIT)) {
- // 当前线程获得第一个读锁
- if (sharedCount(c) == 0) {
- firstReader = current;
- firstReaderHoldCount = 1;
- // 读锁重入
- } else if (firstReader == current) {
- firstReaderHoldCount++;
- } else {
- // 从缓存读入计数器, 提高效率
- if (rh == null)
- rh = cachedHoldCounter;
- // 计数器为空或不是指向当前线程
- if (rh == null || rh.tid != getThreadId(current))
- rh = readHolds.get();
- else if (rh.count == 0)
- readHolds.set(rh);
- rh.count++;
- cachedHoldCounter = rh; // cache for release
- }
- return 1;
- }
- }
- }
- doAcquireShared(int arg)
当 tryAcquireShared 尝试获取共享锁失败, 返回 - 1, 进入 AQS 同步队列等待获取共享锁
- private void doAcquireShared(int arg) {
- // 将当前节点以共享型类型加入同步队列
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- // 前驱节点获取到锁, 可能占据锁, 也可能已经释放锁, 调用 tryAcquireShared 尝试获取锁
- if (p == head) {
- int r = tryAcquireShared(arg);
- // 获取成功
- if (r>= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- if (interrupted)
- selfInterrupt();
- failed = false;
- return;
- }
- }
- // 与独占锁 ReentrantLock 堵塞逻辑一致
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- // 因中断 / 超时, 取消获取锁
- if (failed)
- cancelAcquire(node);
- }
- }
共享锁的释放
释放读锁, 由内部类 ReadLock 提供 unlock 方法, 调用了 Sync 父类 AQS 的方法:
- public void unlock() {
- sync.releaseShared(1);
- }
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
- tryReleaseShared(int unused)
tryReleaseShared 返回 true, 即同步状态为 0, 不存在线程占据读锁或写锁.
- protected final boolean tryReleaseShared(int unused) {
- Thread current = Thread.currentThread();
- // 当前线程是第一个获得读锁的线程
- if (firstReader == current) {
- // assert firstReaderHoldCount> 0;
- if (firstReaderHoldCount == 1)
- firstReader = null;
- else
- firstReaderHoldCount--;
- // 不是 firstReader, 更新计数器
- } else {
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != getThreadId(current))
- rh = readHolds.get();
- int count = rh.count;
- // 完全释放锁
- if (count <= 1) {
- readHolds.remove();
- if (count <= 0)
- throw unmatchedUnlockException();
- }
- // 重入锁退出
- --rh.count;
- }
- //CAS 更新同步状态,
- for (;;) {
- int c = getState();
- int nextc = c - SHARED_UNIT;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
- doReleaseShared()
tryReleaseShared 方法成功释放锁, 调用 doReleaseShared 唤醒后继节点.
- private void doReleaseShared() {
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- // 如果节点状态为 Node.SIGNAL, 将状态设置为 0, 设置成功, 唤醒线程.
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- // 如果本身头结点的 waitStatus 是出于重置状态 (waitStatus==0) 的,
- // 将其设置为 "传播" 状态. 意味着需要将状态向后一个节点传播.
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue;
- }
- if (h == head)
- break;
- }
- }
写锁获取
获取写锁, 由内部类 WriteLock 提供 lock 方法, 调用了 Sync 父类 AQS 的方法, 重点解析一下 tryAcquire 实现:
- public void lock() {
- sync.acquire(1);
- }
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- tryAcquire(int acquires)
内部类 Sync 重写的 tryAcquire 方法:
- protected final boolean tryAcquire(int acquires) {
- Thread current = Thread.currentThread();
- int c = getState();
- int w = exclusiveCount(c);
- // 同步状态不为 0
- if (c != 0) {
- // 其他线程获得写锁, 获取失败; w 为 0 而同步状态不为 0, 没有线程占据写锁, 有线程占据读锁
- // 注意: 不存在读锁与写锁同时被多个线程获取的情况.
- if (w == 0 || current != getExclusiveOwnerThread())
- return false;
- // 当前线程已经获得写锁, 重入次数超过 MAX_COUNT, 失败
- if (w + exclusiveCount(acquires)> MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- // 锁重入
- setState(c + acquires);
- return true;
- }
- // 公平策略检查
- //CAS 设置同步状态成功则获得写锁
- if (writerShouldBlock() ||
- !compareAndSetState(c, c + acquires))
- return false;
- setExclusiveOwnerThread(current);
- return true;
- }
写锁释放
- public void unlock() {
- sync.release(1);
- }
- public final boolean release(int arg) {
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- tryRelease(int releases)
当同步状态 state 为 0 时, tryRelease 方法返回 true.
- protected final boolean tryRelease(int releases) {
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- // 独占锁, 只有当前线程释放同步状态, 不需要考虑并发
- int nextc = getState() - releases;
- boolean free = exclusiveCount(nextc) == 0;
- if (free)
- setExclusiveOwnerThread(null);
- setState(nextc);
- return free;
- }
锁降级
读写锁 ReentrantReadWriteLock 支持写锁降级, 从下面可以看到线程获得写锁后, 在没有释放写锁的情况下获得了读锁(锁降级), 然后在手动释放写锁. 这更像是一种特殊的锁重入, 由于获得写锁有继续获得读锁的需要, 相对于释放写锁再获取读锁, 直接去获取读锁没有其他线程竞争, 免去了由于其他线程获得写锁进入等待状态的可能, 效率更高. 注意: 锁降级后需要手动释放写锁, 否则线程会一直持有独占锁
读写锁 ReentrantReadWriteLock 是不支持锁升级的, 如果一个获得了读锁的线程在持有读锁的情况下尝试获取写锁, 是不可能成功获得读锁的, 因为获得写锁会判断当前有没有线程持有读锁, 而尝试锁升级的线程本身读锁没有释放, 所以会进入同步队列等待同步状态为 0 获取写锁, 由于读锁一直不释放会导致其他线程无法获取写锁(获取写锁条件不能有其他线程占据读锁或写锁), 只能获取共享锁读锁. 因此 ReentrantReadWriteLock 是不支持读写锁的.
- final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
- void processCachedData() {
- rwl.readLock().lock();
- if (!cacheValid) {
- // 必须先释放读锁再获取写锁
- rwl.readLock().unlock();
- rwl.writeLock().lock();
- try {
- if (!cacheValid) {
- data = ...
- cacheValid = true;
- }
- // 写锁未释放获得读锁
- rwl.readLock().lock();
- } finally {
- // 释放写锁, 降级为读锁
- rwl.writeLock().unlock();
- }
- }
- try {
- use(data);
- } finally {
- rwl.readLock().unlock();
- }
- }
总结
读写锁内部维护了共享锁读锁和独占锁写锁, 读锁和写锁都支持重进入, 当读锁已经被获取 (state 高 16 位不为 0) 或写锁已被其他线程获取, 获取写锁的线程进入等待状态; 当写锁已经被其他线程获取, 获取读锁的线程进入等待状态. 读写锁支持由独占锁 (写锁) 降级到(读锁), 但不支持读锁升级到写锁, 在使用时要考虑手动释放好读锁与写锁的释放, 否则程序可能会出现意想不到的问题.
来源: https://www.cnblogs.com/rain4j/p/10135283.html