问题
(1)什么是 ABA?
(2)ABA 的危害?
(3)ABA 的解决方法?
(4)AtomicStampedReference 是什么?
(5)AtomicStampedReference 是怎么解决 ABA 的?
简介
AtomicStampedReference 是 java 并发包下提供的一个原子类, 它能解决其它原子类无法解决的 ABA 问题.
ABA
ABA 问题发生在多线程环境中, 当某线程连续读取同一块内存地址两次, 两次得到的值一样, 它简单地认为 "此内存地址的值并没有被修改过", 然而, 同时可能存在另一个线程在这两次读取之间把这个内存地址的值从 A 修改成了 B 又修改回了 A, 这时还简单地认为 "没有修改过" 显然是错误的.
比如, 两个线程按下面的顺序执行:
(1)线程 1 读取内存位置 X 的值为 A;
(2)线程 1 阻塞了;
(3)线程 2 读取内存位置 X 的值为 A;
(4)线程 2 修改内存位置 X 的值为 B;
(5)线程 2 修改又内存位置 X 的值为 A;
(6)线程 1 恢复, 继续执行, 比较发现还是 A 把内存位置 X 的值设置为 C;
可以看到, 针对线程 1 来说, 第一次的 A 和第二次的 A 实际上并不是同一个 A.
ABA 问题通常发生在无锁结构中, 用代码来表示上面的过程大概就是这样:
- public class ABATest {
- public static void main(String[] args) {
- AtomicInteger atomicInteger = new AtomicInteger(1);
- new Thread(()->{
- int value = atomicInteger.get();
- System.out.println("thread 1 read value:" + value);
- // 阻塞 1s
- LockSupport.parkNanos(1000000000L);
- if (atomicInteger.compareAndSet(value, 3)) {
- System.out.println("thread 1 update from" + value + "to 3");
- } else {
- System.out.println("thread 1 update fail!");
- }
- }).start();
- new Thread(()->{
- int value = atomicInteger.get();
- System.out.println("thread 2 read value:" + value);
- if (atomicInteger.compareAndSet(value, 2)) {
- System.out.println("thread 2 update from" + value + "to 2");
- // do sth
- value = atomicInteger.get();
- System.out.println("thread 2 read value:" + value);
- if (atomicInteger.compareAndSet(value, 1)) {
- System.out.println("thread 2 update from" + value + "to 1");
- }
- }
- }).start();
- }
- }
打印结果为:
- thread 1 read value: 1
- thread 2 read value: 1
- thread 2 update from 1 to 2
- thread 2 read value: 2
- thread 2 update from 2 to 1
- thread 1 update from 1 to 3
ABA 的危害
为了更好地理解 ABA 的危害, 我们还是来看一个现实点的例子.
假设我们有一个无锁的栈结构, 如下:
- public class ABATest {
- static class Stack {
- // 将 top 放在原子类中
- private AtomicReference<Node> top = new AtomicReference<>();
- // 栈中节点信息
- static class Node {
- int value;
- Node next;
- public Node(int value) {
- this.value = value;
- }
- }
- // 出栈操作
- public Node pop() {
- for (;;) {
- // 获取栈顶节点
- Node t = top.get();
- if (t == null) {
- return null;
- }
- // 栈顶下一个节点
- Node next = t.next;
- // CAS 更新 top 指向其 next 节点
- if (top.compareAndSet(t, next)) {
- // 把栈顶元素弹出, 应该把 next 清空防止外面直接操作栈
- t.next = null;
- return t;
- }
- }
- }
- // 入栈操作
- public void push(Node node) {
- for (;;) {
- // 获取栈顶节点
- Node next = top.get();
- // 设置栈顶节点为新节点的 next 节点
- node.next = next;
- // CAS 更新 top 指向新节点
- if (top.compareAndSet(next, node)) {
- return;
- }
- }
- }
- }
- }
咋一看, 这段程序似乎没有什么问题, 然而试想以下情形.
假如, 我们初始化栈结构为 top->1->2->3, 然后有两个线程分别做如下操作:
(1)线程 1 执行 pop()出栈操作, 但是执行到 if (top.compareAndSet(t, next)) {这行之前暂停了, 所以此时节点 1 并未出栈;
(2)线程 2 执行 pop()出栈操作弹出节点 1, 此时栈变为 top->2->3;
(3)线程 2 执行 pop()出栈操作弹出节点 2, 此时栈变为 top->3;
(4)线程 2 执行 push()入栈操作添加节点 1, 此时栈变为 top->1->3;
(5)线程 1 恢复执行, 比较节点 1 的引用并没有改变, 执行 CAS 成功, 此时栈变为 top->2;
What? 点解变成 top->2 了? 不是应该变成 top->3 吗?
那是因为线程 1 在第一步保存的 next 是节点 2, 所以它执行 CAS 成功后 top 节点就指向了节点 2 了.
测试代码如下:
- private static void testStack() {
- // 初始化栈为 top->1->2->3
- Stack stack = new Stack();
- stack.push(new Stack.Node(3));
- stack.push(new Stack.Node(2));
- stack.push(new Stack.Node(1));
- new Thread(()->{
- // 线程 1 出栈一个元素
- stack.pop();
- }).start();
- new Thread(()->{
- // 线程 2 出栈两个元素
- Stack.Node A = stack.pop();
- Stack.Node B = stack.pop();
- // 线程 2 又把 A 入栈了
- stack.push(A);
- }).start();
- }
- public static void main(String[] args) {
- testStack();
- }
在 Stack 的 pop()方法的 if (top.compareAndSet(t, next)) {处打个断点, 线程 1 运行到这里时阻塞它的执行, 让线程 2 执行完, 再执行线程 1 这句, 这句执行完可以看到栈的 top 对象中只有 2 这个节点了.
记得打断点的时候一定要打 Thread 断点, 在 IDEA 中是右击选择 Suspend 为 Thread.
通过这个例子, 笔者认为你肯定很清楚 ABA 的危害了.
ABA 的解决方法
ABA 的危害我们清楚了, 那么怎么解决 ABA 呢?
笔者总结了一下, 大概有以下几种方式:
(1)版本号
比如, 上面的栈结构增加一个版本号用于控制, 每次 CAS 的同时检查版本号有没有变过.
还有一些数据结构喜欢使用高位存储一个邮戳来保证 CAS 的安全.
(2)不重复使用节点的引用
比如, 上面的栈结构在线程 2 执行 push()入栈操作的时候新建一个节点传入, 而不是复用节点 1 的引用;
(3)直接操作元素而不是节点
比如, 上面的栈结构 push()方法不应该传入一个节点(Node), 而是传入元素值(int 的 value).
好了, 扯了这么多, 让我们来看看 java 中的 AtomicStampedReference 是怎么解决 ABA 的吧 ^^
源码分析
内部类
- private static class Pair<T> {
- final T reference;
- final int stamp;
- private Pair(T reference, int stamp) {
- this.reference = reference;
- this.stamp = stamp;
- }
- static <T> Pair<T> of(T reference, int stamp) {
- return new Pair<T>(reference, stamp);
- }
- }
将元素值和版本号绑定在一起, 存储在 Pair 的 reference 和 stamp(邮票, 戳的意思)中.
属性
- private volatile Pair<V> pair;
- private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe();
- private static final long pairOffset =
- objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class);
声明一个 Pair 类型的变量并使用 Unsfae 获取其偏移量, 存储到 pairOffset 中.
构造方法
- public AtomicStampedReference(V initialRef, int initialStamp) {
- pair = Pair.of(initialRef, initialStamp);
- }
构造方法需要传入初始值及初始版本号.
compareAndSet()方法
- public boolean compareAndSet(V expectedReference,
- V newReference,
- int expectedStamp,
- int newStamp) {
- // 获取当前的 (元素值, 版本号) 对
- Pair<V> current = pair;
- return
- // 引用没变
- expectedReference == current.reference &&
- // 版本号没变
- expectedStamp == current.stamp &&
- // 新引用等于旧引用
- ((newReference == current.reference &&
- // 新版本号等于旧版本号
- newStamp == current.stamp) ||
- // 构造新的 Pair 对象并 CAS 更新
- casPair(current, Pair.of(newReference, newStamp)));
- }
- private boolean casPair(Pair<V> cmp, Pair<V> val) {
- // 调用 Unsafe 的 compareAndSwapObject()方法 CAS 更新 pair 的引用为新引用
- return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
- }
(1)如果元素值和版本号都没有变化, 并且和新的也相同, 返回 true;
(2)如果元素值和版本号都没有变化, 并且和新的不完全相同, 就构造一个新的 Pair 对象并执行 CAS 更新 pair.
可以看到, java 中的实现跟我们上面讲的 ABA 的解决方法是一致的.
首先, 使用版本号控制;
其次, 不重复使用节点 (Pair) 的引用, 每次都新建一个新的 Pair 来作为 CAS 比较的对象, 而不是复用旧的;
最后, 外部传入元素值及版本号, 而不是节点 (Pair) 的引用.
案例
让我们来使用 AtomicStampedReference 解决开篇那个 AtomicInteger 带来的 ABA 问题.
- public class ABATest {
- public static void main(String[] args) {
- testStamp();
- }
- private static void testStamp() {
- AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1, 1);
- new Thread(()->{
- int[] stampHolder = new int[1];
- int value = atomicStampedReference.get(stampHolder);
- int stamp = stampHolder[0];
- System.out.println("thread 1 read value:" + value + ", stamp:" + stamp);
- // 阻塞 1s
- LockSupport.parkNanos(1000000000L);
- if (atomicStampedReference.compareAndSet(value, 3, stamp, stamp + 1)) {
- System.out.println("thread 1 update from" + value + "to 3");
- } else {
- System.out.println("thread 1 update fail!");
- }
- }).start();
- new Thread(()->{
- int[] stampHolder = new int[1];
- int value = atomicStampedReference.get(stampHolder);
- int stamp = stampHolder[0];
- System.out.println("thread 2 read value:" + value + ", stamp:" + stamp);
- if (atomicStampedReference.compareAndSet(value, 2, stamp, stamp + 1)) {
- System.out.println("thread 2 update from" + value + "to 2");
- // do sth
- value = atomicStampedReference.get(stampHolder);
- stamp = stampHolder[0];
- System.out.println("thread 2 read value:" + value + ", stamp:" + stamp);
- if (atomicStampedReference.compareAndSet(value, 1, stamp, stamp + 1)) {
- System.out.println("thread 2 update from" + value + "to 1");
- }
- }
- }).start();
- }
- }
运行结果为:
- thread 1 read value: 1, stamp: 1
- thread 2 read value: 1, stamp: 1
- thread 2 update from 1 to 2
- thread 2 read value: 2, stamp: 2
- thread 2 update from 2 to 1
- thread 1 update fail!
可以看到线程 1 最后更新 1 到 3 时失败了, 因为这时版本号也变了, 成功解决了 ABA 的问题.
总结
(1)在多线程环境下使用无锁结构要注意 ABA 问题;
(2)ABA 的解决一般使用版本号来控制, 并保证数据结构使用元素值来传递, 且每次添加元素都新建节点承载元素值;
(3)AtomicStampedReference 内部使用 Pair 来存储元素值及其版本号;
彩蛋
(1)java 中还有哪些类可以解决 ABA 的问题?
AtomicMarkableReference, 它不是维护一个版本号, 而是维护一个 boolean 类型的标记, 标记值有修改, 了解一下.
(2)实际工作中遇到过 ABA 问题吗?
笔者还真遇到过, 以前做棋牌游戏的时候, ABCD 四个玩家, A 玩家出了一张牌, 然后他这个请求迟迟没到服务器, 也就是超时了, 服务器就帮他自动出了一张牌.
然后, 转了一圈, 又轮到 A 玩家出牌了, 说巧不巧, 正好这时之前那个请求到了服务器, 服务器检测到现在正好是 A 出牌, 而且请求的也是出牌, 就把这张牌打出去了.
然后呢, A 玩家的牌就不对了.
最后, 我们是通过给每个请求增加一个序列号来处理的, 检测到过期的序列号请求直接抛弃掉.
你有没有遇到过 ABA 问题呢?
来源: https://www.cnblogs.com/tong-yuan/p/AtomicStampedReference.html