多线程编程一直是老生常谈的问题, 在 Java 中, 随着 JDK 的逐渐发展, JDK 提供给我们的并发模型也越来越多, 本文摘取三例使用不同原理的模型, 分析其大致原理
COW 之 CopyOnWriteArrayList
cow 是 copy-on-write 的简写, 这种模型来源于 linux 系统 fork 命令, Java 中一种使用 cow 模型来实现的并发类是 CopyOnWriteArrayList 相比于 Vector, 它的读操作是无需加锁的:
- public E get(int index) {
- return (E) elements[index];
- }
之所以有如此神奇功效, 其采取的是空间换取时间的方法, 查看其 add 方法:
- public synchronized boolean add(E e) {
- Object[] newElements = new Object[elements.length + 1];
- System.arraycopy(elements, 0, newElements, 0, elements.length);
- newElements[elements.length] = e;
- elements = newElements;
- return true;
- }
我们注意到, CopyOnWriteArrayList 的 add 方法是需要加锁的, 但其内部并没有直接对 elements 数组做操作, 而是先 copy 一份当前的数据到一个新的数组, 然后对新的数组进行赋值操作这样做就让 get 操作从同步中解脱出来因为更改的数据并没有发生在 get 所需的数组中而是放生在新生成的副本中, 所以不需要同步但应该注意的是, 尽管如此, get 操作还是可能会读取到脏数据的
CopyOnWriteArrayList 的另一特点是允许多线程遍历, 且其它线程更改数据并不会导致遍历线程抛出
ConcurrentModificationException
异常, 来看下 iterator(),
- public Iterator<E> iterator() {
- Object[] snapshot = elements;
- return new CowIterator<E>(snapshot, 0, snapshot.length);
- }
这个 CowIterator 是 ListIterator 的子类, 这个 Iterator 的特点是它并不支持对数据的更改操作:
- public void add(E object) {
- throw new UnsupportedOperationException();
- }
- public void remove() {
- throw new UnsupportedOperationException();
- }
- public void set(E object) {
- throw new UnsupportedOperationException();
- }
这样做的原因也很容易理解, 我们可以简单地的认为 CowIterator 中的 snapshot 是不可变数组, 因为 list 中有数据更新都会生成新数组, 而不会改变 snapshot, 所以此时 Iterator 没办法再将更改的数据写回 list 了同理, list 数据有更新也不会反映在 CowIterator 中 CowIterator 只是保证其迭代过程不会发生异常
CAS 之 ConcurrentHashMap(JDK1.8)
CAS 是 Compare and Swap 的简写, 即比较与替换, CAS 造作将比较和替换封装为一组原子操作, 不会被外部打断这种原子操作的保证往往由处理器层面提供支持
在 Java 中有一个非常神奇的 Unsafe 类来对 CAS 提供语言层面的接口但类如其名, 此等神器如果使用不当, 会造成武功尽失的, 所以 Unsafe 不对外开放, 想使用的话需要通过反射等技巧这里不对其做展开介绍它的原因是因为它是 JDK1.8 中 ConcurrentHashMap 的实现基础
ConcurrentHashMap 与 HashMap 对数据的存储有着相似的地方, 都采用数组 + 链表 + 红黑树的方式基本逻辑是内部使用 Node 来保存 map 中的一项 key, value 结构, 对于 hash 不冲突的 key, 使用数组来保存 Node 数据, 而每一项 Node 都是一个链表, 用来保存 hash 冲突的 Node, 当链表的大小达到一定程度会转为红黑树, 这样会使在冲突数据较多时也会有比较好的查询效率
了解了 ConcurrentHashMap 的存储结构后, 我们来看下在这种结构下, ConcurrentHashMap 是如何实现高效的并发操作, 这得益于 ConcurrentHashMap 中的如下三个函数
- static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
- return (Node<K,V>)U.getObjectVolatile(tab, ((long)i <<ASHIFT) + ABASE);
- }
- static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
- Node<K,V> c, Node<K,V> v) {
- return U.compareAndSwapObject(tab, ((long)i <<ASHIFT) + ABASE, c, v);
- }
- static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
- U.putOrderedObject(tab, ((long)i <<ASHIFT) + ABASE, v);
- }
其中的 U 就是我们前文提到的 Unsafe 的一个实例, 这三个函数都通过 Unsafe 的几个方法保证了是原子性:
tabAt 作用是返回 tab 数组第 i 项 casTabAt 函数是对比 tab 第 i 项是否与 c 相等, 相等的话将其设置为 vsetTabAt 将 tab 的第 i 项设置为 v
有了这三个函数就可以保证 ConcurrentHashMap 的线程安全吗? 并不是的, ConcurrentHashMap 内部也使用比较多的 synchronized, 不过与 HashTable 这种对所有操作都使用 synchronized 不同, ConcurrentHashMap 只在特定的情况下使用 synchronized, 来较少锁的定的区域来看下 putVal 方法 (精简版):
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node<K,V>[] tab = table;;) {
- Node<K,V> f; int n, i, fh;
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- if (casTabAt(tab, i, null,
- new Node<K,V>(hash, key, value, null)))
- break; // no lock when adding to embin
- }
- else if ((fh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- else {
- V oldVal = null;
- synchronized (f) {
- ....
- }
- }
- }
- addCount(1L, binCount);
- return null;
- }
整个 put 流程大致如下:
判断 key 与 value 是否为空, 为空抛异常计算 kek 的 hash 值, 然后进入死循环, 一般来讲, caw 算法与死循环是搭档判断 table 是否初始化, 未初始化进行初始化操作 Node 在 table 中的目标位置是否为空, 为空的话使用 caw 操作进行赋值, 当然, 这种赋值是有可能失败的, 所以前面的死循环发挥了重试的作用如果当前正在扩容, 则尝试协助其扩容, 死循环再次发挥了重试的作用, 有趣的是 ConcurrentHashMap 是可以多线程同时扩容的这里说协助的原因在于, 对于数组扩容, 一般分为两步: 1. 新建一个更大的数组; 2. 将原数组数据 copy 到新数组中对于第一步, ConcurrentHashMap 通过 CAW 来控制一个 int 变量保证新建数组这一步只会执行一次对于第二步, ConcurrentHashMap 采用 CAW + synchronized + 移动后标记 的方式来达到多线程扩容的目的感兴趣可以查看 transfer 函数最后的一个 else 分支, 黑科技的流程已尝试无效, 目标 Node 已经存在值, 只能锁住当前 Node 来进行 put 操作, 当然, 这里省略了很多代码, 包括链表转红黑树的操作等等
相比于 put,get 的代码更好理解一下:
- public V get(Object key) {
- Node <K,
- V> [] tab;
- Node <K,
- V> e,
- p;
- int n,
- eh;
- K ek;
- int h = spread(key.hashCode());
- if ((tab = table) != null && (n = tab.length)> 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
- if ((eh = e.hash) == h) {
- if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val;
- } else if (eh <0) return (p = e.find(h, key)) != null ? p.val: null;
- while ((e = e.next) != null) {
- if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val;
- }
- }
- return null;
- }
检查表是否为空获取 key 的 hash h, 获取 key 在 table 中对应的 Node e 判断 Node e 的第一项是否与预期的 Node 相等, 相等话, 则返回 e.val 如果 e.hash < 0, 说明 e 为红黑树, 调用 e 的 find 接口来进行查找走到这一步, e 为链表无疑, 且第一项不是需要查询的数据, 一直调用 next 来进行查找即可
读写分离之 LinkedBlockingQueue
还有一种实现线程安全的方式是通过将读写进行分离, 这种方式的一种实现是 LinkedBlockingQueueLinkedBlockingQueue 整体设计的也十分精巧, 它的全局变量分为三类:
final 型 Atomic 型普通变量
final 型变量由于声明后就不会被修改, 所以自然线程安全, Atomic 型内部采用了 cas 模型来保证线程安全对于普通型变量, LinkedBlockingQueue 中只包含 head 与 last 两个表示队列的头与尾并且私有, 外部无法更改, 所以, LinkedBlockingQueue 只需要保证 head 与 last 的安全即可保证真个队列的线程安全并且 LinkedBlockingQueue 属于 FIFO 型队列, 一般情况下, 读写会在不同元素上工作, 所以, LinkedBlockingQueue 定义了两个可重入锁, 巧妙的通过对 head 与 last 分别加锁, 实现读写分离, 来实现良好的安全并发特性:
- /** Lock held by take, poll, etc */
- private final ReentrantLock takeLock = new ReentrantLock();
- /** Wait queue for waiting takes */
- private final Condition notEmpty = takeLock.newCondition();
- /** Lock held by put, offer, etc */
- private final ReentrantLock putLock = new ReentrantLock();
- /** Wait queue for waiting puts */
- private final Condition notFull = putLock.newCondition();
首先看下它的 offer 方法:
- public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
- final AtomicInteger count = this.count;
- if (count.get() == capacity)
- return false;
- int c = -1;
- Node<E> node = new Node<E>(e);
- final ReentrantLock putLock = this.putLock;
- putLock.lock();
- try {
- if (count.get() <capacity) {
- enqueue(node);
- c = count.getAndIncrement();
- if (c + 1 < capacity)
- notFull.signal();
- }
- } finally {
- putLock.unlock();
- }
- if (c == 0)
- signalNotEmpty();
- return c>= 0;
- }
可见, 在对队列进行添加元素时, 只需要对 putLock 进行加锁即可, 保证同一时刻只有一个线程可以对 last 进行插入同样的, 在从队列进行提取元素时, 也只需要获取 takeLock 锁来对 head 操作即可:
- public E poll() {
- final AtomicInteger count = this.count;
- if (count.get() == 0)
- return null;
- E x = null;
- int c = -1;
- final ReentrantLock takeLock = this.takeLock;
- takeLock.lock();
- try {
- if (count.get()> 0) {
- x = dequeue();
- c = count.getAndDecrement();
- if (c> 1)
- notEmpty.signal();
- }
- } finally {
- takeLock.unlock();
- }
- if (c == capacity)
- signalNotFull();
- return x;
- }
LinkedBlockingQueue 整体还是比较好理解的, 但有几个点需要特殊注意:
LinkedBlockingQueue 是一个阻塞队列, 当队列无元素为空时, 所有取元素的线程会通过 notEmpty 的 await() 方法进行等待, 直到再次有数据 enqueue 时, notEmpty 发出 signal 信号对于队列达到上限时也是同理对于 remove,contains,toArray, toString, clear 之类方法, 会调用 fullyLock 方法, 来同时获取读写锁但对于 size 方法, 由于队列内部维护了 AtomicInteger 类型的 count 变量, 是不需要加锁进行获取的
https://blog.saymagic.cn/2016/08/30/java-thread-safe-model-analyze.html#post__title
来源: http://www.jqhtml.com/13348.html