一, 前言
借用 Java 并发编程实践中的话 "编写正确的程序并不容易, 而编写正常的并发程序就更难了", 相比于顺序执行的情况, 多线程的线程安全问题是微妙而且出乎意料的, 因为在没有进行适当同步的情况下多线程中各个操作的顺序是不可预期的, 本文算是对多线程情况下同步策略的一个一个简单介绍.
二, 什么是线程安全问题
线程安全问题是指当多个线程同时读写一个状态变量, 并且没有任何同步措施时候, 导致脏数据或者其他不可预见的结果的问题. Java 中首要的同步策略是使用 Synchronized 关键字, 它提供了可重入的独占锁.
三, 什么是共享变量可见性问题
要谈可见性首先需要介绍下多线程处理共享变量时候的 Java 中内存模型.
Java 内存模型规定了所有的变量都存放在主内存中, 当线程使用变量时候都是把主内存里面的变量拷贝到了自己的工作空间或者叫做工作内存.
如图是双核 CPU 系统架构, 每核有自己的控制器和运算器, 其中控制器包含一组寄存器和操作控制器, 运算器执行算术逻辑运算, 并且有自己的一级缓存, 并且有些架构里面双核还有个共享的二级缓存.`
对应 Java 内存模型里面的工作内存, 在实现上这里是指 L1 或者 L2 缓存或者自己 cpu 的寄存器 `.
当线程操作一个共享变量时候操作流程为:
线程首先从主内存拷贝共享变量到自己的工作内存
然后对工作内存里的变量进行处理
处理完后更新变量值到主内存
那么假如线程 A 和 B 同时去处理一个共享变量, 会出现什么情况那?
首先他们都会去走上面的三个流程, 假如线程 A 拷贝共享变量到了工作内存, 并且已经对数据进行了更新但是还没有更新会主内存, 这时候线程 B 拷贝共享变量到了自己的工内存进行处理, 处理后, 线程 A 才把自己的处理结果更更新到主内存, 可知 线程 B 处理的并不是线程 A 处理后的结果, 也就是说线程 A 处理后的变量值对线程 B 不可见, 这就是共享变量的可见性问题.
构成共享变量内存不可见原因是因为三步流程不是原子性操作, 下面知道使用恰当同步就可以解决这个问题.
我们知道 ArrayList 是线程不安全的, 因为他的读写方法没有同步策略, 会导致脏数据和不可预期的结果, 下面我们就一一讲解如何解决.
这是线程不安全的
- public class ArrayList<E>
- {
- public E get(int index) {
- rangeCheck(index);
- return elementData(index);
- }
- public E set(int index, E element) {
- rangeCheck(index);
- E oldValue = elementData(index);
- elementData[index] = element;
- return oldValue;
- }
- }
四, 原子性
4.1 介绍
假设线程 A 执行操作 Ao 和线程 B 执行操作 Bo , 那么从 A 看, 当 B 线程执行 Bo 操作时候, 那么 Bo 操作全部执行, 要么全部不执行, 我们称 Ao 和 Bo 操作互为原子性操作, 在设计计数器时候一般都是先读取当前值, 然后 + 1, 然后更新会变量, 是读 - 改 - 写的过程, 这个过程必须是原子性的操作.
- public class ThreadNotSafeCount {
- private Long value;
- public Long getCount() {
- return value;
- }
- public void inc() {
- ++value;
- }
- }
如上代码是线程不安全的, 因为不能保证 ++value 是原子性操作. 方法一是使用 Synchronized 进行同步如下:
- public class ThreadSafeCount {
- private Long value;
- public synchronized Long getCount() {
- return value;
- }
- public synchronized void inc() {
- ++value;
- }
- }
注意, 这里不能简单的使用 volatile 修饰 value 进行同步, 因为变量值依赖了当前值
使用 Synchronized 确实可以实现线程安全, 即实现可见性和同步, 但是 Synchronized 是独占锁, 没有获取内部锁的线程会被阻塞掉, 那么有没有刚好的实现那? 答案是肯定的.
4.2 原子变量类
原子变量类比锁更轻巧, 比如 AtomicLong 代表了一个 Long 值, 并提供了 get,set 方法, get,set 方法语音和 volatile 相同, 因为 AtomicLong 内部就是使用了 volatile 修饰的真正的 Long 变量. 另外提供了原子性的自增自减操作, 所以计数器可以改下为:
- public class ThreadSafeCount {
- private AtomicLong value = new AtomicLong(0L);
- public Long getCount() {
- return value.get();
- }
- public void inc() {
- value.incrementAndGet();
- }
- }
那么相比使用 synchronized 的好处在于
原子类操作不会导致线程的挂起和重新调度
, 因为他内部使用的是 cas 的非阻塞算法.
常用的原子类变量为: AtomicLong,AtomicInteger,AtomicBoolean,AtomicReference.
五 CAS 介绍
CAS 即 CompareAndSet, 也就是比较并设置, CAS 有三个操作数分别为: 内存位置, 旧的预期值, 新的值, 操作含义是当内存位置的变量值为旧的预期值时候使用新的值替换旧的值. 通俗的说就是看内存位置的变量值是不是我给的旧的预期值, 如果是则使用我给的新的值替换他, 如果不是返回给我旧值. 这个是处理器提供的一个原子性指令. 上面介绍的 AtomicLong 的自增就是使用这种方式实现:
- public final long incrementAndGet() {
- for (;;) {
- long current = get();(1)
- long next = current + 1;(2)
- if (compareAndSet(current, next))(3)
- return next;
- }
- }
- public final boolean compareAndSet(long expect, long update) {
- return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
- }
假如当前值为 1, 那么线程 A 和检查 B 同时执行到了 (3) 时候各自的 next 都是 2,current=1, 假如线程 A 先执行了 3, 那么这个是原子性操作, 会把档期值更新为 2 并且返回 1,if 判断 true 所以 incrementAndGet 返回 2. 这时候线程 B 执行 3, 因为 current=1 而当前变量实际值为 2, 所以 if 判断为 false, 继续循环, 如果没有其他线程去自增变量的话, 这次线程 B 就会更新变量为 3 然后退出.
这里使用了无限循环使用 CAS 进行轮询检查, 虽然一定程度浪费了 cpu 资源, 但是相比锁来说避免的线程上下文切换和调度.
六, 什么是可重入锁
当一个线程要获取一个被其他线程占用的锁时候, 该线程会被阻塞, 那么当一个线程再次获取它自己已经获取的锁时候是否会被阻塞那? 如果不需要阻塞那么我们说该锁是可重入锁, 也就是锁只要该线程获取了该锁, 那么可以无限制次数进入被该锁锁住的代码.
先看一个例子如果锁不是可重入的, 看看会出现什么问题.
- public class Hello{
- public Synchronized void helloA(){
- System.out.println("hello");
- }
- public Synchronized void helloB(){
- System.out.println("hello B");
- helloA();
- }
- }
如上面代码当调用 helloB 函数前会先获取内置锁, 然后打印输出, 然后调用 helloA 方法, 调用前会先去获取内置锁, 如果内置锁不是可重入的那么该调用就会导致死锁了, 因为线程持有并等待了锁导致 helloB 永远不会获取内置锁.
实际上内部锁是可重入锁, 例如 synchronized 关键字管理的方法, 可重入锁的原理是在所内部维护了一个线程标示标示该锁目前被那个线程占用, 然后关联一个计数器, 一开始计数器值为 0, 说明该锁没有被任何线程占用, 当一个线程获取了该锁, 计数器会变成 1, 其他线程在获取该锁时候发现锁的所有者不是自己所以被阻塞, 但是当获取该锁的线程再次获取锁时候发现锁拥有者是自己会把计数器值 + 1, 当释放锁后计数器会 - 1, 当计数器为 0 时候, 锁里面的线程标示重置为 null, 这时候阻塞的线程会获取被唤醒来获取该锁.
七, Synchronized 关键字
7.1 Synchronized 介绍
synchronized 块是 Java 提供的一种强制性内置锁, 每个 Java 对象都可以隐式的充当一个用于同步的锁的功能, 这些内置的锁被称为内部锁或者叫监视器锁, 执行代码在进入 synchronized 代码块前会自动获取内部锁, 这时候其他线程访问该同步代码块时候会阻塞掉. 拿到内部锁的线程会在正常退出同步代码块或者异常抛出后释放内部锁, 这时候阻塞掉的线程才能获取内部锁进入同步代码块.
7.2 Synchronized 同步实例
内部锁是一种互斥锁, 具体说是同时已有一个线程可以拿到该锁, 当一个线程拿到该锁并且没有释放的情况下, 其他线程只能等待.
对于上面说的 ArrayList 可以使用 synchronized 进行同步来处理可见性问题.
使用 synchronized 对方法进行同步
- public class ArrayList<E>
- {
- public synchronized E get(int index) {
- rangeCheck(index);
- return elementData(index);
- }
- public synchronized E set(int index, E element) {
- rangeCheck(index);
- E oldValue = elementData(index);
- elementData[index] = element;
- return oldValue;
- }
- }
如图当线程 A 获取内部锁进入同步代码块后, 线程 B 也准备要进入同步块, 但是由于 A 还没释放锁, 所以 B 现在进入等待, 使用同步可以保证线程 A 获取锁到释放锁期间的变量值对 B 获取锁后都可见. 也就是说当 B 开始执行 A 执行的代码同步块时候可以看到 A 操作的所有变量值, 这里具体说是当线程 B 获取 b 的值时候能够保证获取的值是 2. 这时因为线程 A 进入同步块修改变量值后, 会在退出同步块前把值刷新到主内存, 而线程 B 在进入同步块前会首先清空本地内存内容, 从主内存重新获取变量值, 所以实现了可见性. 但是要注意一点所有线程使用的是同一个锁.
注意
Synchronized 关键字会引起现场上下文切换和线程调度
.
八, ReentrantReadWriteLock 介绍
使用 synchronized 可以实现同步, 但是缺点是同时只有一个线程可以访问共享变量, 但是正常情况下, 对于多个读操作操作共享变量时候是不需要同步的, synchronized 时候无法实现多个读线程同时执行, 而大部分情况下读操作次数多于写操作, 所以这大大降低了并发性, 所以出现了 ReentrantReadWriteLock, 它可以实现读写分离, 运行多个线程同时进行读取, 但是最多运行一个写现线程存在.
对于上面的方法现在可以修改为:
- public class ArrayList<E>
- {
- private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- public E get(int index) {
- Lock readLock = readWriteLock.readLock();
- readLock.lock();
- try {
- return list.get(index);
- } finally {
- readLock.unlock();
- }
- }
- public E set(int index, E element) {
- Lock wirteLock = readWriteLock.writeLock();
- wirteLock.lock();
- try {
- return list.set(index, element);
- } finally {
- wirteLock.unlock();
- }
- }
- }
如代码在 get 方法时候通过 readWriteLock.readLock()获取了读锁, 多个线程可以同时获取这读锁, set 方法通过 readWriteLock.writeLock()获取了写锁, 同时只有一个线程可以获取写锁, 其他线程在获取写锁时候会阻塞直到写锁被释放. 假如一个线程已经获取了读锁, 这时候如果一个线程要获取写锁时候要等待直到释放了读锁, 如果一个线程获取了写锁, 那么所有获取读锁的线程需要等待直到写锁被释放. 所以相比 synchronized 来说运行多个读者同时存在, 所以提高了并发量.
注意
需要使用者显示调用 Lock 与 unlock 操作
九, Volatile 变量
对于避免不可见性问题, Java 还提供了一种弱形式的同步, 即使用了 volatile 关键字. 该关键字确保了对一个变量的更新对其他线程可见. 当一个变量被声明为 volatile 时候, 线程写入时候不会把值缓存在寄存器或者或者在其他地方, 当线程读取的时候会从主内存重新获取最新值, 而不是使用当前线程的拷贝内存变量值.
volatile 虽然提供了可见性保证, 但是不能使用他来构建复合的原子性操作, 也就是说当一个变量依赖其他变量或者更新变量值时候新值依赖当前老值时候不在适用. 与 synchronized 相似之处在于如图
如图线程 A 修改了 volatile 变量 b 的值, 然后线程 B 读取了改变量值, 那么所有 A 线程在写入变量 b 值前可见的变量值, 在 B 读取 volatile 变量 b 后对线程 B 都是可见的, 途中线程 B 对 A 操作的变量 a,b 的值都可见的. volatile 的内存语义和 synchronized 有类似之处, 具体说是说当线程写入了 volatile 变量值就等价于线程退出 synchronized 同步块(会把写入到本地内存的变量值同步到主内存), 读取 volatile 变量值就相当于进入同步块(会先清空本地内存变量值, 从主内存获取最新值).
下面的 Integer 也是线程不安全的, 因为没有进行同步措施
- public class ThreadNotSafeInteger {
- private int value;
- public int get() {
- return value;
- }
- public void set(int value) {
- this.value = value;
- }
- }
使用 synchronized 关键字进行同步如下:
- public class ThreadSafeInteger {
- private int value;
- public synchronized int get() {
- return value;
- }
- public synchronized void set(int value) {
- this.value = value;
- }
- }
等价于使用 volatile 进行同步如下:
- public class ThreadSafeInteger {
- private volatile int value;
- public int get() {
- return value;
- }
- public void set(int value) {
- this.value = value;
- }
- }
这里使用 synchronized 和使用 volatile 是等价的, 但是并不是所有情况下都是等价, 一般只有满足下面所有条件才能使用 volatile
写入变量值时候不依赖变量的当前值, 或者能够保证只有一个线程修改变量值.
写入的变量值不依赖其他变量的参与.
读取变量值时候不能因为其他原因进行枷锁.
另外 加锁可以同时保证可见性和原子性, 而 volatile 只保证变量值的可见性.
注意
volatile 关键字不会引起线程上下文切换和线程调度
.
十, 乐观锁与悲观锁
10.1 悲观锁
悲观锁, 指数据被外界修改持保守态度(悲观), 在整个数据处理过程中, 将数据处于锁定状态. 悲观锁的实现, 往往依靠数据库提供的锁机制 . 数据库中实现是对数据记录进行操作前, 先给记录加排它锁, 如果获取锁失败, 则说明数据正在被其他线程修改, 则等待或者抛出异常. 如果加锁成功, 则获取记录, 对其修改, 然后事务提交后释放排它锁.
一个例子:
select * from 表 where .. for update;
悲观锁是先加锁再访问策略, 处理加锁会让数据库产生额外的开销, 还有增加产生死锁的机会, 另外在多个线程只读情况下不会产生数据不一致行问题, 没必要使用锁, 只会增加系统负载, 降低并发性, 因为当一个事务锁定了该条记录, 其他读该记录的事务只能等待.
10.2 乐观锁
乐观锁是相对悲观锁来说的, 它认为数据一般情况下不会造成冲突, 所以在访问记录前不会加排他锁, 而是在数据进行提交更新的时候, 才会正式对数据的冲突与否进行检测, 具体说根据 update 返回的行数让用户决定如何去做. 乐观锁并不会使用数据库提供的锁机制, 一般在表添加 version 字段或者使用业务状态来做.
具体可以参考: https://www.atatech.org/articles/79240
乐观锁直到提交的时候才去锁定, 所以不会产生任何锁和死锁.
十一, 独占锁与共享锁
根据锁能够被单个线程还是多个线程共同持有, 锁又分为独占锁和共享锁. 独占锁保证任何时候都只有一个线程能读写权限, ReentrantLock 就是以独占方式实现的互斥锁. 共享锁则可以同时有多个读线程, 但最多只能有一个写线程, 读和写是互斥的, 例如 ReadWriteLock 读写锁, 它允许一个资源可以被多线程同时进行读操作, 或者被一个线程 写操作, 但两者不能同时进行.
独占锁是一种悲观锁, 每次访问资源都先加上互斥锁, 这限制了并发性, 因为读操作并不会影响数据一致性, 而独占锁只允许同时一个线程读取数据, 其他线程必须等待当前线程释放锁才能进行读取.
共享锁则是一种乐观锁, 它放宽了加锁的条件, 允许多个线程同时进行读操作.
十二, 公平锁与非公平锁
根据线程获取锁的抢占机制锁可以分为公平锁和非公平锁, 公平锁表示线程获取锁的顺序是按照线程加锁的时间多少来分决定的的, 也就是最早枷锁的线程将最早获取锁, 也就是先来先得的 FIFO 顺序. 而非公平锁则运行闯入, 也就是先来不一定先得.
ReentrantLock 提供了公平和非公平锁的实现:
公平锁 ReentrantLock pairLock = new ReentrantLock(true);
非公平锁 ReentrantLock pairLock = new ReentrantLock(false);
如果构造函数不传递参数, 则默认是非公平锁.
在没有公平性需求的前提下尽量使用非公平锁, 因为公平锁会带来性能开销.
假设线程 A 已经持有了锁, 这时候线程 B 请求该锁将会被挂起, 当线程 A 释放锁后, 假如当前有线程 C 也需要获取该锁, 如果采用非公平锁方式, 则根据线程调度策略线程 B 和 C 两者之一可能获取锁, 这时候不需要任何其他干涉, 如果使用公平锁则需要把 C 挂起, 让 B 获取当前锁.
十三, AbstractQueuedSynchronizer 介绍
AbstractQueuedSynchronizer 提供了一个队列, 大多数开发者可能从来不会直接用到 AQS,AQS 中刮泥这个一个单一的状态信息 state, 可以通过 protected 的 getState,setState,compareAndSetState 函数进行调用. 对于 ReentrantLock 来说, state 可以用来表示该线程获可重入锁的次数, semaphore 来说 state 用来表示当前可用信号的个数, FutuerTask 用来表示任务状态(例如还没开始, 运行, 完成, 取消).
十四, CountDownLatch 原理
14.1 一个例子
- public class Test {
- private static final int ThreadNum = 10;
- public static void main(String[] args) {
- // 创建一个 CountDownLatch 实例, 管理计数为 ThreadNum
- CountDownLatch countDownLatch = new CountDownLatch(ThreadNum);
- // 创建一个固定大小的线程池
- ExecutorService executor = Executors.newFixedThreadPool(ThreadNum);
- // 添加线程到线程池
- for(int i =0;i<ThreadNum;++i){
- executor.execute(new Person(countDownLatch, i+1));
- }
- System.out.println("开始等待全员签到...");
- try {
- // 等待所有线程执行完毕
- countDownLatch.await();
- System.out.println("签到完毕, 开始吃饭");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally {
- executor.shutdown();
- }
- }
- static class Person implements Runnable{
- private CountDownLatch countDownLatch;
- private int index;
- public Person(CountDownLatch cdl,int index){
- this.countDownLatch = cdl;
- this.index = index;
- }
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- System.out.println("person" + index +"签到");
- // 线程执行完毕, 计数器减一
- countDownLatch.countDown();
- }
- }
- }
如上代码, 创建一个线程池和 CountDownLatch 实例, 每个线程通过构造函数传入 CountDownLatch 的实例, 主线程通过 await 等待线程池里面线程任务全部执行完毕, 子线程则执行完毕后调用 countDown 计数器减一, 等所有子线程执行完毕后, 主线程的 await 才会返回.
14.2 原理
先看下类图:
可知 CountDownLatch 内部还是使用 AQS 实现的.
首先通过构造函数初始化 AQS 的状态值
- public CountDownLatch(int count) {
- if (count <0) throw new IllegalArgumentException("count < 0");
- this.sync = new Sync(count);
- }
- Sync(int count) {
- setState(count);
- }
然后看下 await 方法:
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- // 如果线程被中断则抛异常
- if (Thread.interrupted())
- throw new InterruptedException();
- // 尝试看当前是否计数值为 0, 为 0 则直接返回, 否者进入队列等待
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
如果 tryAcquireShared 返回 - 1 则 进入 doAcquireSharedInterruptibly
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- // 加入队列状态为共享节点
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r>= 0) {
- // 如果多个线程调用了 await 被放入队列则一个个返回.
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- //shouldParkAfterFailedAcquire 会把当前节点状态变为 SIGNAL 类型, 然后调用 park 方法把当先线程挂起,
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
调用 await 后, 当前线程会被阻塞主, 知道所有子线程调用了 countdown 方法, 并在在计数为 0 时候调用该线程 unpark 方法激活线程, 然后该线程重新 tryAcquireShared 会返回 1.
然后看下 countDown 方法:
委托给 sync
- public void countDown() {
- sync.releaseShared(1);
- }
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared();
- return true;
- }
- return false;
- }
首先看下 tryReleaseShared
- protected boolean tryReleaseShared(int releases) {
- // 循环进行 cas, 直到当前线程成功完成 cas 使计数值 (状态值 state) 减一更新到 state
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
该函数一直返回 false 直到当前计数器为 0 时候才返回 true.
返回 true 后会调用 doReleaseShared, 该函数主要作用是调用 uppark 方法激活调用 await 的线程, 代码如下:
- private void doReleaseShared() {
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- // 节点类型为 SIGNAL, 把类型在通过 cas 设置回去, 然后调用 unpark 激活调用 await 的线程
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
激活主线程后, 主线程会在调用 tryAcquireShared 获取锁.
十五, ReentrantLock 独占锁原理
15.1 ReentrantLock 结构
先上类图:
可知 ReentrantLock 最终还是使用 AQS 来实现, 并且根据参数决定内部是公平还是非公平锁, 默认是非公平锁
- public ReentrantLock() {
- sync = new NonfairSync();
- }
- public ReentrantLock(boolean fair) {
- sync = fair ? new FairSync() : new NonfairSync();
- }
加锁代码:
- public void lock() {
- sync.lock();
- }
15.2 公平锁原理
先看 Lock 方法:
lock 方法最终调用 FairSync 重写的 tryAcquire 方法
- protected final boolean tryAcquire(int acquires) {
- // 获取当前线程和状态值
- final Thread current = Thread.currentThread();
- int c = getState();
- // 状态为 0 说明该锁未被任何线程持有
- if (c == 0) {
- // 为了实现公平, 首先看队列里面是否有节点, 有的话再看节点所属线程是不是当前线程, 是的话 hasQueuedPredecessors 返回 false, 然后使用原子操作 compareAndSetState 保证一个线程更新状态为 1, 设置排他锁归属与当前线程. 其他线程通过 cass 则返回 false.
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- // 状态不为 0 说明该锁已经被线程持有, 则看是否是当前线程持有, 是则重入锁次数 + 1.
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc <0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
- }
公平性保证代码:
- public final boolean hasQueuedPredecessors() {
- Node t = tail; // Read fields in reverse initialization order
- Node h = head;
- Node s;
- return h != t &&
- ((s = h.next) == null || s.thread != Thread.currentThread());
- }
再看看 unLock 方法, 最终调用了 Sync 的 tryRelease 方法:
- protected final boolean tryRelease(int releases) {
- // 如果不是锁持有者调用 UNlock 则抛出异常.
- int c = getState() - releases;
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- boolean free = false;
- // 如果当前可重入次数为 0, 则清空锁持有线程
- if (c == 0) {
- free = true;
- setExclusiveOwnerThread(null);
- }
- // 设置可重入次数为原始值 - 1
- setState(c);
- return free;
- }
15.3 非公平锁原理
- final void lock() {
- // 如果当前锁空闲 0, 则设置状态为 1, 并且设置当前线程为锁持有者
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);// 调用重写的 tryAcquire 方法 -nonfairTryAcquire 方法
- }
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {// 状态为 0 说明没有线程持有该锁
- if (compareAndSetState(0, acquires)) {//cass 原子性操作, 保证只有一个线程可以设置状态
- setExclusiveOwnerThread(current);// 设置锁所有者
- return true;
- }
- }// 如果当前线程是锁持有者则可重入锁计数 + 1
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
15.3 总结
可知公平与非公平都是先执行 tryAcquire 尝试获取锁, 如果成功则直接获取锁, 如果不成功则把当前线程放入队列. 对于放入队列里面的第一个线程 A 在 unpark 后会进行自旋调用 tryAcquire 尝试获取锁, 假如这时候有一个线程 B 执行了 lock 操作, 那么也会调用 tryAcquire 方法尝试获取锁, 但是线程 B 并不在队列里面, 但是线程 B 有可能比线程 A 优先获取到锁, 也就是说虽然线程 A 先请求的锁, 但是却有可能没有 B 先获取锁, 这是非公平锁实现. 而公平锁要保证线程 A 要比线程 B 先获取锁. 所以公平锁相比非公平锁在 tryAcquire 里面添加了 hasQueuedPredecessors 方法用来保证公平性.
十六, ReentrantReadWriteLock 原理
如图读写锁内部维护了一个 ReadLock 和 WriteLock, 并且也提供了公平和非公平的实现, 下面只介绍下非公平的读写锁实现. 我们知道 AQS 里面只维护了一个 state 状态, 而 ReentrantReadWriteLock 则需要维护读状态和写状态, 一个 state 是无法表示写和读状态的. 所以 ReentrantReadWriteLock 使用 state 的高 16 位表示读状态也就是读线程的个数, 低 16 位表示写锁可重入量.
static final int SHARED_SHIFT = 16;
共享锁 (读锁) 状态单位值 65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
共享锁线程最大个数 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
排它锁 (写锁) 掩码 二进制 15 个 1
- static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
- /** 返回读锁线程数 */
- static int sharedCount(int c) { return c>>> SHARED_SHIFT; }
- /** 返回写锁可重入个数 */
- static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
- 16.1 WriteLock
lock 获取锁
对应写锁只需要分析下 Sync 的 tryAcquire 和 tryRelease
- protected final boolean tryAcquire(int acquires) {
- Thread current = Thread.currentThread();
- int c = getState();
- int w = exclusiveCount(c);
- //c!=0 说明读锁或者写锁已经被某线程获取
- if (c != 0) {
- //w=0 说明已经有线程获取了读锁或者 w!=0 并且当前线程不是写锁拥有者, 则返回 false
- if (w == 0 || current != getExclusiveOwnerThread())
- return false;
- // 说明某线程获取了写锁, 判断可重入个数
- if (w + exclusiveCount(acquires)> MAX_COUNT)
- throw new Error("Maximum lock count exceeded");
- // 设置可重入数量(1)
- setState(c + acquires);
- return true;
- }
- // 第一个写线程获取写锁
- if (writerShouldBlock() ||
- !compareAndSetState(c, c + acquires))
- return false;
- setExclusiveOwnerThread(current);
- return true;
- }
unlock 释放锁
- protected final boolean tryRelease(int releases) {
- // 看是否是写锁拥有者调用的 unlock
- if (!isHeldExclusively())
- throw new IllegalMonitorStateException();
- // 获取可重入值, 这里没有考虑高 16 位, 因为写锁时候读锁状态值肯定为 0
- int nextc = getState() - releases;
- boolean free = exclusiveCount(nextc) == 0;
- // 如果写锁可重入值为 0 则释放锁, 否者只是简单更新状态值.
- if (free)
- setExclusiveOwnerThread(null);
- setState(nextc);
- return free;
- }
- 16.2 ReadLock
对应读锁只需要分析下 Sync 的 tryAcquireShared 和 tryReleaseShared
lock 获取锁
- protected final int tryAcquireShared(int unused) {
- // 获取当前状态值
- Thread current = Thread.currentThread();
- int c = getState();
- // 如果写锁计数不为 0 说明已经有线程获取了写锁, 然后看是不是当前线程获取的写锁.
- if (exclusiveCount(c) != 0 &&
- getExclusiveOwnerThread() != current)
- return -1;
- // 获取读锁计数
- int r = sharedCount(c);
- // 尝试获取锁, 多个读线程只有一个会成功, 不成功的进入下面 fullTryAcquireShared 进行重试
- if (!readerShouldBlock() &&
- r <MAX_COUNT &&
- compareAndSetState(c, c + SHARED_UNIT)) {
- if (r == 0) {
- firstReader = current;
- firstReaderHoldCount = 1;
- } else if (firstReader == current) {
- firstReaderHoldCount++;
- } else {
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != current.getId())
- cachedHoldCounter = rh = readHolds.get();
- else if (rh.count == 0)
- readHolds.set(rh);
- rh.count++;
- }
- return 1;
- }
- return fullTryAcquireShared(current);
- }
unlock 释放锁
- protected final boolean tryReleaseShared(int unused) {
- Thread current = Thread.currentThread();
- if (firstReader == current) {
- // assert firstReaderHoldCount> 0;
- if (firstReaderHoldCount == 1)
- firstReader = null;
- else
- firstReaderHoldCount--;
- } else {
- HoldCounter rh = cachedHoldCounter;
- if (rh == null || rh.tid != current.getId())
- rh = readHolds.get();
- int count = rh.count;
- if (count <= 1) {
- readHolds.remove();
- if (count <= 0)
- throw unmatchedUnlockException();
- }
- --rh.count;
- }
- // 循环直到自己的读计数 - 1 cas 更新成功
- for (;;) {
- int c = getState();
- int nextc = c - SHARED_UNIT;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
十七, 参考
Java 并发编程实践
- http://www.hollischuang.com/archives/934
- http://ifeve.com/juc-reentrantreadwritelock/
来源: https://yq.aliyun.com/articles/599188