ConcurrentHashMap, 通过这个名字,可以知道 Concurrent 是并发的,HashMap 是我们常用的一种用来存放键值对的数据结构,所以 ConcurrentHashMap 就是一种用来解决高并发的 HashMap。这个是 JDK 在 1.5 之后提供的一种数据结构。我们知道 Java 中其实已经有了 HashTable 这个线程安全的 Map,但是他是通过对整个 table 使用 synchronized 来保证线程安全的,当有多个线程对 HashTable 进行读写的时候,每次只能有一个线程会获取到 HashTable 的对象锁,别的只能处于等待状态,所以在高并发的情况下,它的效率是比较低的。 当然 JDK 还提供了另外一种方式,那就是 Collections.synchronizedMap(hashMap),这个底层也是通过 synchronized 对要高并发的数据结构来实现线程安全的。 通过上面的分析可以知道,ConcurrentHashMap 为了解决 synchronized 在高并发时的低效问题,ConcurrentHashMap 之所以比 HashTable 高效,是因为他并不是对整个 HashMap 上锁,而是采用了分段锁,那么现在就从源码的角度来看看,分段锁是如何实现的。
通过继承关系,可以发现,ConcurrentHashMap 并没有继承 HashMap, 而是单独实现了 ConcurrentMap 这个接口,其余的跟 HashMap 是一样的
- //table的最大容量
- private static final int MAXIMUM_CAPACITY = 1 << 30;
- //table的默认容量16
- private static final int DEFAULT_CAPACITY = 16;
- //默认的并发度为16
- private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
- //负载因子
- private static final float LOAD_FACTOR = 0.75f;
- //链表转化为红黑树的阈值
- static final int TREEIFY_THRESHOLD = 8;
- //红黑树转链表的阈值
- static final int UNTREEIFY_THRESHOLD = 6;
- //红黑树的最小容量
- static final int MIN_TREEIFY_CAPACITY = 64;
- //存放Node的数组
- transient volatile Node < K,
- V > [] table;
- private transient volatile Node < K,
- V > [] nextTable;
- private transient volatile long baseCount;
- //sizeCtl作为table是否在初始化或者在resize的标志,如果为负数说明正在初始化或者扩容,需要table进行CRUD操作的线程只能等待,稍后进行重试,否则就可以直接进行CRUD操作
- //正数则表明table没有初始化
- private transient volatile int sizeCtl;
- private transient volatile int transferIndex;
- private transient volatile int cellsBusy;
- private transient volatile CounterCell[] counterCells;
- //该节点是否已经移动到新数组中去
- static final int MOVED = -1;
- //是否是TREEBIN节点
- static final int TREEBIN = -2;
除此之外,还有几个类需要解释一下
- static final class TreeNode<K,V> extends Node<K,V> {
- TreeNode<K,V> parent; //根节点
- TreeNode<K,V> left;//左节点
- TreeNode<K,V> right;//右节点
- TreeNode<K,V> prev; //上个节点的指针
- boolean red;//默认节点为黑
- TreeNode(int hash, K key, V val, Node<K,V> next,
- TreeNode<K,V> parent) {
- super(hash, key, val, next);
- this.parent = parent;
- }
- Node<K,V> find(int h, Object k) {
- return findTreeNode(h, k, null);
- }
- //查找相应的树节点
- final TreeNode<K,V> findTreeNode(int h, Object k, Class kc) {
- //此处省略一万行代码......
- }
跟一般的树节点都差不多,在前面分析二叉树的时候也差不多是这种数据结构,所以还是比较好理解的,注释说 Nodes for use in TreeBins,也就是说这个 TreeNode 是用来服务 TreeBin 的,那么继续看 Treebins
这个通过注释可以知道,是用来包装 TreeNode 的,然后作为红黑树的树节点
- static final class TreeBin < K,
- V > extends Node < K,
- V > {
- TreeNode < K,
- V > root;
- volatile TreeNode < K,
- V > first; //TreeNode的根节点
- volatile Thread waiter; //waiter线程
- volatile int lockState; //当前节点的锁状态
- // values for lockState
- static final int WRITER = 1; // 获得写数据的锁状态
- static final int WAITER = 2; // 等待写数据的锁状态
- static final int READER = 4; // 添加数据时的锁状态
- TreeBin(TreeNode < K, V > b) {
- super(TREEBIN, null, null, null);
- this.first = b; //头结点赋值
- TreeNode < K,
- V > r = null;
- //递归遍历传过来的Node,从根节点开始
- for (TreeNode < K, V > x = b, next; x != null; x = next) {
- next = (TreeNode < K, V > ) x.next; //拿到下一个节点
- x.left = x.right = null; //将左右节点置空,因为需要重新赋值
- //判断r节点是否为空
- if (r == null) {
- x.parent = null; //根节点置空
- x.red = false; //黑子树
- r = x; //x赋值为r节点
- } else {
- K k = x.key; //拿到key
- int h = x.hash; //计算hash值
- Class < ?>kc = null; //K对应的泛型实体类
- //从根节点r开始遍历
- for (TreeNode < K, V > p = r;;) {
- int dir,
- ph;
- K pk = p.key;
- if ((ph = p.hash) > h) dir = -1;
- else if (ph < h) dir = 1;
- else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk);
- TreeNode < K,
- V > xp = p;
- if ((p = (dir <= 0) ? p.left: p.right) == null) {
- x.parent = xp;
- if (dir <= 0) xp.left = x;
- else xp.right = x;
- r = balanceInsertion(r, x);
- break;
- }
- }
- }
- }
- this.root = r; //将r赋值给根节点
- assert checkInvariants(root);
- }
- }
- static final class ForwardingNode < K,
- V > extends Node < K,
- V > {
- final Node < K,
- V > [] nextTable;
- ForwardingNode(Node < K, V > [] tab) {
- super(MOVED, null, null, null);
- this.nextTable = tab;
- }
- Node < K,
- V > find(int h, Object k) {
- // loop to avoid arbitrarily deep recursion on forwarding nodes
- outer: for (Node < K, V > [] tab = nextTable;;) {
- Node < K,
- V > e;
- int n;
- if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null;
- for (;;) {
- int eh;
- K ek;
- if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e;
- if (eh < 0) {
- if (e instanceof ForwardingNode) {
- tab = ((ForwardingNode < K, V > ) e).nextTable;
- continue outer;
- } else return e.find(h, k);
- }
- if ((e = e.next) == null) return null;
- }
- }
- }
- }
一个用于连接两个 table 的节点类。它包含一个 nextTable 指针,用于指向下一张表。而且这个节点的 key value next 指针全部为 null,它的 hash 值为 - 1. 这里面定义的 find 的方法是从 nextTable 里进行查询节点,而不是以自身为头节点进行查找。
- //获得在i位置上的Node节点
- static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
- return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
- }
- static final < K,
- V > boolean casTabAt(Node < K, V > [] tab, int i, Node < K, V > c, Node < K, V > v) {
- return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
- }
- static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
- U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
- }
利用 volatile 方法设置节点位置的值
之前分析过 HashMap 的源码,所以看这些还是比较轻松的,ConcurrentHashMap 采用的是懒加载,也就是存入第一对 KeyValue 的时候才会去初始化 table,所以前面四个构造方法只是做了一些参数配置,只有最后一个构造方法,才会去真正的初始化,下面重点看一下最后一个方法
- public ConcurrentHashMap(Map extends K, ? extends V> m) {
- this.sizeCtl = DEFAULT_CAPACITY;
- putAll(m);
- }
- public
- void
- putAll
- (Map extends K, ? extends V> m)
- {
- tryPresize(m.size());
- for (Map.Entry extends K, ? extends V> e : m.entrySet())
- putVal(e.getKey(), e.getValue(), false);
- }
最后调用了 putAll 方法,这个会在后面的 put 方法里面再重点讲解。
- private final void transfer(Node < K, V > [] tab, Node < K, V > [] nextTab) {
- int n = tab.length,
- stride;
- if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU: n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range
- if (nextTab == null) { // initiating
- try {@SuppressWarnings("unchecked")
- //创建一个容量是原来2倍的table数组
- Node < K,
- V > [] nt = (Node < K, V > []) new Node < ?,
- ?>[n << 1];
- nextTab = nt;
- } catch(Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTable = nextTab;
- transferIndex = n;
- }
- int nextn = nextTab.length;
- //创建一个ForwardingNode,指向新table,并且用于填充已经移动的Node位置
- ForwardingNode < K,
- V > fwd = new ForwardingNode < K,
- V > (nextTab);
- boolean advance = true;
- boolean finishing = false; // to ensure sweep before committing nextTab
- for (int i = 0, bound = 0;;) {
- Node < K,
- V > f;
- int fh;
- while (advance) {
- int nextIndex,
- nextBound;
- if (--i >= bound || finishing) advance = false;
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride: 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- if (finishing) {
- //如果所有的节点都已经完成复制工作
- //就把nextTable赋值给table 清空临时对象nextTable
- nextTable = null;
- table = nextTab;
- // 扩容至现在容量的0.75倍
- sizeCtl = (n << 1) - (n >>> 1);
- return;
- }
- if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
- if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return;
- finishing = advance = true;
- i = n; // recheck before commit
- }
- } else if ((f = tabAt(tab, i)) == null)
- //用ForwardingNode来填充Null节点
- advance = casTabAt(tab, i, null, fwd);
- else if ((fh = f.hash) == MOVED)
- //如果当前节点移动过,将次标志设置为true
- //这样后续线程访问便可以直接跳过此节点
- advance = true; // already processed
- else {
- synchronized(f) {
- if (tabAt(tab, i) == f) {
- Node < K,
- V > ln,
- hn;
- if (fh >= 0) {
- //普通Node节点
- int runBit = fh & n;
- Node < K,
- V > lastRun = f;
- for (Node < K, V > p = f.next; p != null; p = p.next) {
- int b = p.hash & n;
- if (b != runBit) {
- runBit = b;
- lastRun = p;
- }
- }
- if (runBit == 0) {
- ln = lastRun;
- hn = null;
- } else {
- hn = lastRun;
- ln = null;
- }
- for (Node < K, V > p = f; p != lastRun; p = p.next) {
- int ph = p.hash;
- K pk = p.key;
- V pv = p.val;
- if ((ph & n) == 0) ln = new Node < K,
- V > (ph, pk, pv, ln);
- else hn = new Node < K,
- V > (ph, pk, pv, hn);
- }
- setTabAt(nextTab, i, ln);
- setTabAt(nextTab, i + n, hn);
- setTabAt(tab, i, fwd);
- advance = true;
- } else if (f instanceof TreeBin) {
- TreeBin < K,
- V > t = (TreeBin < K, V > ) f;
- TreeNode < K,
- V > lo = null,
- loTail = null;
- TreeNode < K,
- V > hi = null,
- hiTail = null;
- int lc = 0,
- hc = 0;
- for (Node < K, V > e = t.first; e != null; e = e.next) {
- int h = e.hash;
- TreeNode < K,
- V > p = new TreeNode < K,
- V > (h, e.key, e.val, null, null);
- if ((h & n) == 0) {
- if ((p.prev = loTail) == null) lo = p;
- else loTail.next = p;
- loTail = p; ++lc;
- } else {
- if ((p.prev = hiTail) == null) hi = p;
- else hiTail.next = p;
- hiTail = p; ++hc;
- }
- }
- //如果扩容后已经不再需要tree的结构 反向转换为链表结构
- ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin < K,
- V > (lo) : t;
- hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin < K,
- V > (hi) : t;
- //在nextTable的i位置上插入一个链表
- setTabAt(nextTab, i, ln);
- //在nextTable的i+n的位置上插入另一个链表
- setTabAt(nextTab, i + n, hn);
- //在table的i位置上插入forwardNode节点 表示已经处理过该节点
- setTabAt(tab, i, fwd);
- //设置advance为true 返回到上面的while循环中 就可以执行i--操作
- advance = true;
- }
- }
- }
- }
- }
- }
- final Node < K,
- V > [] helpTransfer(Node < K, V > [] tab, Node < K, V > f) {
- Node < K,
- V > [] nextTab;
- int sc;
- if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode < K, V > ) f).nextTable) != null) {
- int rs = resizeStamp(tab.length);
- while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break;
- if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
- transfer(tab, nextTab);
- break;
- }
- }
- return nextTab;
- }
- return table;
- }
这是一个协助扩容的方法。这个方法被调用的时候,当前 ConcurrentHashMap 一定已经有了 nextTable 对象,首先拿到这个 nextTable 对象,调用上面讲到的 transfer 方法来进行扩容
首先调用了 put 方法
- public V put(K key, V value) {
- return putVal(key, value, false);
- }
紧接着调用了 putVal 方法,下面来分析下一年 putVal 方法
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- //key和value都不能为空
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode()); //计算hash值
- int binCount = 0; //链表长度
- //开启死循环
- for (Node < K, V > [] tab = table;;) {
- Node < K,
- V > f;
- int n,
- i,
- fh;
- if (tab == null || (n = tab.length) == 0)
- //table为null或者table的长度为0,初始化table
- tab = initTable();
- //根据hash值计算table的索引
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- //如果索引处为null,则直接在此处进行插入
- if (casTabAt(tab, i, null, new Node < K, V > (hash, key, value, null)))
- //通过CAS进行插入,插入成功之后跳出循环
- break;
- } else if ((fh = f.hash) == MOVED)
- //当遇到表ForwardNode时,需要对表进行整合
- tab = helpTransfer(tab, f);
- else {
- V oldVal = null;
- //采用同步代码块进行上锁
- synchronized(f) {
- if (tabAt(tab, i) == f) {
- //普通节点
- if (fh >= 0) {
- binCount = 1;
- //遍历所有节点
- for (Node < K, V > e = f;; ++binCount) {
- K ek;
- //如果hash值跟key值都相同,则进行值替换
- if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
- oldVal = e.val;
- if (!onlyIfAbsent) e.val = value;
- break;
- }
- Node < K,
- V > pred = e;
- if ((e = e.next) == null) {
- pred.next = new Node < K,
- V > (hash, key, value, null);
- break;
- }
- }
- }
- //树节点
- else if (f instanceof TreeBin) {
- Node < K,
- V > p;
- binCount = 2;
- if ((p = ((TreeBin < K, V > ) f).putTreeVal(hash, key, value)) != null) {
- oldVal = p.val;
- if (!onlyIfAbsent) p.val = value;
- }
- }
- }
- }
- if (binCount != 0) {
- //判断链表长度,如果链表长度大于阈值,进行树节点转换
- if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i);
- if (oldVal != null) return oldVal;
- break;
- }
- }
- }
- addCount(1L, binCount); //改变table中的元素的个数
- return null;
- }
首先,计算记录的 key 的 hashCode,然后计算 table 的 index 位置,然后获取该 index 的值,如果该位置还为 null,说明该位置上还没有记录,则通过调用 casTabAt 方法来讲该新的记录插入到 table 的 index 位置上去,否则,通过 synchronized 关键字对 table 的 index 位置加锁,需要注意的是,当前线程只会锁住 table 的 index 位置,其他位置上没有锁住,所以此时其他线程可以安全的获得其他的 table 位置来进行操作。这也就提高了 ConcurrentHashMap 的并发度。然后判断 table 的 index 位置上的第一个节点的 hashCode 值,这个节点要么是链表的头节点,要么是红黑树的根节点,如果 hashCode 值小于 0,那么就是一颗红黑树,至于为什么是这样,上文中已经提到,如果不小于 0,那么就还是一条链表,如果是一条链表,那么就寻找是否已经有记录的 key 和当前想要插入的记录是一致的,如果一致,那么这次 put 的效果就是 replace,否则,将该记录添加到链表中去。如果是一颗红黑树,那么就通过调用 putTreeVal 方法来进行插入操作。在插入操作完成之后,需要判断本次操作是否是更新操作,如果是更新操作,则不会造成 size 的变化,否则,如果本次 put 操作时一次添加操作,那么就需要进行更新 size 的操作,而 size 的更新涉及到并发环境,所以较为复杂,并且 table 的扩容操作也会在更新 size 的时候发生,如果在更新 size 之后发现 table 中的记录数量达到了阈值,就需要进行扩容操作,这也是较为复杂的一步。
- public V get(Object key) {
- Node < K,
- V > [] tab;
- Node < K,
- V > e,
- p;
- int n,
- eh;
- K ek;
- int h = spread(key.hashCode()); //计算hash值
- //根据hash值定位元素
- if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
- if ((eh = e.hash) == h) {
- //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
- if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val;
- } else if (eh < 0)
- //树节点
- return (p = e.find(h, key)) != null ? p.val: null;
- //通过寻址法进行寻找
- while ((e = e.next) != null) {
- if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val;
- }
- }
- return null;
- }
首先,计算出记录的 key 的 hashCode,然后通过使用 (hashCode & (length - 1)) 的计算方法来获得该记录在 table 中的 index,然后判断该位置上是否为 null,如果为 null,则返回 null,否则,如果该位置上的第一个元素(链表头节点或者红黑树的根节点)与我们先要查找的记录匹配,则直接返回这个节点的值,否则,如果该节点的 hashCode 小于 0,则说明该位置上是一颗红黑树,至于为什么 hashCode 值小于 0 就代表是一颗红黑树而不是链表了,这就要看下面的代码了:
- static final int TREEBIN = -2; // hash for roots of trees
- TreeBin(TreeNode<K,V> b) {
- super(TREEBIN, null, null, null);
- ......
- }
而 TREEBIN 的值为 - 2,也就是小于 0 成立,根据他的说明,TREEBIN 想要代表的是一颗红黑树的根节点,所以在判断到 table 的某个位置上的第一个节点的 hashCode 值小于 0 的时候,就可以判断为该位置上是一棵红黑树,继续回到 get 方法,如果是红黑树,则通过调用 Node 的 find 方法来查找到节点,而这个 Node 的 find 方法在子类中被重写,所以会直接调用子类的 find 方法来进行查找。还有一种情况是 table 的 index 位置上为一条链表,那么就通过链表的查找方法来进行记录查找。最后需要注意的是,ConcurrentHashMap 是一种线程安全的 HashMap,但是我们没有发现在 get 方法的过程中使用任何与锁等效的组件来做线程同步,为什么呢?对于读来说,允许多个线程一起读是很正常的,而且在 Node 的实现上,ConcurrentHashMap 做了一些手脚:
- static class Node<K,V> implements Map.Entry<K,V> {
- final int hash;
- final K key;
- volatile V val;
- volatile Node<K,V> next;
- ....
- }
- /**
- * The array of bins. Lazily initialized upon first insertion.
- * Size is always a power of two. Accessed directly by iterators.
- */
- transient volatile Node<K,V>[] table;
我们发现 table 数组是被 volatile 关键字修饰的,这就代表我们不需要担心 table 数组的线程可见性问题,也就没有必要再加锁来实现并发了。
删除操作属于写类型的操作,所以在进行删除的时候需要对 table 中的 index 位置加锁,ConcurrentHashMap 使用 synchronized 关键字将 table 中的 index 位置锁住,然后进行删除,remove 方法调用了 replaceNode 方法来进行实际的操作,而删除操作的步骤首先依然是计算记录的 hashCode,然后根据 hashCode 来计算 table 中的 index 值,然后根据 table 中的 index 位置上是一条链表还是一棵红黑树来使用不同的方法来删除这个记录,删除记录的操作需要进行记录数量的更新(调用 addCount 方法进行)。
www.importnew.com/22007.html www.jianshu.com/p/f6730d578… blog.csdn.net/mine_song/a…
来源: https://juejin.im/post/5a314d986fb9a0450167f631