上节讲过了 ThreadLocal 的源码, 这一节我们来看下 FastThreadLocal. 这个我觉得要比 ThreadLocal 要简单, 因为缺少了对于 Entry 的清理和整理工作, 所以 ThreadLocal 的效率更高.
跟 ThreadLocal 一样, 我们也先给一个结构图:
大家看这个图跟 ThreadLocal 有哪些区别, 首先是用一个 Object 数组来替代了 Entry 数组, 不再是 key 键值对的形式. 另外 Object[0]存储一个 Set<FastThreadLocal<?>>集合.
OK, 看完这个, 源码就很好理解了, 我们还是先看下 InternalThreadLocalMap 构造函数
- private InternalThreadLocalMap() {
- super(newIndexedVariableTable());
- }
- private static Object[] newIndexedVariableTable() {
- Object[] array = new Object[32]; // 初始化 32 长度的 Object 数组
- Arrays.fill(array, UNSET); // 将每个元素初始化成 UNSET 这里的 UNSET 可以理解为占位符, 因为 null 会被认为成有效值
- return array;
- }
- UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
- this.indexedVariables = indexedVariables; // 将创建的 array 赋给 Object[] indexedVariables;
- }
看完这个, 我们来看下数组的索引值是什么设置的, 打开 FastThreadLocal, 看下全局变量.
1 private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
上面这个了解类的加载机制的应该很清楚 variablesToRemoveIndex 初始化的时机. 想要了解类的加载过程的 请移步 类的加载机制(一)
- static final AtomicInteger nextIndex = new AtomicInteger(); // 原子类
- public static int nextVariableIndex() {
- int index = nextIndex.getAndIncrement(); // 先获取值, 在增加, 所以 index = 0
- if (index <0) {
- nextIndex.decrementAndGet();
- throw new IllegalStateException("too many thread-local indexed variables");
- }
- return index;
- }
由此得出, variablesToRemoveIndex = 0 固定值. 再看下 FastThreadLocal 的构造方法
- public FastThreadLocal() {
- index = InternalThreadLocalMap.nextVariableIndex();
- }
大家看到这个, 是不是就想到了, 也就是说每个 FastThreadLocal 都有一个唯一的 index 值 , 那么跟 ThreadLocal 相比的话, ThreadLocal 要 先获取一个 hash 值, 然后再根据 Entry 数组的长度进行运算得到一个索引值, 所以说这样也是 Netty 的这个 FastThreadLocal 效率更高的原因之一.
为了更好地讲下面的内容, 我们再看一个 FastThreadLocalThread
继承了 Thread, 增加了成员变量 InternalThreadLocalMap threadLocalMap. 不再是放在 Thread 中的成员变量了, 看到这个想到了什么? 那么也就是说获取某个线程存储 Object 数组结构的 Map, 是从 FastThreadLocalThread 中获取.
好了介绍完上面, 看 set 方法
- public final void set(V value) {
- if (value != InternalThreadLocalMap.UNSET) { // 如果不是 UNSET
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 获取 map
- if (setKnownNotUnset(threadLocalMap, value)) { // 新增或者更新值
- registerCleaner(threadLocalMap); // 如果返回 true, 代表新增, 设置清理位
- }
- } else {
- remove(); // 如果入参是一个 UNSET, 那么执行删除逻辑
- }
- }
看下 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
- public static InternalThreadLocalMap get() {
- Thread thread = Thread.currentThread();
- if (thread instanceof FastThreadLocalThread) { // 如果当前线程是一个 FastThreadLocalThread
- return fastGet((FastThreadLocalThread) thread); // 执行 fastGet 这个名字是很有意思....
- } else {
- return slowGet(); // 要不然就 slowGet() Netty 叫 ThreadLocal Slow...
- }
- }
- private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { // 如果当前的 map 没有设置, 则新创建一个
- InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 这里就是获取了 FastThreadLocalThread 中的成员变量 threadLocalMap
- if (threadLocalMap == null) {
- thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
- }
- return threadLocalMap;
- }
看下新增或者更新的逻辑
- private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
- if (threadLocalMap.setIndexedVariable(index, value)) { // index 是当前 FastThreadLocal 的索引 , 如果是 新增, 那么这个方法返回 true , 如果是修改则返回 false
- addToVariablesToRemove(threadLocalMap, this); // 新增的话, 需要将当前的这个 FastThreadLocal 添加到 Set<FashThreadLocal > 集合中
- return true; // 新增返回 trye
- }
- return false; // 更新返回 false
- }
- public boolean setIndexedVariable(int index, Object value) {
- Object[] lookup = indexedVariables; // 当前的 Object[]对象
- if (index <lookup.length) { // 检查当前的索引是否超过了 Object 数组的长度
- Object oldValue = lookup[index]; // 获取当前的值
- lookup[index] = value; // 将新值设置进去
- return oldValue == UNSET; // 判断之前的值是不是占位符, 如果是则代表新增, 不是代表更新
- } else {
- expandIndexedVariableTableAndSet(index, value); // 如果超过了, 则需要进行扩容
- return true;
- }
- }
- 1 private void expandIndexedVariableTableAndSet(int index, Object value) { // 扩容逻辑
- 2 Object[] oldArray = indexedVariables; // 当前的 Object 数组
- 3 final int oldCapacity = oldArray.length; // 当前的长度
- 4 int newCapacity = index; // index 是这个 FastThreadLocal 对应的索引值
- 5 newCapacity |= newCapacity>>> 1;
- 6 newCapacity |= newCapacity>>> 2;
- 7 newCapacity |= newCapacity>>> 4;
- 8 newCapacity |= newCapacity>>> 8;
- 9 newCapacity |= newCapacity>>> 16;
- 10 newCapacity ++; // 这段其实也很有意思, 其实是为了计算新的数组的容量, 会变成下一个档位的 2 的次方大小, 比如 1->2 2->4 3->4 4->8 5->8 6->8 7->8 8->16 18->32, 如果不理解,
- // 可以自己写一个 main 方法试一下, 后面我们在讲 Netty 内存模型的时候大家也会看到这么一段.
- 11
- 12 Object[] newArray = Arrays.copyOf(oldArray, newCapacity); // 将老的数据往新的数组进行拷贝
- 13 Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); // 将新的多出来的部分, 设置占位符
- 14 newArray[index] = value; // 将 index 对应的值设置完
- 15 indexedVariables = newArray; // 将新的数组提升成 Map 中的 indexedVariables
- 16 }
接下来看下 set 中的这句 addToVariablesToRemove(threadLocalMap, this); 根据上面的结构, 新增了 value, 那么就需要修改 set 集合
- private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
- Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); // 根据上面的分析 variablesToRemoveIndex = 0 获取第 0 个位置的 Object 然后看是不是 null 或者占位符
- Set<FastThreadLocal<?>> variablesToRemove;
- if (v == InternalThreadLocalMap.UNSET || v == null) { // 如果是占位符
- variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
- threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 创建一个 set 集合放到第 0 的位置上
- } else {
- variablesToRemove = (Set<FastThreadLocal<?>>) v; // 已经有了, 则直接取过来
- }
- variablesToRemove.add(variable); // 新增新的 FastThreadLocal 到 set 集合
- }
接下来看 get 方法
- public final V get() {
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 获取 map
- Object v = threadLocalMap.indexedVariable(index); // 根据当前的 Thread 的索引, 获取 Object
- if (v != InternalThreadLocalMap.UNSET) { // 不是占位符直接返回
- return (V) v;
- }
- V value = initialize(threadLocalMap); // 如果是, 则调用初始化方法
- registerCleaner(threadLocalMap);
- return value;
- }
- private V initialize(InternalThreadLocalMap threadLocalMap) {
- V v = null;
- try {
- v = initialValue(); // 空方法, 供子类调用
- } catch (Exception e) {
- PlatformDependent.throwException(e);
- }
- threadLocalMap.setIndexedVariable(index, v); // 设置初始化的值
- addToVariablesToRemove(threadLocalMap, this); // 添加 set 集合
- return v;
- }
接下来看 remove 方法
- public final void remove(InternalThreadLocalMap threadLocalMap) {
- if (threadLocalMap == null) {
- return;
- }
- Object v = threadLocalMap.removeIndexedVariable(index); // 根据当前的 Thread 的索引, 获取 Object
- removeFromVariablesToRemove(threadLocalMap, this); // 移除 set 集合中的元素, 这里就不展开说了, 上面懂了就很简单了
- if (v != InternalThreadLocalMap.UNSET) {
- try {
- onRemoval((V) v); // 空方法, 供子类调用
- } catch (Exception e) {
- PlatformDependent.throwException(e);
- }
- }
- }
好了, 这就算是讲完了, 当然 FastThreadLocal 不会整理数据和清除过期数据, 是怎么防止内存泄露的呢?
看下 FastThreadLocalRunnable
- public class FastThreadLocalRunnable implements Runnable {
- private Runnable runnable;
- public FastThreadLocalRunnable(Runnable runnable) {
- this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
- }
- public static Runnable wrap(Runnable runnable) {
- return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
- }
- @Override
- public void run() {
- try {
- // 运行任务
- this.runnable.run();
- } finally {
- /**
- * 线程池中的线程由于会被复用, 所以线程池中的每一条线程在执行 task 结束后, 要清理掉其 InternalThreadLocalMap 和其内的 FastThreadLocal 信息,
- * 否则, 当这条线程在下一次被复用的时候, 其 ThreadLocalMap 信息还存储着上一次被使用时的信息;
- * 另外, 假设这条线程不再被使用, 但是这个线程有可能不会被销毁(与线程池的类型和配置相关), 那么其上的 ThreadLocal 将发生资源泄露.
- */
- FastThreadLocal.removeAll();
- }
- }
- }
使用该类将一个普通的 Runnable 对象进行 wrap 装饰, 之后在调用 FastThreadLocalRunnable.run()的时候, 实际上会调用真实对象 (即普通的 Runnable 对象) 的 run(), 执行完成之后, 会进行对当前线程的全量回收操作(删除当前线程上的 InternalThreadLocalMap 中的每一个 value 以及 threadLocalMap 本身), 这样就可以有效的在线程池中复用当前线程而不必关心 ftl 的错乱和泄漏问题.
来源: https://www.cnblogs.com/huxipeng/p/11299244.html