先做总结:
1,HashMap HashTable ConcurrentHashMap
HashMap: 线程不安全
HashTable: 线程安全, 每个方法都加了 synchronized 修饰. 类似 Collections.synchronizedMap(hashMap)
对读写加锁, 独占式, 一个线程在读时其他线程必须等待, 吞吐量较低, 性能较为低下.
ConcurrentHashMap: 利用 CAS+Synchronized 来保证并发的安全性. 数据结构同 HashMap.
2,ConcurrentHashMap 如何实现线程安全?
(1)get() 方法使用 tabAt(Node<K, V>[], int) 方法
调用 Unsafe 的 native 方法 getObjectVolatile(Object obj, long offset);
// 获取 obj 对象中 offset 偏移地址对应的 object 型 field 的值, 支持 volatile load 语义, 即: 让缓存中的数据失效, 重新从主内存加载数据
(2)put() 方法
1需要获取数组上的 Node 时同样使用 tabAt() 方法
2设置数组上 Node 是使用 casTabAt() 方法,
casTabAt() 调用 Unsafe 的 native 方法 compareAndSwapObject(),CAS 操作
3哈希冲突之后, 需要操作改 hash 值对应的链表 / 红黑树, 此时 synchronized(该链表第一个 Node)
保证线程安全的基础上, 减小了锁的粒度.
3, 线程安全的容器只能保证自身的数据不被破坏, 但无法保证业务的行为是否正确.
- public static void demo1() {
- final Map<String, Integer> count = new ConcurrentHashMap<>();
- final CountDownLatch endLatch = new CountDownLatch(2);
- Runnable task = new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i <5; i++) {
- Integer value = count.get("a");
- if (null == value) {
- count.put("a", 1);
- } else {
- count.put("a", value + 1);
- }
- }
- endLatch.countDown();
- }
- };
- new Thread(task).start();
- new Thread(task).start();
- try {
- endLatch.await();
- System.out.println(count);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
demo1 是两个线程操作 ConcurrentHashMap, 意图将 value 变为 10. 但是, 因为多个线程用相同的 key 调用时, 很可能会覆盖相互的结果, 造成记录的次数比实际出现的次数少.
当然可以用锁解决这个问题, 但是也可以使用 ConcurrentMap 定义的方法:
V putIfAbsent(K key, V value)
如果 key 对应的 value 不存在, 则 put 进去, 返回 null. 否则不 put, 返回已存在的 value.
boolean remove(Object key, Object value)
如果 key 对应的值是 value, 则移除 K-V, 返回 true. 否则不移除, 返回 false.
boolean replace(K key, V oldValue, V newValue)
如果 key 对应的当前值是 oldValue, 则替换为 newValue, 返回 true. 否则不替换, 返回 false.
修改:
- public static void demo1() {
- final Map<String, Integer> count = new ConcurrentHashMap<>();
- final CountDownLatch endLatch = new CountDownLatch(2);
- Runnable task = new Runnable() {
- @Override
- public void run() {
- Integer oldValue, newValue;
- for (int i = 0; i <5; i++) {
- while (true) {
- oldValue = count.get("a");
- if (null == oldValue) {
- newValue = 1;
- if (count.putIfAbsent("a", newValue) == null) {
- break;
- }
- } else {
- newValue = oldValue + 1;
- if (count.replace("a", oldValue, newValue)) {
- break;
- }
- }
- }
- }
- endLatch.countDown();
- }
- };
- new Thread(task).start();
- new Thread(task).start();
- try {
- endLatch.await();
- System.out.println(count);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
由于 ConcurrentMap 中不能保存 value 为 null 的值, 所以需要处理不存在和已存在两种情况, 不过可以使用 AtomicInteger 来替代.
- public static void demo1() {
- final Map<String, AtomicInteger> count = new ConcurrentHashMap<>();
- final CountDownLatch endLatch = new CountDownLatch(2);
- Runnable task = new Runnable() {
- @Override
- public void run() {
- AtomicInteger oldValue;
- for (int i = 0; i <5; i++) {
- oldValue = count.get("a");
- if (null == oldValue) {
- AtomicInteger zeroValue = new AtomicInteger(0);
- oldValue = count.putIfAbsent("a", zeroValue);
- if (null == oldValue) {
- oldValue = zeroValue;
- }
- }
- oldValue.incrementAndGet();
- }
- endLatch.countDown();
- }
- };
- new Thread(task).start();
- new Thread(task).start();
- try {
- endLatch.await();
- System.out.println(count);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
一, 属性
- // 最大容量: 2^30=1073741824
- private static final int MAXIMUM_CAPACITY = 1 << 30;
- // 默认初始值, 必须是 2 的幕数
- private static final int DEFAULT_CAPACITY = 16;
- //
- static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- //
- private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
- //
- private static final float LOAD_FACTOR = 0.75f;
- // 链表转红黑树阀值,> 8 链表转换为红黑树
- static final int TREEIFY_THRESHOLD = 8;
- // 树转链表阀值, 小于等于 6(tranfer 时, lc,hc=0 两个计数器分别 ++ 记录原 bin, 新 binTreeNode 数量,<=UNTREEIFY_THRESHOLD 则 untreeify(lo))
- static final int UNTREEIFY_THRESHOLD = 6;
- //
- static final int MIN_TREEIFY_CAPACITY = 64;
- //
- private static final int MIN_TRANSFER_STRIDE = 16;
- //
- private static int RESIZE_STAMP_BITS = 16;
- // 2^15-1,help resize 的最大线程数
- private static final int MAX_RESIZERS = (1 <<(32 - RESIZE_STAMP_BITS)) - 1;
- // 32-16=16,sizeCtl 中记录 size 大小的偏移量
- private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
- // forwarding nodes 的 hash 值
- static final int MOVED = -1;
- // 树根节点的 hash 值
- static final int TREEBIN = -2;
- // ReservationNode 的 hash 值
- static final int RESERVED = -3;
- // 可用处理器数量
- static final int NCPU = Runtime.getRuntime().availableProcessors();
几个很重要的概念:
(1)table: 用来存放 Node 节点数据的, 默认为 null, 默认大小为 16 的数组, 每次扩容时大小总是 2 的幂次方;
(2)nextTable: 扩容时新生成的数据, 数组为 table 的两倍;
(3)Node: 节点, 保存 key-value 的数据结构;
(4)ForwardingNode: 一个特殊的 Node 节点, hash 值为 - 1, 其中存储 nextTable 的引用. 只有 table 发生扩容的时候, ForwardingNode 才会发挥作用, 作为一个占位符放在 table 中表示当前节点为 null 或则已经被移动
(5)sizeCtl: 控制标识符, 用来控制 table 初始化和扩容操作的, 在不同的地方有不同的用途, 其值也不同, 所代表的含义也不同
负数代表正在进行初始化或扩容操作
-1 代表正在初始化
-N 表示有 N-1 个线程正在进行扩容操作
正数或 0 代表 hash 表还没有被初始化, 这个数值表示初始化或下一次进行扩容的大小
二, 构造
- public ConcurrentHashMap() {
- }
- public ConcurrentHashMap(int initialCapacity) {
- if (initialCapacity < 0)
- throw new IllegalArgumentException();
- int cap = ((initialCapacity>= (MAXIMUM_CAPACITY>>> 1)) ?
- MAXIMUM_CAPACITY :
- tableSizeFor(initialCapacity + (initialCapacity>>> 1) + 1));
- this.sizeCtl = cap;
- }
- public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
- this.sizeCtl = DEFAULT_CAPACITY;
- putAll(m);
- }
- public ConcurrentHashMap(int initialCapacity, float loadFactor) {
- this(initialCapacity, loadFactor, 1);
- }
- public ConcurrentHashMap(int initialCapacity,
- float loadFactor, int concurrencyLevel) {
- if (!(loadFactor> 0.0f) || initialCapacity <0 || concurrencyLevel <= 0)
- throw new IllegalArgumentException();
- if (initialCapacity < concurrencyLevel) // Use at least as many bins
- initialCapacity = concurrencyLevel; // as estimated threads
- long size = (long)(1.0 + (long)initialCapacity / loadFactor);
- int cap = (size>= (long)MAXIMUM_CAPACITY) ?
- MAXIMUM_CAPACITY : tableSizeFor((int)size);
- this.sizeCtl = cap;
- }
初始化: initTable()
- private final Node<K,V>[] initTable() {
- Node<K,V>[] tab; int sc;
- while ((tab = table) == null || tab.length == 0) {
- // 初始化的 "功劳" 被其他线程 "抢去" 了
- if ((sc = sizeCtl) <0)
- Thread.yield(); // lost initialization race; just spin
- // CAS 一下, 将 sizeCtl 设置为 -1, 代表抢到了锁
- else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
- try {
- if ((tab = table) == null || tab.length == 0) {
- // DEFAULT_CAPACITY 默认初始容量是 16
- int n = (sc> 0) ? sc : DEFAULT_CAPACITY;
- // 初始化数组, 长度为 16 或初始化时提供的长度
- Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
- // 将这个数组赋值给 table,table 是 volatile 的
- table = tab = nt;
- // 如果 n 为 16 的话, 那么这里 sc = 12
- // 其实就是 0.75 * n
- sc = n - (n>>> 2);
- }
- } finally {
- // 设置 sizeCtl 为 sc, 我们就当是 12 吧
- sizeCtl = sc;
- }
- break;
- }
- }
- return tab;
- }
三, put()
- public V put(K key, V value) {
- return putVal(key, value, false);
- }
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- // 得到 hash 值
- int hash = spread(key.hashCode());
- // 用于记录相应链表的长度
- int binCount = 0;
- for (Node<K,V>[] tab = table;;) {
- Node<K,V> f; int n, i, fh;
- // 如果数组 "空", 进行数组初始化
- if (tab == null || (n = tab.length) == 0)
- // 初始化数组, 后面会详细介绍
- tab = initTable();
- // 找该 hash 值对应的数组下标, 得到第一个节点 f
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- // 如果数组该位置为空,
- // 用一次 CAS 操作将这个新值放入其中即可, 这个 put 操作差不多就结束了, 可以拉到最后面了
- // 如果 CAS 失败, 那就是有并发操作, 进到下一个循环就好了
- if (casTabAt(tab, i, null,
- new Node<K,V>(hash, key, value, null)))
- break; // no lock when adding to empty bin
- }
- // hash 居然可以等于 MOVED, 这个需要到后面才能看明白, 不过从名字上也能猜到, 肯定是因为在扩容
- else if ((fh = f.hash) == MOVED)
- // 帮助数据迁移, 这个等到看完数据迁移部分的介绍后, 再理解这个就很简单了
- tab = helpTransfer(tab, f);
- else { // 到这里就是说, f 是该位置的头结点, 而且不为空
- V oldVal = null;
- // 获取数组该位置的头结点的监视器锁
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- if (fh>= 0) { // 头结点的 hash 值大于 0, 说明是链表
- // 用于累加, 记录链表的长度
- binCount = 1;
- // 遍历链表
- for (Node<K,V> e = f;; ++binCount) {
- K ek;
- // 如果发现了 "相等" 的 key, 判断是否要进行值覆盖, 然后也就可以 break 了
- if (e.hash == hash &&
- ((ek = e.key) == key ||
- (ek != null && key.equals(ek)))) {
- oldVal = e.val;
- if (!onlyIfAbsent)
- e.val = value;
- break;
- }
- // 到了链表的最末端, 将这个新值放到链表的最后面
- Node<K,V> pred = e;
- if ((e = e.next) == null) {
- pred.next = new Node<K,V>(hash, key,
- value, null);
- break;
- }
- }
- }
- else if (f instanceof TreeBin) { // 红黑树
- Node<K,V> p;
- binCount = 2;
- // 调用红黑树的插值方法插入新节点
- if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
- value)) != null) {
- oldVal = p.val;
- if (!onlyIfAbsent)
- p.val = value;
- }
- }
- }
- }
- if (binCount != 0) {
- // 判断是否要将链表转换为红黑树, 临界值和 HashMap 一样, 也是 8
- if (binCount>= TREEIFY_THRESHOLD)
- // 这个方法和 HashMap 中稍微有一点点不同, 那就是它不是一定会进行红黑树转换,
- // 如果当前数组的长度小于 64, 那么会选择进行数组扩容, 而不是转换为红黑树
- // 具体源码我们就不看了, 扩容部分后面说
- treeifyBin(tab, i);
- if (oldVal != null)
- return oldVal;
- break;
- }
- }
- }
- //
- addCount(1L, binCount);
- return null;
- }
按照上面的源码, 我们可以确定 put 整个流程如下:
判空; ConcurrentHashMap 的 key,value 都不允许为 null
计算 hash. 利用方法计算 hash 值.
遍历 table, 进行节点插入操作, 过程如下:
如果 table 为空, 则表示 ConcurrentHashMap 还没有初始化, 则进行初始化操作: initTable()
根据 hash 值获取节点的位置 i, 若该位置为空, 则直接插入, 这个过程是不需要加锁的. 计算 f 位置: i=(n - 1) & hash
如果检测到 fh = f.hash == -1, 则 f 是 ForwardingNode 节点, 表示有其他线程正在进行扩容操作, 则帮助线程一起进行扩容操作
如果 f.hash>= 0 表示是链表结构, 则遍历链表, 如果存在当前 key 节点则替换 value, 否则插入到链表尾部. 如果 f 是 TreeBin 类型节点, 则按照红黑树的方法更新或者增加节点
若链表长度 > TREEIFY_THRESHOLD(默认是 8), 则将链表转换为红黑树结构
调用 addCount 方法, ConcurrentHashMap 的 size + 1
这里整个 put 操作已经完成.
四, get()
- public V get(Object key) {
- Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
- // 计算 hash
- int h = spread(key.hashCode());
- if ((tab = table) != null && (n = tab.length)> 0 &&
- (e = tabAt(tab, (n - 1) & h)) != null) {
- // 搜索到的节点 key 与传入的 key 相同且不为 null, 直接返回这个节点
- if ((eh = e.hash) == h) {
- if ((ek = e.key) == key || (ek != null && key.equals(ek)))
- return e.val;
- }
- // 树
- else if (eh <0)
- return (p = e.find(h, key)) != null ? p.val : null;
- // 链表, 遍历
- while ((e = e.next) != null) {
- if (e.hash == h &&
- ((ek = e.key) == key || (ek != null && key.equals(ek))))
- return e.val;
- }
- }
- return null;
- }
get 操作:
计算 hash 值
判断 table 是否为空, 如果为空, 直接返回 null
根据 hash 值获取 table 中的 Node 节点 (tabAt(tab, (n - 1) & h)), 然后根据链表或者树形方式找到相对应的节点, 返回其 value 值.
五, 扩容
- // 首先要说明的是, 方法参数 size 传进来的时候就已经翻了倍了
- private final void tryPresize(int size) {
- // c:size 的 1.5 倍, 再加 1, 再往上取最近的 2 的 n 次方.
- int c = (size>= (MAXIMUM_CAPACITY>>> 1)) ? MAXIMUM_CAPACITY :
- tableSizeFor(size + (size>>> 1) + 1);
- int sc;
- while ((sc = sizeCtl)>= 0) {
- Node<K,V>[] tab = table; int n;
- // 这个 if 分支和之前说的初始化数组的代码基本上是一样的, 在这里, 我们可以不用管这块代码
- if (tab == null || (n = tab.length) == 0) {
- n = (sc> c) ? sc : c;
- if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
- try {
- if (table == tab) {
- @SuppressWarnings("unchecked")
- Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
- table = nt;
- sc = n - (n>>> 2); // 0.75 * n
- }
- } finally {
- sizeCtl = sc;
- }
- }
- }
- else if (c <= sc || n>= MAXIMUM_CAPACITY)
- break;
- else if (tab == table) {
- // 我没看懂 rs 的真正含义是什么, 不过也关系不大
- int rs = resizeStamp(n);
- if (sc <0) {
- Node<K,V>[] nt;
- if ((sc>>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
- sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
- transferIndex <= 0)
- break;
- // 2. 用 CAS 将 sizeCtl 加 1, 然后执行 transfer 方法
- // 此时 nextTab 不为 null
- if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
- transfer(tab, nt);
- }
- // 1. 将 sizeCtl 设置为 (rs <<RESIZE_STAMP_SHIFT) + 2)
- // 我是没看懂这个值真正的意义是什么? 不过可以计算出来的是, 结果是一个比较大的负数
- // 调用 transfer 方法, 此时 nextTab 参数为 null
- else if (U.compareAndSwapInt(this, SIZECTL, sc,
- (rs << RESIZE_STAMP_SHIFT) + 2))
- transfer(tab, null);
- }
- }
- }
这个方法的核心在于 sizeCtl 值的操作, 首先将其设置为一个负数, 然后执行 transfer(tab, null), 再下一个循环将 sizeCtl 加 1, 并执行 transfer(tab, nt), 之后可能是继续 sizeCtl 加 1, 并执行 transfer(tab, nt).
所以, 可能的操作就是执行 1 次 transfer(tab, null) + 多次 transfer(tab, nt), 这里怎么结束循环的需要看完 transfer 源码才清楚.
- private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
- int n = tab.length, stride;
- // stride 在单核下直接等于 n, 多核模式下为 (n>>>3)/NCPU, 最小值是 16
- // stride 可以理解为 "步长", 有 n 个位置是需要进行迁移的,
- // 将这 n 个任务分为多个任务包, 每个任务包有 stride 个任务
- if ((stride = (NCPU> 1) ? (n>>> 3) / NCPU : n) <MIN_TRANSFER_STRIDE)
- stride = MIN_TRANSFER_STRIDE; // subdivide range
- // 如果 nextTab 为 null, 先进行一次初始化
- // 前面我们说了, 外围会保证第一个发起迁移的线程调用此方法时, 参数 nextTab 为 null
- // 之后参与迁移的线程调用此方法时, nextTab 不会为 null
- if (nextTab == null) {
- try {
- // 容量翻倍
- Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n <<1];
- nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- // nextTable 是 ConcurrentHashMap 中的属性
- nextTable = nextTab;
- // transferIndex 也是 ConcurrentHashMap 的属性, 用于控制迁移的位置
- transferIndex = n;
- }
- int nextn = nextTab.length;
- // ForwardingNode 翻译过来就是正在被迁移的 Node
- // 这个构造方法会生成一个 Node,key,value 和 next 都为 null, 关键是 hash 为 MOVED
- // 后面我们会看到, 原数组中位置 i 处的节点完成迁移工作后,
- // 就会将位置 i 处设置为这个 ForwardingNode, 用来告诉其他线程该位置已经处理过了
- // 所以它其实相当于是一个标志.
- ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
- // advance 指的是做完了一个位置的迁移工作, 可以准备做下一个位置的了
- boolean advance = true;
- boolean finishing = false; // to ensure sweep before committing nextTab
- /*
- * 下面这个 for 循环, 最难理解的在前面, 而要看懂它们, 应该先看懂后面的, 然后再倒回来看
- *
- */
- // i 是位置索引, bound 是边界, 注意是从后往前
- for (int i = 0, bound = 0;;) {
- Node<K,V> f; int fh;
- // 下面这个 while 真的是不好理解
- // advance 为 true 表示可以进行下一个位置的迁移了
- // 简单理解结局: i 指向了 transferIndex,bound 指向了 transferIndex-stride
- while (advance) {
- int nextIndex, nextBound;
- if (--i>= bound || finishing)
- advance = false;
- // 将 transferIndex 值赋给 nextIndex
- // 这里 transferIndex 一旦小于等于 0, 说明原数组的所有位置都有相应的线程去处理了
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex> stride ?
- nextIndex - stride : 0))) {
- // 看括号中的代码, nextBound 是这次迁移任务的边界, 注意, 是从后往前
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
- if (i <0 || i>= n || i + n>= nextn) {
- int sc;
- if (finishing) {
- // 所有的迁移操作已经完成
- nextTable = null;
- // 将新的 nextTab 赋值给 table 属性, 完成迁移
- table = nextTab;
- // 重新计算 sizeCtl:n 是原数组长度, 所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
- sizeCtl = (n <<1) - (n>>> 1);
- return;
- }
- // 之前我们说过, sizeCtl 在迁移前会设置为 (rs <<RESIZE_STAMP_SHIFT) + 2
- // 然后, 每有一个线程参与迁移就会将 sizeCtl 加 1,
- // 这里使用 CAS 操作对 sizeCtl 进行减 1, 代表做完了属于自己的任务
- if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
- // 任务结束, 方法退出
- if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
- return;
- // 到这里, 说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
- // 也就是说, 所有的迁移任务都做完了, 也就会进入到上面的 if(finishing){} 分支了
- finishing = advance = true;
- i = n; // recheck before commit
- }
- }
- // 如果位置 i 处是空的, 没有任何节点, 那么放入刚刚初始化的 ForwardingNode "空节点"
- else if ((f = tabAt(tab, i)) == null)
- advance = casTabAt(tab, i, null, fwd);
- // 该位置处是一个 ForwardingNode, 代表该位置已经迁移过了
- else if ((fh = f.hash) == MOVED)
- advance = true; // already processed
- else {
- // 对数组该位置处的结点加锁, 开始处理数组该位置处的迁移工作
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- Node<K,V> ln, hn;
- // 头结点的 hash 大于 0, 说明是链表的 Node 节点
- if (fh>= 0) {
- // 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
- // 需要将链表一分为二,
- // 找到原链表中的 lastRun, 然后 lastRun 及其之后的节点是一起进行迁移的
- // lastRun 之前的节点需要进行克隆, 然后分到两个链表中
- int runBit = fh & n;
- Node<K,V> lastRun = f;
- for (Node<K,V> p = f.next; p != null; p = p.next) {
- int b = p.hash & n;
- if (b != runBit) {
- runBit = b;
- lastRun = p;
- }
- }
- if (runBit == 0) {
- ln = lastRun;
- hn = null;
- }
- else {
- hn = lastRun;
- ln = null;
- }
- for (Node<K,V> p = f; p != lastRun; p = p.next) {
- int ph = p.hash; K pk = p.key; V pv = p.val;
- if ((ph & n) == 0)
- ln = new Node<K,V>(ph, pk, pv, ln);
- else
- hn = new Node<K,V>(ph, pk, pv, hn);
- }
- // 其中的一个链表放在新数组的位置 i
- setTabAt(nextTab, i, ln);
- // 另一个链表放在新数组的位置 i+n
- setTabAt(nextTab, i + n, hn);
- // 将原数组该位置处设置为 fwd, 代表该位置已经处理完毕,
- // 其他线程一旦看到该位置的 hash 值为 MOVED, 就不会进行迁移了
- setTabAt(tab, i, fwd);
- // advance 设置为 true, 代表该位置已经迁移完毕
- advance = true;
- }
- else if (f instanceof TreeBin) {
- // 红黑树的迁移
- TreeBin<K,V> t = (TreeBin<K,V>)f;
- TreeNode<K,V> lo = null, loTail = null;
- TreeNode<K,V> hi = null, hiTail = null;
- int lc = 0, hc = 0;
- for (Node<K,V> e = t.first; e != null; e = e.next) {
- int h = e.hash;
- TreeNode<K,V> p = new TreeNode<K,V>
- (h, e.key, e.val, null, null);
- if ((h & n) == 0) {
- if ((p.prev = loTail) == null)
- lo = p;
- else
- loTail.next = p;
- loTail = p;
- ++lc;
- }
- else {
- if ((p.prev = hiTail) == null)
- hi = p;
- else
- hiTail.next = p;
- hiTail = p;
- ++hc;
- }
- }
- // 如果一分为二后, 节点数少于 8, 那么将红黑树转换回链表
- ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
- (hc != 0) ? new TreeBin<K,V>(lo) : t;
- hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
- (lc != 0) ? new TreeBin<K,V>(hi) : t;
- // 将 ln 放置在新数组的位置 i
- setTabAt(nextTab, i, ln);
- // 将 hn 放置在新数组的位置 i+n
- setTabAt(nextTab, i + n, hn);
- // 将原数组该位置处设置为 fwd, 代表该位置已经处理完毕,
- // 其他线程一旦看到该位置的 hash 值为 MOVED, 就不会进行迁移了
- setTabAt(tab, i, fwd);
- // advance 设置为 true, 代表该位置已经迁移完毕
- advance = true;
- }
- }
- }
- }
- }
- }
来源: https://www.cnblogs.com/hexinwei1/p/10000779.html