众所周知,HashMap 是非线程安全的。而 HashMap 的线程不安全主要体现在 resize 时的死循环及使用迭代器时的 fast-fail 上。
注:本章的代码均基于 JDK 1.7.0_67
常用的底层数据结构主要有数组和链表。数组存储区间连续,占用内存较多,寻址容易,插入和删除困难。链表存储区间离散,占用内存较少,寻址困难,插入和删除容易。
HashMap 要实现的是哈希表的效果,尽量实现 O(1) 级别的增删改查。它的具体实现则是同时使用了数组和链表,可以认为最外层是一个数组,数组的每个元素是一个链表的表头。
对于新插入的数据或者待读取的数据,HashMap 将 Key 的哈希值对数组长度取模,结果作为该 Entry 在数组中的 index。在计算机中,取模的代价远高于位操作的代价,因此 HashMap 要求数组的长度必须为 2 的 N 次方。此时将 Key 的哈希值对 2^N-1 进行与运算,其效果即与取模等效。HashMap 并不要求用户在指定 HashMap 容量时必须传入一个 2 的 N 次方的整数,而是会通过 Integer.highestOneBit 算出比指定整数小的最大的 2^N 值,其实现方法如下。
- public static int highestOneBit(int i) {
- i |= (i >> 1);
- i |= (i >> 2);
- i |= (i >> 4);
- i |= (i >> 8);
- i |= (i >> 16);
- return i - (i >>> 1);
- }
由于 Key 的哈希值的分布直接决定了所有数据在哈希表上的分布或者说决定了哈希冲突的可能性,因此为防止糟糕的 Key 的 hashCode 实现(例如低位都相同,只有高位不相同,与 2^N-1 取与后的结果都相同),JDK 1.7 的 HashMap 通过如下方法使得最终的哈希值的二进制形式中的 1 尽量均匀分布从而尽可能减少哈希冲突。
- int h = hashSeed;
- h ^= k.hashCode();
- h ^= (h >>> 20) ^ (h >>> 12);
- return h ^ (h >>> 7) ^ (h >>> 4);
当 HashMap 的 size 超过 Capacity*loadFactor 时,需要对 HashMap 进行扩容。具体方法是,创建一个新的,长度为原来 Capacity 两倍的数组,保证新的 Capacity 仍为 2 的 N 次方,从而保证上述寻址方式仍适用。同时需要通过如下 transfer 方法将原来的所有数据全部重新插入(rehash)到新的数组中。
- void transfer(Entry[] newTable, boolean rehash) {
- int newCapacity = newTable.length;
- for (Entry < K, V > e: table) {
- while (null != e) {
- Entry < K,
- V > next = e.next;
- if (rehash) {
- e.hash = null == e.key ? 0 : hash(e.key);
- }
- int i = indexFor(e.hash, newCapacity);
- e.next = newTable[i];
- newTable[i] = e;
- e = next;
- }
- }
- }
该方法并不保证线程安全,而且在多线程并发调用时,可能出现死循环。其执行过程如下。从步骤 2 可见,转移时链表顺序反转。
单线程情况下,rehash 无问题。下图演示了单线程条件下的 rehash 过程
这里假设有两个线程同时执行了 put 操作并引发了 rehash,执行了 transfer 方法,并假设线程一进入 transfer 方法并执行完 next = e.next 后,因为线程调度所分配时间片用完而 "暂停",此时线程二完成了 transfer 方法的执行。此时状态如下。
接着线程 1 被唤醒,继续执行第一轮循环的剩余部分
- e.next = newTable[1] = null
- newTable[1] = e = key(5)
- e = next = key(9)
结果如下图所示
接着执行下一轮循环,结果状态图如下所示
继续下一轮循环,结果状态图如下所示
此时循环链表形成,并且 key(11) 无法加入到线程 1 的新数组。在下一次访问该链表时会出现死循环。
在使用迭代器的过程中如果 HashMap 被修改,那么
将被抛出,也即 Fast-fail 策略。
- ConcurrentModificationException
当 HashMap 的 iterator() 方法被调用时,会构造并返回一个新的 EntryIterator 对象,并将 EntryIterator 的 expectedModCount 设置为 HashMap 的 modCount(该变量记录了 HashMap 被修改的次数)。
- HashIterator() {
- expectedModCount = modCount;
- if (size > 0) { // advance to first entry
- Entry[] t = table;
- while (index < t.length && (next = t[index++]) == null)
- ;
- }
- }
在通过该 Iterator 的 next 方法访问下一个 Entry 时,它会先检查自己的 expectedModCount 与 HashMap 的 modCount 是否相等,如果不相等,说明 HashMap 被修改,直接抛出
。该 Iterator 的 remove 方法也会做类似的检查。该异常的抛出意在提醒用户及早意识到线程安全问题。
- ConcurrentModificationException
单线程条件下,为避免出现
,需要保证只通过 HashMap 本身或者只通过 Iterator 去修改数据,不能在 Iterator 使用结束之前使用 HashMap 本身的方法修改数据。因为通过 Iterator 删除数据时,HashMap 的 modCount 和 Iterator 的 expectedModCount 都会自增,不影响二者的相等性。如果是增加数据,只能通过 HashMap 本身的方法完成,此时如果要继续遍历数据,需要重新调用 iterator() 方法从而重新构造出一个新的 Iterator,使得新 Iterator 的 expectedModCount 与更新后的 HashMap 的 modCount 相等。
- ConcurrentModificationException
多线程条件下,可使用
方法构造出一个同步 Map,或者直接使用线程安全的 ConcurrentHashMap。
- Collections.synchronizedMap
注:本章的代码均基于 JDK 1.7.0_67
Java 7 中的 ConcurrentHashMap 的底层数据结构仍然是数组和链表。与 HashMap 不同的是,ConcurrentHashMap 最外层不是一个大的数组,而是一个 Segment 的数组。每个 Segment 包含一个与 HashMap 数据结构差不多的链表数组。整体数据结构如下图所示。
在读写某个 Key 时,先取该 Key 的哈希值。并将哈希值的高 N 位对 Segment 个数取模从而得到该 Key 应该属于哪个 Segment,接着如同操作 HashMap 一样操作这个 Segment。为了保证不同的值均匀分布到不同的 Segment,需要通过如下方法计算哈希值。
- private int hash(Object k) {
- int h = hashSeed;
- if ((0 != h) && (k instanceof String)) {
- return sun.misc.Hashing.stringHash32((String) k);
- }
- h ^= k.hashCode();
- h += (h << 15) ^ 0xffffcd7d;
- h ^= (h >>> 10);
- h += (h << 3);
- h ^= (h >>> 6);
- h += (h << 2) + (h << 14);
- return h ^ (h >>> 16);
- }
同样为了提高取模运算效率,通过如下计算,ssize 即为大于 concurrencyLevel 的最小的 2 的 N 次方,同时 segmentMask 为 2^N-1。这一点跟上文中计算数组长度的方法一致。对于某一个 Key 的哈希值,只需要向右移 segmentShift 位以取高 sshift 位,再与 segmentMask 取与操作即可得到它在 Segment 数组上的索引。
- int sshift = 0;
- int ssize = 1;
- while (ssize < concurrencyLevel) {
- ++sshift;
- ssize <<= 1;
- }
- this.segmentShift = 32 - sshift;
- this.segmentMask = ssize - 1;
- Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
Segment 继承自 ReentrantLock,所以我们可以很方便的对每一个 Segment 上锁。
对于读操作,获取 Key 所在的 Segment 时,需要保证可见性 (请参考如何保证多线程条件下的可见性)。具体实现上可以使用 volatile 关键字,也可使用锁。但使用锁开销太大,而使用 volatile 时每次写操作都会让所有 CPU 内缓存无效,也有一定开销。ConcurrentHashMap 使用如下方法保证可见性,取得最新的 Segment。
- Segment < K,
- V > s = (Segment < K, V > ) UNSAFE.getObjectVolatile(segments, u)
获取 Segment 中的 HashEntry 时也使用了类似方法
- HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
- (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
对于写操作,并不要求同时获取所有 Segment 的锁,因为那样相当于锁住了整个 Map。它会先获取该 Key-Value 对所在的 Segment 的锁,获取成功后就可以像操作一个普通的 HashMap 一样操作该 Segment,并保证该 Segment 的安全性。
同时由于其它 Segment 的锁并未被获取,因此理论上可支持 concurrencyLevel(等于 Segment 的个数)个线程安全的并发读写。
获取锁时,并不直接使用 lock 来获取,因为该方法获取锁失败时会挂起(参考可重入锁)。事实上,它使用了自旋锁,如果 tryLock 获取锁失败,说明锁被其它线程占用,此时通过循环再次以 tryLock 的方式申请锁。如果在循环过程中该 Key 所对应的链表头被修改,则重置 retry 次数。如果 retry 次数超过一定值,则使用 lock 方法申请锁。
这里使用自旋锁是因为自旋锁的效率比较高,但是它消耗 CPU 资源比较多,因此在自旋次数超过阈值时切换为互斥锁。
put、remove 和 get 操作只需要关心一个 Segment,而 size 操作需要遍历所有的 Segment 才能算出整个 Map 的大小。一个简单的方案是,先锁住所有 Sgment,计算完后再解锁。但这样做,在做 size 操作时,不仅无法对 Map 进行写操作,同时也无法进行读操作,不利于对 Map 的并行操作。
为更好支持并发操作,ConcurrentHashMap 会在不上锁的前提逐个 Segment 计算 3 次 size,如果某相邻两次计算获取的所有 Segment 的更新次数(每个 Segment 都与 HashMap 一样通过 modCount 跟踪自己的修改次数,Segment 每修改一次其 modCount 加一)相等,说明这两次计算过程中无更新操作,则这两次计算出的总 size 相等,可直接作为最终结果返回。如果这三次计算过程中 Map 有更新,则对所有 Segment 加锁重新计算 Size。该计算方法代码如下
- public int size() {
- final Segment<K,V>[] segments = this.segments;
- int size;
- boolean overflow; // true if size overflows 32 bits
- long sum; // sum of modCounts
- long last = 0L; // previous sum
- int retries = -1; // first iteration isn't retry
- try {
- for (;;) {
- if (retries++ == RETRIES_BEFORE_LOCK) {
- for (int j = 0; j < segments.length; ++j)
- ensureSegment(j).lock(); // force creation
- }
- sum = 0L;
- size = 0;
- overflow = false;
- for (int j = 0; j < segments.length; ++j) {
- Segment<K,V> seg = segmentAt(segments, j);
- if (seg != null) {
- sum += seg.modCount;
- int c = seg.count;
- if (c < 0 || (size += c) < 0)
- overflow = true;
- }
- }
- if (sum == last)
- break;
- last = sum;
- }
- } finally {
- if (retries > RETRIES_BEFORE_LOCK) {
- for (int j = 0; j < segments.length; ++j)
- segmentAt(segments, j).unlock();
- }
- }
- return overflow ? Integer.MAX_VALUE : size;
- }
ConcurrentHashMap 与 HashMap 相比,有以下不同点
注:本章的代码均基于 JDK 1.8.0_111
Java 7 为实现并行访问,引入了 Segment 这一结构,实现了分段锁,理论上最大并发度与 Segment 个数相等。Java 8 为进一步提高并发性,摒弃了分段锁的方案,而是直接使用一个大的数组。同时为了提高哈希碰撞下的寻址性能,Java 8 在链表长度超过一定阈值(8)时将链表(寻址时间复杂度为 O(N))转换为红黑树(寻址时间复杂度为 O(long(N)))。其数据结构如下图所示
Java 8 的 ConcurrentHashMap 同样是通过 Key 的哈希值与数组长度取模确定该 Key 在数组中的索引。同样为了避免不太好的 Key 的 hashCode 设计,它通过如下方法计算得到 Key 的最终哈希值。不同的是,Java 8 的 ConcurrentHashMap 作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将 Key 的 hashCode 值与其高 16 位作异或并保证最高位为 0(从而保证最终结果为正整数)。
- static final int spread(int h) {
- return (h ^ (h >>> 16)) & HASH_BITS;
- }
对于 put 操作,如果 Key 对应的数组元素为 null,则通过 CAS 操作将其设置为当前值。如果 Key 对应的数组元素(也即链表表头或者树的根元素)不为 null,则对该元素使用 synchronized 关键字申请锁,然后进行操作。如果该 put 操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率。
对于读操作,由于数组被 volatile 关键字修饰,因此不用担心数组的可见性问题。同时每个元素是一个 Node 实例(Java 7 中每个元素是一个 HashEntry),它的 Key 值和 hash 值都由 final 修饰,不可变更,无须关心它们被修改后的可见性问题。而其 Value 及对下一个元素的引用由 volatile 修饰,可见性也有保障。
- static class Node < K,
- V > implements Map.Entry < K,
- V > {
- final int hash;
- final K key;
- volatile V val;
- volatile Node < K,
- V > next;
- }
对于 Key 对应的数组元素的可见性,由 Unsafe 的 getObjectVolatile 方法保证。
- static final < K,
- V > Node < K,
- V > tabAt(Node < K, V > [] tab, int i) {
- return (Node < K, V > ) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
- }
put 方法和 remove 方法都会通过 addCount 方法维护 Map 的 size。size 方法通过 sumCount 获取由 addCount 方法维护的 Map 的 size。
来源: http://www.cnblogs.com/jasongj/p/7109401.html