HashMap 的 put,get,size 等方法都不是线程安全的, 而 HashTable 虽然保证了线程安全, 但却是用了效率极低的方法, 在 put,get,size 等方法上加上了 synchronized, 这就导致所有的并发进程都要竞争同一把锁, 一个线程在进行同步操作时, 其他线程都需要等待.
为了保证集合的线程安全性, Jdk 提供了同步包装器, 比如说 Collections.synchronizedMap, 不过它们都是利用粗粒度的同步方式, 高并发情况下性能较差.
可以看看 Collections.synchronizedMap 的实现.
与并发安全有关的代码都使用了 synchronized 关键字进行了修饰, 使用的锁 mutex 可以在构造方法中看到就是 this.
虽然说不在是用 synchronized 来修饰方法, 但是还是使用了 this 做 mutex 锁, 治标不治本.
更加普遍的选择是利用并发包提供的线程安全容器类, 它提供了各种并发容器, 比如 ConcurrentHashMap,CopyOnWriteArrayList.
各种线程安全队列, 如 ArrayBlockingQueue,SynchronousQueue.
ConcurrentHashMap 分析
ConcurrentHashMap 和 HashMap 一样, 在 Java8 时结构上发生了很大变化.
之前的 ConcurrentHashMap 的实现是基于分离锁, 也就是内部进行分段, 每一个段中都拥有 HashEntry 数组, hashcode 相同的 key 也是按照链表的方式存储的.
HashEntry 内部使用 volatile 修饰的 value 字段来保证可见性, 也利用了不可变对象的机制以改进利用 Unsafe 提供的底层能力, 比如 volatile access, 去直接完成部分操作, 以最优化性能, 毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的.
Segment 的初始大小由 DEFAULT_CONCURRENCY_LEVEL 来确定, 默认是 16, 在构造方法中可以自定义大小, 不过必须是 2 的幂, 如果输入 7, 会自动创建 8.
对于 get 方法来说, 只需要保证可见性, 无需其他同步操作.
关键是 put 方法, 首先是通过二次哈希避免哈希冲突, 然后以 Unsafe 调用方式, 直接获取相应的 Segment, 然后进行线程安全的 put 操作.
- public V put(K key, V value) {
- Segment<K,V> s;
- if (value == null)
- throw new NullPointerException();
- // 二次哈希, 以保证数据的分散性, 避免哈希冲突
- int hash = hash(key.hashCode());
- int j = (hash>>> segmentShift) & segmentMask;
- if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
- (segments, (j <<SSHIFT) + SBASE)) == null) // in ensureSegment
- s = ensureSegment(j);
- return s.put(key, hash, value, false);
- }
put 的核心实现如下.
- fnal V put(K key, int hash, V value, boolean onlyIfAbsent) {
- // scanAndLockForPut 会去查找是否有 key 相同 Node
- // 无论如何, 确保获取锁
- HashEntry<K,V> node = tryLock() ? null :
- scanAndLockForPut(key, hash, value);
- V oldValue;
- try {
- HashEntry<K,V>[] tab = table;
- int index = (tab.length - 1) & hash;
- HashEntry<K,V> frs = entryAt(tab, index);
- for (HashEntry<K,V> e = frs;;) {
- if (e != null) {
- K k;
- // 更新已有 value...
- } else {
- // 放置 HashEntry 到特定位置, 如果超过阈值, 进行 rehash
- // ...
- }
- }
- } fnally {
- unlock();
- }
- return oldValue;
- }
首先也是通过 Key 的 Hash 定位到具体的 Segment, 在 put 之前会进行一次扩容校验. 这里比 HashMap 要好的一点是: HashMap 是插入元素之后再看是否需要扩容, 有可能扩容之后后续就没有插入就浪费了本次扩容(扩容非常消耗性能). 而 ConcurrentHashMap 不一样, 它是在将数据插入之前检查是否需要扩容, 之后再做插入操作.
ConcurrentHashMap 会获取再入锁, 以保证数据一致性, Segment 本身就是基于 ReentrantLock 的扩展实现, 所以, 在并发修改期间, 相应 Segment 是被锁定的.
在最初阶段, 进行重复性的扫描, 以确定相应 key 值是否已经在数组里面, 进而决定是更新还是放置操作, 你可以在代码里看到相应的注释. 重复扫描, 检测冲突
是 ConcurrentHashMap 的常见技巧.
另外一个 Map 的 size 方法同样需要关注, 它的实现涉及分离锁的一个副作用.
试想, 如果不进行同步, 简单的计算所有 Segment 的总值, 可能会因为并发 put, 导致结果不准确, 但是直接锁定所有 Segment 进行计算, 就会变得非常昂贵.
所以, ConcurrentHashMap 的实现是通过重试机制(RETRIES_BEFORE_LOCK, 指定重试次数 2), 来试图获得可靠值. 如果没有监控到发生变化(通过对
比 Segment.modCount), 就直接返回, 否则获取锁进行操作.
每个 Segment 都有一个 volatile 修饰的全局变量 count, 求整个 ConcurrentHashMap 的 size 时很明显就是将所有的 count 累加即可. 但是 volatile 修饰的变量却不能保证多线程的原子性, 所有直接累加很容易出现并发问题.
通过尝试两次将 count 累加, 如果容器的 count 发生了变化再加锁来统计 size, 可以有效的避免加锁的问题.
以上是 JDK1.7 及之前的 ConcurrentHashMap 的结构, 在 1.8 之后, ConcurrentHashMap 结构发生了很大变化, 由之前的分段锁变为了 CAS+synchronized.
我在网上找到了一幅结构图.
总体结构上, 它的内部存储和 HashMap 结构非常相似, 同样是大的桶 (bucket) 数组, 然后内部也是一个个所谓的链表结构(bin), 同步的粒度要更细致一些.
其内部仍然有 Segment 定义, 但仅仅是为了保证序列化时的兼容性而已, 不再有任何结构上的用处.
因为不再使用 Segment, 初始化操作大大简化, 修改为 lazy-load 形式, 这样可以有效避免初始开销, 解决了老版本很多人抱怨的这一点.
将 1.7 中存放数据的 HashEntry 改为 Node, 但作用都是相同的. 其中的 val next 都用了 volatile 修饰, 保证了可见性.
- satic class Node<K,V> implements Map.Entry<K,V> {
- fnal int hash;
- fnal K key;
- volatile V val;
- volatile Node<K,V> next;
- // ...
- }
在 Node 中, key 被定义为了 final, 因为照常理来说, key 是不会改变了, 而是 value 会发生改变, 因此在 val 中添加了 volatile 来修饰.
来看看它的 put 操作.
- fnal V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node<K,V>[] tab = table;;) {
- Node<K,V> f; int n, i, fh; K fk; V fv;
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- // 利用 CAS 去进行无锁线程安全操作, 如果 bin 是空的
- if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
- break;
- }
- else if ((fh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- else if (onlyIfAbsent // 不加锁, 进行检查
- && fh == hash
- && ((fk = f.key) == key || (fk != null && key.equals(fk)))
- && (fv = f.val) != null)
- return fv;
- else {
- V oldVal = null;
- synchronized (f) {
- // 细粒度的同步修改操作...
- }
- }
- // Bin 超过阈值, 进行树化
- if (binCount != 0) {
- if (binCount>= TREEIFY_THRESHOLD)
- treeifyBin(tab, i);
- if (oldVal != null)
- return oldVal;
- break;
- }
- }
- }
- addCount(1L, binCount);
- return null;
- }
具体流程如下:
根据 key 计算出 hashcode .
判断是否需要进行初始化 initTable.
f 即为当前 key 定位出的 Node, 如果为空表示当前位置可以写入数据, 利用 CAS 尝试写入, 失败则自旋保证成功.
如果当前位置的 hashcode == MOVED == -1, 则需要进行扩容.
如果都不满足, 则利用 synchronized 锁写入数据.
如果数量大于 TREEIFY_THRESHOLD 则要转换为红黑树.
可以发现 1.8 以后的锁的颗粒度, 是加在链表头上的, 这是一个很重要的突破.
1.8 中不依赖与 segment 加锁, segment 数量与桶数量一致.
首先判断容器是否为空, 为空则进行初始化利用 volatile 的 sizeCtl 作为互斥手段, 如果发现竞争性的初始化, 就暂停在那里, 等待条件恢复, 否则利用 CAS 设置排他标志(U.compareAndSwapInt(this, SIZECTL, sc, -1)), 否则重试
对 key hash 计算得到该 key 存放的桶位置, 判断该桶是否为空, 为空则利用 CAS 设置新节点, 否则使用 synchronize 加锁, 遍历桶中数据, 替换或新增加点到桶中. 最后判断是否需要转为红黑树, 转换之前判断是否需要扩容.
最后来看看 1.8 ConcurrentHashMap 的 size 方法实现.
在 sumCount 方法中使用到了 CounterCell, 对于 CounterCell 的操作, 是基于 java.util.concurrent.atomic.LongAdder 进行的, 是一种 JVM 利用空间换取更高效率的方法, 利用了 Striped64 内部的复杂逻辑.
- // 一种用于分配计数的填充单元. 改编自 LongAdder 和 Striped64. 请查看他们的内部文档进行解释.
- @sun.misc.Contended
- static final class CounterCell {
- volatile long value;
- CounterCell(long x) { value = x; }
- }
来源: https://www.cnblogs.com/LexMoon/p/concurrenthashmap.html