更多技术分享可关注我
前言
如果仔细阅读过 Netty 的线程调度模型的源码, 或者 NIO 线程对象及其线程池的创建源码, 那么肯定会遇到类似 "AtomicIntegerFieldUpdater" 的身影, 不禁想知道 --Netty 为何不直接使用原子类包装普通的比如计数的变量?
JDK 的 Atomic 原子操作类实现机制
在 JDK 里, Atomic 开头的原子操作类有很多, 涉及到 Java 常用的数字类型的, 基本都有相应的 Atomic 原子操作类, 如下图所示:
原子操作类都是线程安全的, 编码时可以放心大胆的使用. 下面以其中常用的 AtomicInteger 原子类为例子, 分析这些原子类的底层实现机制, 辅助理解 Netty 为何没有直接使用原子类. 具体使用的 demo 就不写了, 想必 Javaer 都多少用过或者见过, 直接看 AtomicInteger 类核心源码:
- private volatile int value; // 简化了部分非核心源码
- // 初始化, 简化了部分非核心源码
- public AtomicInteger(int initialValue) {
- value = initialValue;
- }
- public final int get() {
- return value;
- }
- // 自增 1, 并返回自增之前的值
- public final int getAndIncrement() {
- return unsafe.getAndAddInt(this, valueOffset, 1);
- }
- // 自减 1, 并返回自增之前的值
- public final int getAndDecrement() {
- return unsafe.getAndAddInt(this, valueOffset, -1);
- }
以上, AtomicInteger 可以对 int 类型的值进行线程安全的自增或者自减等操作. 从源码中可以看到, 线程安全的操作方法底层都是使用 unsafe 方法实现, 这是一个 JDK 的魔法类, 能实现很多贴近底层的功能, 所以并不是 Java 的实现的, 但是能保证底层的这些 getAndXXX 操作都是线程安全的, 关于 unsafe 具体的用法和细节, 可以参考这篇文章 Java 魔法类: Unsafe 应用解析(https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html, 可能无法直接打开, 复制黏贴到浏览器即可)
题外话: 如果 AtomicXXX 的对象是自定义类型呢? 不要慌, Java 也提供了自定义类型的原子操作类 --AtomicReference, 它操作的对象是个泛型对象, 故能支持自定义的类型, 其底层是没有自增方法的, 操作的方法可以作为函数入参传递, 源码如下:
- // 对 x 执行 accumulatorFunction 操作
- // accumulatorFunction 是个函数, 可以自定义想做的事情
- // 返回老值
- public final V getAndAccumulate(V x,
- BinaryOperator<V> accumulatorFunction) {
- // prev 是老值, next 是新值
- V prev, next;
- // 自旋 + CAS 保证一定可以替换老值
- do {
- prev = get();
- // 执行自定义操作
- next = accumulatorFunction.apply(prev, x);
- } while (!compareAndSet(prev, next));
- return prev;
- }
JDK 的 AtomicXXXFieldUpdater 原子更新器及其优势
在 Java5 中, JDK 就开始提供原子类了, 当然也包括原子的更新器 -- 即后缀为 FieldUpdater 的类, 如下 Integer,Long, 还有一个自定义类型的原子更新器, 共三类:
这些原子更新器常见于各种优秀的开源框架里, 而很少被普通的业务程序员直接使用, 其实这些原子更新器也可以被用来包装共享变量(必须是 volatile 修饰的对象属性), 来为这些共享变量实现原子更新的功能. 这些被包装的共享变量可以是原生类型, 也可以是引用类型, 那么不禁要问: 已经有了原子类, 为啥还额外提供一套原子更新器呢?
简单的说有两个原因, 以 int 变量为例, 基于 AtomicIntegerFieldUpdater 实现的原子计数器, 比单纯的直接用 AtomicInteger 包装 int 变量的花销要小, 因为前者只需要一个全局的静态变量 AtomicIntegerFieldUpdater 即可包装 volatile 修饰的非静态共享变量, 然后配合 CAS 就能实现原子更新, 而这样做, 使得后续同一个类的每个对象中只需要共享这个静态的原子更新器即可为对象计数器实现原子更新, 而原子类是为同一个类的每个对象中都创建了一个计数器 + AtomicInteger 对象, 这种开销显然就比较大了.
下面看一个 JDK 使用原子更新器的例子, 即 JDK 的 BufferedInputStream, 如下是源码的片段节选:
- public class BufferedInputStream extends FilterInputStream {
- private static int DEFAULT_BUFFER_SIZE = 8192;
- private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
- protected volatile byte buf[];
- /**
- * Atomic updater to provide compareAndSet for buf. This is
- * necessary because closes can be asynchronous. We use nullness
- * of buf[] as primary indicator that this stream is closed. (The
- * "in" field is also nulled out on close.)
- */
- private static final
- AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (BufferedInputStream.class, byte[].class, "buf");
可以看出, 每个 BufferedInputStream 对象都包含了一个 buf 属性, 该属性是对象属性, 且被 volition 修饰, 并被原子更新器 AtomicReferenceFieldUpdater 包装, 注意这个引用类型的原子更新器是静态类型的, 这意味着不论用户创建了多少个 BufferedInputStream 对象, 在全局都只有这一个原子更新器被创建, 这里之所以不用原子类 AtomicReference 直接包装 buf 属性, 是因为 buf 是一个 byte 数组, 通常会是一个比较大的对象, 如果用原子类直接包装, 那么后续每个 BufferedInputStream 对象都会额外创建一个原子类的对象, 会消耗更多的内存, 负担较重, 因此 JDK 直接使用了原子更新器代替了原子类, Netty 源码中的类似使用也是如出一辙.
另外一个重要原因是使用原子更新器, 不会破坏共享变量原来的结构, 回到上述 JDK 的例子, buf 对外仍然可以保留 buf 对象的原生数组属性, 只不过多了一个 volatile 修饰, 外界可以直接获取到这个 byte 数组实现一些业务逻辑, 而且在必要的时候也能使用原子更新器实现原子更新, 可谓两头不耽误, 灵活性较强!
还有一个可能的疑问点需要理解, 即原子更新器虽然是静态的, 但是其修饰的共享变量确仍然是类的对象属性, 即每个类的对象仍然是只包含自己那独一份的共享变量, 不会因为原子更新器是静态的, 而受到任何影响.
结论: 实现原子更新最佳的方式是直接使用原子更新器实现. 一方面是更节省内存, 另一方面是不破坏原始的共享变量, 使用起来更灵活. 当然如果是时延要求没有那么高的场景, 那么就不需要这么严苛, 直接使用原子类就 OK, 毕竟原子类的编码简单, 开发效率高, 不易出错.
品 Netty 源码, 学习原子更新的最佳实现方式
前面说了很多理论, 下面看一段 Netty 源码, 看 Netty 是如何优雅的使用原子更新器的. 下面是 Netty 的 NIO 线程实现类 --SingleThreadEventExecutor 的部分源码, 省略了很多和本次分析无关的代码:
- /**
- * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.
- */
- public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
- private static final int ST_NOT_STARTED = 1;
- private static final int ST_STARTED = 2;
- private static final int ST_SHUTTING_DOWN = 3;
- private static final int ST_SHUTDOWN = 4;
- private static final int ST_TERMINATED = 5;
- private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;
- private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER;
- private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
- static {
- AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =
- PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");
- if (updater == null) {
- updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
- }
- STATE_UPDATER = updater;
- }
- private final Queue<Runnable> taskQueue;
- private final Executor executor;
- private volatile Thread thread;
- private volatile int state = ST_NOT_STARTED;
以上截取了一小片段, 并删除了注释, 可以清晰的看到 Netty 封装了 JDK 的 Thread 对象, 一些标识线程状态的静态常量, 线程执行器, 异步任务队列, 以及标识线程状态的属性 state 等, 其中重点关注 state, 这个属性是普通的共享变量, 由 volatile 修饰, 并且被静态的原子更新器 STATE_UPDATER 包装.
下面看 NIO 线程的启动源码:
- /**
- * NioEventLoop 线程启动方法, 这里会判断本 NIO 线程是否已经启动
- */
- private void startThread() {
- if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
- if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
- doStartThread();
- }
- }
- }
注释写到了, 启动 NIO 线程之前会做一次是否已经启动的判断, 避免重复启动, 这个判断逻辑就是前面提到的原子更新器实现的, 当本 NIO 线程实例没有启动时, 会做一次 CAS 计算, 注意 CAS 对应操作系统的一个指令, 是原子操作, 如果是多个外部线程在启动 NIO 线程, 那么同时只有一个外部线程能启动成功一次, 后续的线程不会重复启动这个 NIO 线程. 保证在 NIO 线程的一次生命周期内, 外部线程只能调用一次 doStartThread()方法, 这样可以实现无锁更新, 且没有自旋, 性能较好, 这里之所以不需要自旋, 是因为启动线程就应该是一锤子买卖, 启动不成功, 就说明是已经启动了, 直接跳过, 无需重试.
在看一个自旋的用法:
在 NIO 线程被优雅 (也可能异常) 关闭时, 会在死循环里, 结合 CAS 算法, 原子更新当前 NIO 线程的状态为关闭中... 这里有两个注意事项:
1, 和线程安全的启动 NIO 线程的逻辑不一样, 更新线程状态必须成功, 不是一锤子买卖, 所以需要自旋重试, 直到 CAS 操作成功
2, 需要使用局部变量缓存外部的共享变量的旧值, 保证 CAS 操作执行期间该共享变量的旧值不被外部线程修改
3, 同样的, 每次执行 CAS 操作之前, 必须判断一次旧值, 只有符合更新条件, 才真的执行 CAS 操作, 否则说明已经被外界线程更新成功, 无需重复操作, 以提升性能.
Netty 这样做也侧面反映 Nerty 的源码确实很优秀, 平时的业务开发, 如果有类似场景, 那么可以参考学习这两类用法.
总结使用原子更新器的注意事项:
1, 包装的必须是被 volatile 修饰的共享变量
2, 包装的必须是非静态的共享变量
3, 必须搭配 CAS 的套路自行实现比较并交换的逻辑
4, 自行实现比较并交换的逻辑时需要注意: 如果是非一锤子买卖的原子更新操作, 那么必须用局部变量缓存外部的共享变量的旧值, 具体原因可以参考: Netty 的线程调度模型分析(10)《多线程环境下, 实例变量转为局部变量的程序设计技巧》, 且放在一个循环里操作, 以保证最终一致性.
后记
dashuai 的博客是终身学习践行者, 大厂程序员, 且专注于工作经验, 学习笔记的分享和日常吐槽, 包括但不限于互联网行业, 附带分享一些 PDF 电子书, 资料, 帮忙内推, 欢迎拍砖!
来源: https://www.cnblogs.com/kubixuesheng/p/12650686.html