本文转载自微信公众号「潜行前行」, 作者 cscw . 转载本文请联系潜行前行公众号.
前言
在使用多线程并发编程的时, 经常会遇到对共享变量修改操作. 此时我们可以选择 ConcurrentHashMap,ConcurrentLinkedQueue 来进行安全地存储数据. 但如果单单是涉及状态的修改, 线程执行顺序问题, 使用 Atomic 开头的原子组件或者 ReentrantLock,CyclicBarrier 之类的同步组件, 会是更好的选择, 下面将一一介绍它们的原理和用法
原子组件的实现原理 CAS
AtomicBoolean,AtomicIntegerArray 等原子组件的用法,
同步组件的实现原理
ReentrantLock,CyclicBarrier 等同步组件的用法
原子组件的实现原理 CAS
cas 的底层实现可以看下之前写的一篇文章: 详解锁原理, synchronized,volatile+cas 底层实现 [1]
应用场景
可用来实现变量, 状态在多线程下的原子性操作
可用于实现同步锁 (ReentrantLock)
原子组件
原子组件的原子性操作是靠使用 cas 来自旋操作 volatile 变量实现的
volatile 的类型变量保证变量被修改时, 其他线程都能看到最新的值
cas 则保证 value 的修改操作是原子性的, 不会被中断
基本类型原子类
- AtomicBoolean // 布尔类型
- AtomicInteger // 正整型数类型
- AtomicLong // 长整型类型
使用示例
- public static void main(String[] args) throws Exception {
- AtomicBoolean atomicBoolean = new AtomicBoolean(false);
- // 异步线程修改 atomicBoolean
- CompletableFuture future = CompletableFuture.runAsync(() ->{
- try {
- Thread.sleep(1000); // 保证异步线程是在主线程之后修改 atomicBoolean 为 false
- atomicBoolean.set(false);
- }catch (Exception e){
- throw new RuntimeException(e);
- }
- });
- atomicBoolean.set(true);
- future.join();
- System.out.println("boolean value is:"+atomicBoolean.get());
- }
--------------- 输出结果 ------------------
boolean value is:false
引用类原子类
- AtomicReference
- // 加时间戳版本的引用类原子类
- AtomicStampedReference
- // 相当于 AtomicStampedReference,AtomicMarkableReference 关心的是
- // 变量是否还是原来变量, 中间被修改过也无所谓
- AtomicMarkableReference
AtomicReference 的源码如下, 它内部定义了一个 volatile V value, 并借助 VarHandle(具体子类是 FieldInstanceReadWrite) 实现原子操作, MethodHandles 会帮忙计算 value 在类的偏移位置, 最后在 VarHandle 调用 Unsafe.public final native boolean compareAndSetReference(Object o, long offset, Object expected, Object x) 方法原子修改对象的属性
- public class AtomicReference implements java.io.Serializable {
- private static final long serialVersionUID = -1848883965231344442L;
- private static final VarHandle VALUE;
- static {
- try {
- MethodHandles.Lookup l = MethodHandles.lookup();
- VALUE = l.findVarHandle(AtomicReference.class, "value", Object.class);
- } catch (ReflectiveOperationException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
- private volatile V value;
- ....
ABA 问题
线程 X 准备将变量的值从 A 改为 B, 然而这期间线程 Y 将变量的值从 A 改为 C, 然后再改为 A; 最后线程 X 检测变量值是 A, 并置换为 B. 但实际上, A 已经不再是原来的 A 了
解决方法, 是把变量定为唯一类型. 值可以加上版本号, 或者时间戳. 如加上版本号, 线程 Y 的修改变为 A1->B2->A3, 此时线程 X 再更新则可以判断出 A1 不等于 A3
AtomicStampedReference 的实现和 AtomicReference 差不多, 不过它原子修改的变量是 volatile Pair pair;,Pair 是其内部类. AtomicStampedReference 可以用来解决 ABA 问题
- public class AtomicStampedReference {
- private static class Pair {
- final T reference;
- final int stamp;
- private Pair(T reference, int stamp) {
- this.reference = reference;
- this.stamp = stamp;
- }
- static Pair of(T reference, int stamp) {
- return new Pair(reference, stamp);
- }
- }
- private volatile Pair pair;
如果我们不关心变量在中间过程是否被修改过, 而只是关心当前变量是否还是原先的变量, 则可以使用 AtomicMarkableReference
AtomicStampedReference 的使用示例
- public class Main {
- public static void main(String[] args) throws Exception {
- Test old = new Test("hello"), newTest = new Test("world");
- AtomicStampedReference reference = new AtomicStampedReference<>(old, 1);
- reference.compareAndSet(old, newTest,1,2);
- System.out.println("对象:"+reference.getReference().name+"; 版本号:"+reference.getStamp());
- }
- }
- class Test{
- Test(String name){ this.name = name; }
- public String name;
- }
--------------- 输出结果 ------------------
对象: world; 版本号: 2
数组原子类
- AtomicIntegerArray // 整型数组
- AtomicLongArray // 长整型数组
- AtomicReferenceArray // 引用类型数组
数组原子类内部会初始一个 final 的数组, 它把整个数组当做一个对象, 然后根据下标 index 计算法元素偏移量, 再调用 UNSAFE.compareAndSetReference 进行原子操作. 数组并没被 volatile 修饰, 为了保证元素类型在不同线程的可见, 获取元素使用到了 UNSAFEpublic native Object getReferenceVolatile(Object o, long offset) 方法来获取实时的元素值
使用示例
- // 元素默认初始化为 0
- AtomicIntegerArray array = new AtomicIntegerArray(2);
- // 下标为0的元素, 期待值是 0, 更新值是1
- array.compareAndSet(0,0,1);
- System.out.println(array.get(0));
--------------- 输出结果 ------------------
1
属性原子类
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
- AtomicReferenceFieldUpdater
如果操作对象是某一类型的属性, 可以使用 AtomicIntegerFieldUpdater 原子更新, 不过类的属性需要定义成 volatile 修饰的变量, 保证该属性在各个线程的可见性, 否则会报错
使用示例
- public class Main {
- public static void main(String[] args) {
- AtomicReferenceFieldUpdater fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Test.class,String.class,"name");
- Test test = new Test("hello world");
- fieldUpdater.compareAndSet(test,"hello world","siting");
- System.out.println(fieldUpdater.get(test));
- System.out.println(test.name);
- }
- }
- class Test{
- Test(String name){ this.name = name; }
- public volatile String name;
- }
--------------- 输出结果 ------------------
siting
siting
累加器
- Striped64
- LongAccumulator
- LongAdder
- //accumulatorFunction: 运算规则, identity: 初始值
- public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)
LongAccumulator 和 LongAdder 都继承于 Striped64,Striped64 的主要思想是和 ConcurrentHashMap 有点类似, 分段计算, 单个变量计算并发性能慢时, 我们可以把数学运算分散在多个变量, 而需要计算总值时, 再一一累加起来
LongAdder 相当于 LongAccumulator 一个特例实现
LongAccumulator 的示例
- public static void main(String[] args) throws Exception {
- LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
- for(int i=0;i<100000;i++){
- CompletableFuture.runAsync(() -> accumulator.accumulate(1));
- }
- Thread.sleep(1000); // 等待全部 CompletableFuture 线程执行完成, 再获取
- System.out.println(accumulator.get());
- }
--------------- 输出结果 ------------------
100000
同步组件的实现原理
java 的多数同步组件会在内部维护一个状态值, 和原子组件一样, 修改状态值时一般也是通过 cas 来实现. 而状态修改的维护工作被 Doug Lea 抽象出 AbstractQueuedSynchronizer(AQS) 来实现
AQS 的原理可以看下之前写的一篇文章: 详解锁原理, synchronized,volatile+cas 底层实现 [2]
同步组件
ReentrantLock,ReentrantReadWriteLock
ReentrantLock,ReentrantReadWriteLock 都是基于 AQS(AbstractQueuedSynchronizer) 实现的. 因为它们有公平锁和非公平锁的区分, 因此没直接继承 AQS, 而是使用内部类去继承, 公平锁和非公平锁各自实现 AQS,ReentrantLock,ReentrantReadWriteLock 再借助内部类来实现同步
ReentrantLock 的使用示例
- ReentrantLock lock = new ReentrantLock();
- if(lock.tryLock()){
- // 业务逻辑
- lock.unlock();
- }
ReentrantReadWriteLock 的使用示例
- public static void main(String[] args) throws Exception {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- if(lock.readLock().tryLock()){ // 读锁
- // 业务逻辑
- lock.readLock().unlock();
- }
- if(lock.writeLock().tryLock()){ // 写锁
- // 业务逻辑
- lock.writeLock().unlock();
- }
- }
Semaphore 实现原理和使用场景
Semaphore 和 ReentrantLock 一样, 也有公平和非公平竞争锁的策略, 一样也是通过内部类继承 AQS 来实现同步
通俗解释: 假设有一口井, 最多有三个人的位置打水. 每有一个人打水, 则需要占用一个位置. 当三个位置全部占满时, 第四个人需要打水, 则要等待前三个人中一个离开打水位, 才能继续获取打水的位置
使用示例
- public static void main(String[] args) throws Exception {
- Semaphore semaphore = new Semaphore(2);
- for (int i = 0; i < 3; i++)
- CompletableFuture.runAsync(() -> {
- try {
- System.out.println(Thread.currentThread().toString() + "start");
- if(semaphore.tryAcquire(1)){
- Thread.sleep(1000);
- semaphore.release(1);
- System.out.println(Thread.currentThread().toString() + "无阻塞结束");
- }else {
- System.out.println(Thread.currentThread().toString() + "被阻塞结束");
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- // 保证 CompletableFuture 线程被执行, 主线程再结束
- Thread.sleep(2000);
- }
--------------- 输出结果 ------------------
- Thread[ForkJoinPool.commonPool-worker-19,5,main] start
- Thread[ForkJoinPool.commonPool-worker-5,5,main] start
- Thread[ForkJoinPool.commonPool-worker-23,5,main] start
Thread[ForkJoinPool.commonPool-worker-23,5,main] 被阻塞结束
Thread[ForkJoinPool.commonPool-worker-5,5,main] 无阻塞结束
Thread[ForkJoinPool.commonPool-worker-19,5,main] 无阻塞结束
可以看出三个线程, 因为信号量设定为 2, 第三个线程是无法获取信息成功的, 会打印阻塞结束
CountDownLatch 实现原理和使用场景
CountDownLatch 也是靠 AQS 实现的同步操作
通俗解释: 玩游戏时, 假如主线任务需要靠完成五个小任务, 主线任务才能继续进行时. 此时可以用 CountDownLatch, 主线任务阻塞等待, 每完成一小任务, 就 done 一次计数, 直到五个小任务全部被执行才能触发主线
使用示例
- public static void main(String[] args) throws Exception {
- CountDownLatch count = new CountDownLatch(2);
- for (int i = 0; i < 2; i++)
- CompletableFuture.runAsync(() -> {
- try {
- Thread.sleep(1000);
- System.out.println("CompletableFuture over");
- count.countDown();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- // 等待 CompletableFuture 线程的完成
- count.await();
- System.out.println("main over");
- }
--------------- 输出结果 ------------------
- CompletableFuture over
- CompletableFuture over
- main over
CyclicBarrier 实现原理和使用场景
CyclicBarrier 则是靠 ReentrantLock lock 和 Condition trip 属性来实现同步
通俗解释: CyclicBarrier 需要阻塞全部线程到 await 状态, 然后全部线程再全部被唤醒执行. 想象有一个栏杆拦住五只羊, 需要当五只羊一起站在栏杆时, 栏杆才会被拉起, 此时所有的羊都可以飞跑出羊圈
使用示例
- public static void main(String[] args) throws Exception {
- CyclicBarrier barrier = new CyclicBarrier(2);
- CompletableFuture.runAsync(()->{
- try {
- System.out.println("CompletableFuture run start-"+ Clock.systemUTC().millis());
- barrier.await(); // 需要等待 main 线程也执行到 await 状态才能继续执行
- System.out.println("CompletableFuture run over-"+ Clock.systemUTC().millis());
- }catch (Exception e){
- throw new RuntimeException(e);
- }
- });
- Thread.sleep(1000);
- // 和 CompletableFuture 线程相互等待
- barrier.await();
- System.out.println("main run over!");
- }
--------------- 输出结果 ------------------
- CompletableFuture run start-1609822588881
- main run over!
- CompletableFuture run over-1609822589880
- StampedLock
StampedLock 不是借助 AQS, 而是自己内部维护多个状态值, 并配合 cas 实现的
StampedLock 具有三种模式: 写模式, 读模式, 乐观读模式
StampedLock 的读写锁可以相互转换
- // 获取读锁, 自旋获取, 返回一个戳值
- public long readLock()
- // 尝试加读锁, 不成功返回 0
- public long tryReadLock()
- // 解锁
- public void unlockRead(long stamp)
- // 获取写锁, 自旋获取, 返回一个戳值
- public long writeLock()
- // 尝试加写锁, 不成功返回 0
- public long tryWriteLock()
- // 解锁
- public void unlockWrite(long stamp)
- // 尝试乐观读读取一个时间戳, 并配合 validate 方法校验时间戳的有效性
- public long tryOptimisticRead()
- // 验证 stamp 是否有效
- public boolean validate(long stamp)
使用示例
- public static void main(String[] args) throws Exception {
- StampedLock stampedLock = new StampedLock();
- long stamp = stampedLock.tryOptimisticRead();
- // 判断版本号是否生效
- if (!stampedLock.validate(stamp)) {
- // 获取读锁, 会空转
- stamp = stampedLock.readLock();
- long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
- if (writeStamp != 0) { // 成功转为写锁
- //fixme 业务操作
- stampedLock.unlockWrite(writeStamp);
- } else {
- stampedLock.unlockRead(stamp);
- // 尝试获取写读
- stamp = stampedLock.tryWriteLock();
- if (stamp != 0) {
- //fixme 业务操作
- stampedLock.unlockWrite(writeStamp);
- }
- }
- }
- }
参考文章
并发之 Striped64(l 累加器)[3]
参考资料
[1] 详解锁原理, synchronized,volatile+cas 底层实现: https://juejin.cn/post/6854573210768900110
[2] 详解锁原理, synchronized,volatile+cas 底层实现: https://juejin.cn/post/6854573210768900110
[3] 并发之 Striped64(l 累加器): https://www.cnblogs.com/gosaint/p/9129867.html
来源: http://developer.51cto.com/art/202101/640782.htm