在 Android 开发的漫漫长途上的一点感想和记录, 如果能给各位看官带来一丝启发或者帮助, 那真是极好的.
前言
上一篇博文中, 主要说了些线程以及锁的东西, 我们大多数的并发开发需求, 基本上可以用 synchronized 或者 volatile 解决, 虽然 synchronized 已经被 JDK 优化了, 但有的时候我们还是觉得 synchronized 太重了,
比如说一个电影院卖票, 这个票数是一定的而且共享的, 我想尽快的卖票并且知道还有多少余票. 在程序员看来这就是个票数自减以及获取最新票数的操作.
- private static Long sCount = 10000L;
- final Object obj = new Object();
- // 这里开了 1000 个线程对 sCount 并发操作
- for (int i = 0; i <1000; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- synchronized (obj) {
- // 这里加锁保证同步, 使用 synchronized 总觉得没有
- // 必要, 毕竟就是自减操作, 如果不使用 synchronized 又有什么办法呢?
- sCount--;
- }
- }
- }).start();
- }
- Thread.sleep(5000);
- System.out.println(sCount);
再有, 我们平常使用的容器类 List 以及 Map, 如 ArrayList,HashMap 这些容器是非线程安全的, 那我们如果需要支持并发的容器, 我们该怎么办呢?? 读者莫急, 这正是本篇分享的内容.
原子类
我们先来解决第一个问题, JDK1.5 之后为我们提供了一系列的原子操作类, 位于 java.util.concurrent.atomic 包下.
原子操作基本类型类
AtomicBoolean: 原子更新布尔类型.
AtomicInteger: 原子更新整型.
AtomicLong: 原子更新长整型.
以上 3 个类提供的方法几乎一模一样, 所以本篇仅以 AtomicInteger 为例进行讲解,
AtomicInteger 的常用方法如下.
int addAndGet(int delta): 以原子方式将输入的数值与实例中的值(AtomicInteger 里的
value)相加, 并返回结果.
boolean compareAndSet(int expect,int update): 如果当前值 (调用该函数的值) 等于预期值 (expect), 则以原子方式将当前值(调用该函数的值) 设置为更新的值(update).
int getAndIncrement(): 以原子方式将当前值加 1, 返回旧值.
int incrementAndGet()以原子方式将当前值加 1, 返回新值.
int getAndSet(int newValue): 以原子方式设置为 newValue 的值, 并返回旧值.
那按照上面的知识重新对上面的卖票问题编程如下
- private static AtomicLong sAtomicLong = new AtomicLong(10000L);
- // 这里开了 1000 个线程对 sCount 并发操作
- for (int i = 0; i < 1000; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- sAtomicLong.decrementAndGet();
- }
- }).start();
- }
- Thread.sleep(5000);
- System.out.println(sAtomicLong);
上面的是原子更新基本类型, 那对于对象呢, JDK 也提供了原子更新对象引用的原子类
原子更新引用类型
AtomicReference: 原子更新引用类型.
AtomicReferenceFieldUpdater: 原子更新引用类型里的字段.
AtomicMarkableReference: 原子更新带有标记位的引用类型. 可以原子更新一个布尔类
型的标记位和引用类型. 构造方法是 AtomicMarkableReference(V initialRef,boolean
initialMark).
以上几个类提供的方法几乎一样, 所以本节仅以 AtomicReference 为例进行讲解
boolean compareAndSet(V expect, V update): 如果当前对象 (调用该函数的对象) 等于预期对象 (expect), 则以原子方式将当前对象(调用该函数的对象) 设置为更新的对象(update).
V get(): 获取找对象
void set(V newValue): 设置对象
V getAndSet(V newValue): 以原子方式将当前对象 (调用该函数的对象) 设置为指定的对象(newValue), 并返回原来的对象(设置之前)
那这个东西用在哪里呢, 我在著名的 Rxjava 源码中看到了原子更新对象的用法.
CachedThreadScheduler.java
- // 原子引用 AtomicReference
- AtomicReference<CachedWorkerPool> pool;
- static final CachedWorkerPool NONE;
- static {
- NONE = new CachedWorkerPool(0, null);
- NONE.shutdown();
- }
- public CachedThreadScheduler() {
- this.pool = new AtomicReference<CachedWorkerPool>(NONE);
- start();
- }
- @Override
- public void start() {
- CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
- // 调用 AtomicReference 的 compareAndSet 方法
- if (!pool.compareAndSet(NONE, update)) {
- update.shutdown();
- }
- }
在创建线程调度器的时候把初始的工作线程池更新为新的工作线程池
AtomicReferenceFieldUpdater 以原子方式更新一个对象的属性值
AtomicMarkableReference 是带有标记的原子更新引用的类, 可以有效解决 ABA 问题, 什么是 ABA 问题,
我们就以上面的代码为例
假设 pool.compareAndSet 调用之前, pool 内的对象 NONE 被更新成了 update, 然后又更新成了 NONE, 那么在调用 pool.compareAndSet 的时候还是会把 pool 内的对象更新为 update, 也就是说 AtomicReference 不关心对象的中间历程, 这对于一些以当前对象是否被更改过为判断条件的特殊情境, AtomicReference 就不适用了.
所以 JDK 提供了 AtomicMarkableReference
那除了上面的原子更新引用类型之外, JDK 还为我们提供了原子更新数组
原子更新数组
通过原子的方式更新数组里的某个元素, Atomic 包提供了以下 4 个类.
AtomicIntegerArray: 原子更新整型数组里的元素.
AtomicLongArray: 原子更新长整型数组里的元素.
AtomicReferenceArray: 原子更新引用类型数组里的元素.
AtomicIntegerArray 类主要是提供原子的方式更新数组里的整型, 其常用方法如下.
int addAndGet(int i,int delta): 以原子方式将输入值与数组中索引 i 的元素相加.
boolean compareAndSet(int i,int expect,int update): 如果当前值等于预期值, 则以原子
方式将数组位置 i 的元素设置成 update 值.
以上几个类提供的方法几乎一样, 所以本节仅以 AtomicIntegerArray 为例进行讲解
- public class AtomicIntegerArrayTest {
- static int[] value = new int[]{2, 3};
- static AtomicIntegerArray ai = new AtomicIntegerArray(value);
- public static void main(String[] args) {
- ai.getAndSet(0, 4);
- System.out.println(ai.get(0));
- System.out.println(value[0]);
- }
- }
以下是输出的结果.
4
2
更快的原子操作基本类 LongAdder DouleAdder
JDK1.8 为我们提供了更快的原子操作基本类 LongAdder DouleAdder,
LongAdder 的 doc 部分说明如下
- This class is usually preferable to {
- @link AtomicLong
- } when
- multiple threads update a common sum that is used for purposes such
- as collecting statistics, not for fine-grained synchronization
- control. Under low update contention, the two classes have similar
- characteristics. But under high contention, expected throughput of
- this class is significantly higher, at the expense of higher space
- consumption
上面那段话翻译过来就是
当我们的场景是为了统计计数, 而不是为了更细粒度的同步控制时, 并且是在多线程更新的场景时, LongAdder 类比 AtomicLong 更好用. 在小并发的环境下, 论更新的效率, 两者都差不多. 但是高并发的场景下, LongAdder 有着明显更高的吞吐量, 但是有着更高的空间复杂度.
从 LongAdder 的 doc 文档上我们就可以知道 LongAdder 更适用于统计求和场景, 而不是细粒度的同步控制.
并发容器
我们在开发中遇到比较简单的并发操作像自增自减, 求和之类的问题, 上一节原子类已经能比较好的解决了, 但对于本篇文章来说只是开胃小菜, 下面正菜来喽
ConcurrentLinkedQueue(并发的队列)
ConcurrentLinkedQueue 是一个基于链表的无界线程安全队列, 它采用先进先出的规则对节点进行排序, 我们添加一个元素的时候, 它会添加到队列的尾部; 当我们获取一个元素时, 它会返回队列头部的元素.
我们先来看一下 ConcurrentLinkedQueue 的类图
ConcurrentLinkedQueue 由 head 节点和 tail 节点组成, 每个节点 (Node) 由节点元素 (item) 和指向下一个节点 (next) 的引用组成, 节点与节点之间就是通过这个 next 关联起来, 从而组成一张链表结构的队列. 默认情况下 head 节点存储的元素为空, tail 节点等于 head 节点
以下源码来自 JDK1.8
- public ConcurrentLinkedQueue() {
- // 默认情况下 head 节点存储的元素为空, tail 节点等于 head 节点, 哨兵节点
- head = tail = new Node<E>(null);
- }
- private static class Node<E> {
- volatile E item;
- volatile Node<E> next;
- Node(E item) {
- // 设置 item 值
- // 这种的设置方式类似于 C++ 的指针, 直接操作内存地址,
- // 例如此行代码, 就是以 CAS 的方式把值 (item) 赋值给当前对象即 Node 地址偏移 itemOffset 后的地址
- // 下面出现的 casItem 以及 casNext 也是同理
- UNSAFE.putObject(this, itemOffset, item);
- }
- boolean casItem(E cmp, E val) {
- return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
- }
- void lazySetNext(Node<E> val) {
- UNSAFE.putOrderedObject(this, nextOffset, val);
- }
- boolean casNext(Node<E> cmp, Node<E> val) {
- return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
- }
- private static final sun.misc.Unsafe UNSAFE;
- private static final long itemOffset;
- private static final long nextOffset;
- static {
- try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> k = Node.class;
- itemOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("item"));
- nextOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("next"));
- } catch (Exception e) {
- throw new Error(e);
- }
- }
- }
看完了初始化, 我们来看一下这个线程安全队列的进队和出队方法
offer(E e) 以及 poll() 方法
- offer(E e)
- public boolean offer(E e) {
- checkNotNull(e);// 检查, 为空直接异常
- // 创建新节点, 并将 e 作为节点的 item
- final Node<E> newNode = new Node<E>(e);
- // 这里操作比较多, 将尾节点 tail 赋给变量 t,p
- for (Node<E> t = tail, p = t;;) {
- // 并获取 q 也就是 tail 的下一个节点
- Node<E> q = p.next;
- // 如果下一个节点是 null, 说明 tail 是处于尾节点上
- if (q == null) {
- // 然后用 cas 将下一个节点设置成为新节点
- // 这里用 cas 操作, 如果多线程的情况, 总会有一个先执行成功, 失败的线程继续执行循环.
- // <1>
- if (p.casNext(null, newNode)) {
- // 如果 p.casNext 有个线程成功了, p=newNode
- // 比较 t (tail) 是不是 最后一个节点
- if (p != t)
- // 如果不等, 就利用 cas 将, 尾节点移到最后
- // 如果失败了, 那么说明有其他线程已经把 tail 移动过, 也是 OK 的
- casTail(t, newNode);
- return true;
- }
- // 如果 < 1 > 失败了, 说明肯定有个线程成功了,
- // 这时候失败的线程, 又会执行 for 循环, 再次设值, 直到成功.
- }
- else if (p == q)
- // 有可能刚好插入一个, 然后 P 就被删除了, 那么 p==q
- // 这时候在头结点需要从新定位.
- p = (t != (t = tail)) ? t : head;
- else
- // 这里是为了当 P 不是尾节点的时候, 将 P 移到尾节点, 方便下一次插入
- // 也就是一直保持向前推进
- p = (p != t && t != (t = tail)) ? t : q;
- }
- }
- private boolean casTail(Node<E> cmp, Node<E> val) {
- return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
- }
从上述代码可知入队列过程可归纳为 3 步
定位出尾节点;
使用 CAS 算法将入队节点设置成尾节点的 next 节点, 如不成功则重试
更新尾节点
1. 定位尾节点
tail 节点并不总是尾节点, 所以每次入队都必须先通过 tail 节点来找到尾节点.
尾节点可能是 tail 节点, 也可能是 tail 节点的 next 节点.
2. 设置入队节点为尾节点
p.casNext(null, newNode)方法用于将入队节点设置为当前队列尾节点的 next 节点, 如果 p 是 null,
表示 p 是当前队列的尾节点, 如果不为 null, 表示有其他线程更新了尾节点, 则需要重新获取当前队列的尾节点
3. 更新尾节点
casTail(t, newNode);
将尾节点移到最后(即把 tail 指向新节点)
如果失败了, 那么说明有其他线程已经把 tail 移动过, 此时新节点 newNode 为尾节点, tail 为其前驱结点
- poll()
- public E poll() {
- // 设置起始点
- restartFromHead:
- for (;;) {
- for (Node<E> h = head, p = h, q;;) {
- E item = p.item;
- // 利用 cas 将第一个节点设置为 null
- if (item != null && p.casItem(item, null)) {
- // 和上面类似, p 的 next 被删了,
- // 然后然后判断一下, 目的为了保证 head 的 next 不为空
- if (p != h) // hop two nodes at a time
- updateHead(h, ((q = p.next) != null) ? q : p);
- return item;
- }
- else if ((q = p.next) == null) {
- // 有可能已经被另外线程先删除了下一个节点
- // 那么需要先设定 head 的位置, 并返回 null
- updateHead(h, p);
- return null;
- }
- else if (p == q)
- continue restartFromHead;
- else
- // 和 offer 类似, 保证下一个节点有值, 才能删除
- p = q;
- }
- }
- }
- ConcurrentHashMap(并发的 HashMap)
JDK1.7 与 JDK1.8 ConcurrentHashMap 的实现还是有不小的区别的
JDK1.7
在 JDK1.7 版本中, ConcurrentHashMap 的数据结构是由一个 Segment 数组和多个 HashEntry 组成.
Segment 数组的意义就是将一个大的 table 分割成多个小的 table 来进行加锁, 也就是上面的提到的锁分离技术, 而每一个 Segment 元素存储的是 HashEntry 数组 + 链表, 这个和 HashMap 的数据存储结构一样.
JDK1.8
1.8 中放弃了 Segment 臃肿的设计, 取而代之的是采用 Node + CAS + Synchronized 来保证并发安全进行实现, 结构如下:
put 实现
- public V put(K key, V value) {
- return putVal(key, value, false);
- }
- /** Implementation for put and putIfAbsent */
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node<K,V>[] tab = table;;) {
- Node<K,V> f; int n, i, fh;
- if (tab == null || (n = tab.length) == 0)
- //1 只有在执行第一次 put 方法时才会调用 initTable()初始化 Node 数组
- tab = initTable();
- //2 如果相应位置的 Node 还未初始化, 则通过 CAS 插入相应的数据;
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- if (casTabAt(tab, i, null,
- new Node<K,V>(hash, key, value, null)))
- break; // no lock when adding to empty bin
- }
- //3 如果相应位置的 Node 不为空, 且当前该节点处于移动状态 帮助转移数据
- else if ((fh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- //4 如果相应位置的 Node 不为空, 且当前该节点不处于移动状态, 则对该节点加 synchronized 锁,
- else {
- V oldVal = null;
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- //5 如果该节点的 hash 不小于 0, 则遍历链表更新节点或插入新节点;
- if (fh>= 0) {
- binCount = 1;
- for (Node<K,V> e = f;; ++binCount) {
- K ek;
- if (e.hash == hash &&
- ((ek = e.key) == key ||
- (ek != null && key.equals(ek)))) {
- oldVal = e.val;
- if (!onlyIfAbsent)
- e.val = value;
- break;
- }
- Node<K,V> pred = e;
- if ((e = e.next) == null) {
- pred.next = new Node<K,V>(hash, key,
- value, null);
- break;
- }
- }
- }
- //6 如果该节点是 TreeBin 类型的节点, 说明是红黑树结构, 则通过 putTreeVal 方法往红黑树中插入节点;
- else if (f instanceof TreeBin) {
- Node<K,V> p;
- binCount = 2;
- if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
- value)) != null) {
- oldVal = p.val;
- if (!onlyIfAbsent)
- p.val = value;
- }
- }
- }
- }
- /**
- * 如果 binCount 不为 0, 说明 put 操作对数据产生了影响, 如果当前链表的个数达到 8 个,
- * 通过 treeifyBin 方法转化为红黑树,
- * 如果 oldVal 不为空, 说明是一次更新操作, 没有对元素个数产生影响, 则直接返回旧值;
- */
- if (binCount != 0) {
- if (binCount>= TREEIFY_THRESHOLD)
- treeifyBin(tab, i);
- if (oldVal != null)
- return oldVal;
- break;
- }
- }
- }
- addCount(1L, binCount);
- return null;
- }
- CopyOnWriteArrayList(线程安全的 ArrayList)
JDK1.8 中关于 CopyOnWriteArrayList 的官方介绍如下
- A thread-safe variant of {
- @link java.util.ArrayList
- } in which all mutative
- operations ({
- @code add
- }, {
- @code set
- }, and so on)
are implemented bymaking a fresh copy of the underlying array.
中文翻译大致是
CopyOnWriteArrayList 是一个线程安全的 java.util.ArrayList 的变体,
add,set 等改变 CopyOnWriteArrayList 的操作是通过制作当前数据的副本实现的
其实意思很简单, 假设有一个数组如下所示
并发读取
多个线程并发读取是没有任何问题的
更新数组
我们来看 add 源码
- public boolean add(E e) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- Object[] elements = getArray();
- int len = elements.length;
- Object[] newElements = Arrays.copyOf(elements, len + 1);
- newElements[len] = e;
- setArray(newElements);
- return true;
- } finally {
- lock.unlock();
- }
- }
有了前面的积淀, 这段代码可以说没有任何难度
获取重入锁(线程互斥锁)
创一个新的数组 (在原有数据长度的基础上加 1) 并把原数组的数据拷贝到新数组
把新数组的引用设置为老数组
注 写入过程中, 若有其他线程读取数据, 那么读取的依然是老数组的数据
使用场景
由上面的结构以及源码分析就知道 CopyOnWriteArrayList 用在读多写少的多线程环境中.
本篇总结
本篇分享了一些原子操作类以及并发容器, 这些在多线程开发中都很有作用. 希望帮到你.
下篇预告
Android 并发工具类与线程池
参考博文
此致, 敬礼
来源: https://segmentfault.com/a/1190000017563053