本篇博客我们主要讲述 J.U.C 包下的 atomic 包, 在上篇博客 "并发模拟" 的最后, 我们模拟高并发的情形时出现了线程安全问题, 怎么解决呢? 其实解决的办法有很多中, 如直接在 add() 方法上加 synchronized 关键字, 还有一种就是用 atomic 包下的类来解决这个问题, 这也是现在比较推荐的一种写法, 下面我们给出完整代码:
- @Slf4j
- public class CountExample2 {
- // 请求总数
- public static int clientTotal = 5000;
- // 同时并发执行的线程数
- public static int threadTotal = 200;
- public static AtomicInteger count = new AtomicInteger(0);
- private static void add() {
- count.incrementAndGet();
- // count.getAndIncrement();
- }
- public static void main(String[] args)throws Exception {
- // 定义线程池
- ExecutorService executorService = Executors.newCachedThreadPool();
- // 定义信号量
- final Semaphore semaphore = new Semaphore(threadTotal);
- // 定义计数器闭锁
- final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
- for (int i = 0; i <clientTotal; i++) {
- executorService.execute(()->{
- try {
- semaphore.acquire();
- add();
- semaphore.release();
- } catch (Exception e) {
- log.error("exception",e);
- }
- countDownLatch.countDown();
- });
- }
- countDownLatch.await();
- executorService.shutdown();
- log.info("count:{}",count.get());
- }
- }
输出结果如下:
由输出结果可知, 我们已经保证了线程安全 (如果对此 demo 有不解的地方可参考 "并发编程 (三)"), 在这个 demo 中就是 AtomicInteger 发挥了作用, 下面我们来系统的了解一下 atomic 包下的类.
atomic 包
java8 中, 在 atomic 包中一共有 17 个类, 其中有 12 个类是 jdk1.5 提供 atomic 时就有的, 5 个类是 jdk1.8 新加的. 在原有的 12 个类中包含四种原子更新方式, 分别是原子更新基本类型, 原子更新数组, 原子更新引用, 原子更新字段 (下面我会在每种类型中, 选择某个类为代表以代码进行演示)
java8 新增的 5 个类分别是 Striped64,LongAdder,LongAccumulator,DoubleAdder,DoubleAccumulator,Sriped64 作为父类, 其余四个类继承此类, 分别是 long 和 double 的具体实现
原子更新基本类型
通过原子的方式更新基本类型. 以 AtomicInteger 为代表进行演示
AtomicInteger 的常用方法如下:
int addAndGet(int delta): 以原子的方式将输入的数值与实例中的值相加, 并返回结果
boolean compareAndSet(int expect,int update): 如果输入的数值等于预期值, 则以原子方式将该值设置为输入的值
int getAndIncrement(): 以原子方式将当前值加 1, 并返回自增前的值
void lazySet(int newValue): 最终设置为 newValue, 使用 lazySet 设置后, 可能导致其他线程在一小段时间内还是可以读到旧的值
int getAndSet(int newValue): 以原子的方式设置为 newValue 的值, 并返回旧值
本篇博客开始的 demo 就是运用 AtomicInteger 的例子
- import java.util.concurrent.atomic.AtomicInteger;
- public class AtomicIntegerTest {
- static AtomicInteger ai = new AtomicInteger(1);
- public static void main(String[] args) {
- System.out.println(ai.getAndIncrement());
- System.out.println(ai.get());
- }
- }
输出分别为 1,2
- AtomicLong-demo
- @Slf4j
- public class AtomicExample2 {
- // 请求总数
- public static int clientTotal = 5000;
- // 同时并发执行的线程数
- public static int threadTotal = 200;
- public static AtomicLong count = new AtomicLong(0);
- private static void add() {
- // 主要为此方法 (看源码)
- count.incrementAndGet();
- // count.getAndIncrement();
- }
- public static void main(String[] args)throws Exception {
- // 定义线程池
- ExecutorService executorService = Executors.newCachedThreadPool();
- // 定义信号量
- final Semaphore semaphore = new Semaphore(threadTotal);
- // 定义计数器闭锁
- final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
- for (int i = 0; i <clientTotal; i++) {
- executorService.execute(()->{
- try {
- semaphore.acquire();
- add();
- semaphore.release();
- } catch (Exception e) {
- log.error("exception",e);
- }
- countDownLatch.countDown();
- });
- }
- countDownLatch.await();
- executorService.shutdown();
- log.info("count:{}",count.get());
- }
- }
输出为 5000, 用法与 AtomicInteger 类似
- AtomicBoolean-demo
- @Slf4j
- public class AtomicExample6 {
- private static AtomicBoolean isHappened = new AtomicBoolean(false);
- // 请求总数
- public static int clientTotal = 5000;
- // 同时并发执行的线程数
- public static int threadTotal = 200;
- public static void main(String[] args) throws Exception{
- // 定义线程池
- ExecutorService executorService = Executors.newCachedThreadPool();
- // 定义信号量
- final Semaphore semaphore = new Semaphore(threadTotal);
- // 定义计数器闭锁
- final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
- for (int i = 0; i <clientTotal; i++) {
- executorService.execute(()->{
- try {
- semaphore.acquire();
- test();
- semaphore.release();
- } catch (Exception e) {
- log.error("exception",e);
- }
- countDownLatch.countDown();
- });
- }
- countDownLatch.await();
- executorService.shutdown();
- log.info("isHappened:{}",isHappened.get());
- }
- private static void test() {
- // 原子性操作, 从 false 变为 true 只会执行一次
- if (isHappened.compareAndSet(false, true)) {
- log.info("execute");
- }
- }
- }
test() 方法中将 isHappened 从 false 变为 true 只会执行一次.
原子更新数组类
通过原子的方式更新数组里的某个元素. 我们以 AtomicIntegerArray(提供原子的方式更新数组里的整型) 为例进行演示, 其常用方法如下:
int addAndGet(int i,int delta): 以原子的方式将输入值与数组中索引 i 的元素相加
boolean compareAndSet(int i,int expect,int update): 如果当前值等于预期值, 则以原子方式将数组位置 i 的元素设置成 update 值
- AtomicIntegerArray-demo
- public class AtomicIntegerArrayTest {
- static int[] value = new int[] { 1, 2 };
- static AtomicIntegerArray ai = new AtomicIntegerArray(value);
- public static void main(String[] args) {
- ai.getAndSet(0, 3);
- System.out.println(ai.get(0));
- System.out.println(value[0]);
- }
- }
输出结果分别为 3,1 , 为什么 value[0] 的值为 1 呢? 因为数组 value 通过构造方法传递进去之后, AtomicIntegerArray 会将此数组复制一份, 所以当 AtomicIntegerArray 对内部数组元素进行修改时不会影响传入的数组.
原子更新引用类型
AtomicReference: 原子更新引用类型
AtomicReferenceFieldUpdater: 原子更新引用类型的字段
AtomicMarkableReference: 原子更新带有标记位的引用类型, 可以原子的更新一个布尔类型的标记位和引用类型.
- AtomicReference-demo1
- class AtomicReferenceTest {
- public static AtomicReference<User> atomicUserRef = new AtomicReference<User>();
- public static void main(String[] args) {
- User user = new User("conan", 15);
- atomicUserRef.set(user);
- User updateUser = new User("xiaoming", 18);
- atomicUserRef.compareAndSet(user, updateUser);
- System.out.println(atomicUserRef.get().getName());
- System.out.println(atomicUserRef.get().getOld());
- }
- static class User {
- private String name;
- private int old;
- public User(String name, int old) {
- this.name = name;
- this.old = old;
- }
- public String getName() {
- return name;
- }
- public int getOld() {
- return old;
- }
- }
- }
输出为:"xiaoming",18 我们可以结合上篇博客 "并发模拟" 中, 模拟高并发环境, 观察原子更新引用类型与非原子更新的区别
- AtomicReference-demo2
- @Slf4j
- public class AtomicExample4 {
- private static AtomicReference<Integer> count = new AtomicReference<>(0);
- public static void main(String[] args) {
- count.compareAndSet(0, 2); //2
- count.compareAndSet(0, 1); //no
- count.compareAndSet(1, 3); //no
- count.compareAndSet(2, 4); //4
- count.compareAndSet(3, 5); //no
- log.info("count:{}", count.get());
- }
- }
上述代码输出结果为 4, 因为只有第一句和第二句代码得到执行, 具体原因可参考下篇博客 cas 相关的内容
原子更新字段类
AtomicIntegerFiledUpdater: 原子更新整型的字段的更新器
AtomicLongFiledUpdater: 原子更新长整型字段的更新器
AtomicStampedReference: 原子更新带有版本号的引用类型, 该类将整数值与引用关联起来, 可原子的更数据和数据的版本号, 可以解决使用 cas 进行原子更新时, 可能出现的 aba 问题, 原子更新字段类都是抽象类, 每次使用都必须使用静态方法 newUpdater 创建一个更新器, 原子更新类的字段必须使用 public volatile 修饰, 我们以 AtomicIntergerFieldUpdater 为例进行演示
- AtomicIntergerFieldUpdater-demo1
- public class AtomicIntegerFieldUpdaterTest {
- private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater
- .newUpdater(User.class, "old");
- public static void main(String[] args) {
- User conan = new User("conan", 10);
- System.out.println(a.getAndIncrement(conan));
- System.out.println(a.get(conan));
- }
- public static class User {
- private String name;
- public volatile int old;
- public User(String name, int old) {
- this.name = name;
- this.old = old;
- }
- public String getName() {
- return name;
- }
- public int getOld() {
- return old;
- }
- }
- }
输出为 10,11
- AtomicIntergerFieldUpdater-demo2
- @Slf4j
- public class AtomicExample5 {
- // 更新某个类的某一字段的值
- private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");
- // 这个字段必须要用 volatile 修饰, 并且是非 static 字段才可以
- @Getter
- public volatile int count = 100;
- private static AtomicExample5 example5 = new AtomicExample5();
- public static void main(String[] args) {
- if (updater.compareAndSet(example5,100,120)) {
- log.info("update success 1,{}", example5.getCount());
- }
- if (updater.compareAndSet(example5, 100, 120)) {
- log.info("update success 2,{}", example5.getCount());
- } else {
- log.info("update failed,{}", example5.getCount());
- }
- }
- }
输出如下:
我们同样可以发现,"update success 2" 没有执行, 这也涉及到了 cas 的基本原理, 我会在下篇博客具体介绍
下面我们看一下在 atomic 包中新增的 5 个类:
Striped64
Striped64 里面维持一个 volatile 的 base, 还有一个 cell 数据, cell 数组主要存储线程需要增加或减少的值, 它能够将竞争的线程分散到自己内部的私有 cell 数组里面, 所以当并发量很大的时候, 线程会被部分分发去访问内部的 cell 数组. Striped64 里面有两个主要的方法 longAccumulate 和 doubleAccumulate, 两个方法非常相似. 实现的主要思路是如果能通过 cas 修改 base 成功, 那么直接退出 (并发量不大的时候), 否则去 cells 里面占一个非空的空 (并发量大的时候), 并把要操作的值赋值保存在一个 cell 里面, 这样在并发特别高的时候可能将热点分离
LongAdder
当并发量不高时, LongAdder 和 AtomicLong 性能差不多, 但是当并发超过一定限度, cas 会频繁失败, 对于 AtomicLong 没有其他解决办法, 对于 LongAdder 则可以通过 cells 数组来进行部分分流操作. LongAdder 使用的思想是热点分离, 就是将 value 值分离成一个数组, 当多线程访问时通过 hash 算法映射到其中一个数字进行计数, 最终的结果就是这些数组的求和累加, 这样一来减小了锁的粒度. LongAdder 一开始不会直接使用 cell [] 存储, 而是先使用 long 类型的 base 存储, 当 casBase() 出现失败时, 则会创建 cell[], 如果在单个 cell 上面出现了 cell 更新冲突, 则会尝试创建新的 cell 或将 cell[] 扩容为 2 倍. LongAdder 中的方法如下:
void add(long x): 增加 x
void increment(): 自增 1
void decrement(): 自减 1
long sum(): 求和
void reset(): 重置 cell 数组
long sumThenReset(): 求和并重置
- LongAdder-demo1
- @Slf4j
- @ThreadSafe
- public class AtomicExample3 {
- // 请求总数
- public static int clientTotal = 5000;
- // 同时并发执行的线程数
- public static int threadTotal = 200;
- // 与 AtomicLong 的区别是, 可以将热点分离, 在并发特别高的时候可能提高性能, 在并发不是特别高的时候可以用 atomiclong(序列化生成, 全局唯一也可以用这个)(再去查)
- public static LongAdder count = new LongAdder();
- private static void add() {
- count.increment();
- // count.getAndIncrement();
- }
- public static void main(String[] args)throws Exception {
- // 定义线程池
- ExecutorService executorService = Executors.newCachedThreadPool();
- // 定义信号量
- final Semaphore semaphore = new Semaphore(threadTotal);
- // 定义计数器闭锁
- final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
- for (int i = 0; i <clientTotal; i++) {
- executorService.execute(()->{
- try {
- semaphore.acquire();
- add();
- semaphore.release();
- } catch (Exception e) {
- log.error("exception",e);
- }
- countDownLatch.countDown();
- });
- }
- countDownLatch.await();
- executorService.shutdown();
- log.info("count:{}",count);
- }
- }
输出为 5000, 没有出现线程安全问题
LongAccumulator
LongAccumulator 是 LongAdder 的功能增强版, LongAdd 的 api 只是对数值的加减, 而 LongAccumulator 提供了自定义的函数操作
- LongAccumulator-demo
- public class LongAccumulatorDemo {
- public static void main(String[] args)throws InterruptedException {
- LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
- Thread[] ts = new Thread[1000];
- for (int i = 0; i <1000; i++) {
- ts[i] = new Thread(() ->{
- Random random = new Random();
- long value = random.nextLong();
- // 比较 value 和上一次的比较值, 存储较大值
- accumulator.accumulate(value);
- });
- ts[i].start();
- }
- // 使用 join 方法, 等到所有的线程结束后再执行下面的代码
- for (int i = 0; i < 1000; i++) {
- ts[i].join();
- }
- System.out.println(accumulator.longValue());
- }
- }
上述代码作用为 accumulate(value) 传入的值会与上一次比较值对比, 保留较大者
DoubleAdder 与 DoubleAccumulator
这两个类的实现思路与 long 类型的实现一致, 只是将 double 转为 long 类型后运算的.
来源: https://www.cnblogs.com/sbrn/p/8995500.html