在上文已经说明, 委托是构造线程安全类的一个最有效策略, 也就是让现有的线程安全类管理所有的状态即可. 以下将介绍这些基础构建模块.
同步容器类
同步容器类包括 Vector 和 Hashtable 以及由 Collections.synchronizedXxx 等工厂方法创建的同步封装器类. 这些类实现线程安全的方式是: 将它们的状态封装起来, 并对每个公有方法都进行同步, 使得每次只有一个线程能访问容器的状态. 同步容器对所有容器状态的访问都串行化, 严重降低了并发性; 当多个线程竞争锁时, 吞吐量严重下降.
同步容器类存在的问题
同步容器类都是线程安全的, 但是在某些情况下可能需要额外的客户端加锁来保护复合操作.
比如, 在 Vecotr 中, getLast()和 deleteLast()操作, 如果是在多线程的环境下运行, 如果不加锁, 会产生异常情况. 一个线程在 getLast()后, 另一个线程 deleteLast(), 然后该线程继续执行, 进行 deleteLast()操作, 此时会抛出下标越界的异常.
又比如, 在迭代的过程中, 使用 get(index)的操作, 如果有多个线程运行, 可能会删除其中元素, 同样会造成异常.
对于如上的情况, 我们需要通过客户端加锁来解决线程安全的问题. 如在迭代时加锁:
- synchronized(vector){
- for(int i=0;i<vector.size();i++){
- vector.get(i);
- }
- }
迭代器
在迭代或者 for-each 循环语法时, 对容器类进行迭代的标准方式都是使用 Iterator. 然而, 在设计同步容器类的迭代器时并没有考虑到并发修改的问题, 并且它们表现出的行为时 "及时失败" 的, 也就是当它们发现容器在迭代过程中被修改时, 就会抛出 ConcurrentModificationException.
如果在迭代期间, 对容器加锁, 首先会降低效率, 提高线程的等待时间; 然后还可能会产生死锁; 降低了吞吐量和 CPU 的利用率.
如果不希望在迭代期间加锁, 可以使用克隆容器的方法, 并在克隆副本上进行迭代.
加锁可以防止迭代器抛出 ConcurrentModificationException, 但是要在所有对容器进行迭代的地方都要加锁. 如 hashCode,equals,containsAll,removeAll,retainAll 等方法, 在以容器为参数时, 都会对容器进行迭代. 这些间接的迭代操作可能抛出 ConcurrentModificationException.
并发容器
Java 5.0 提供了多种并发容器类来改进同步容器的性能. 同步容器对所有容器状态的访问都串行化, 严重降低了并发性; 当多个线程竞争锁时, 吞吐量严重下降.
并发容器是针对多个线程并发访问设计的. 通过并发容器来替代同步容器, 可以极大地提高伸缩性并降低风险. 并发容器包括 ConcurrentHashMap(替代 Map),CopyOnWriteArrayList(替代 List),ConcurrentLinkedQueue,BlockingQueue 等等.
ConcurrentHashMap
同步容器类在执行每个操作期间都持有一个锁. ConcurrentHashMap 采用了不同的加锁策略来提供更高的并发性和伸缩性. 它并不是将每个方法都在同一个锁上同步, 而是使用一种粒度更细的加锁机制来实现更大程度的共享, 这种机制称为分段锁.
分段锁机制使得任意数量的读取线程可以并发访问 Map, 执行读取操作的线程和执行写入操作的线程可以并发访问 Map, 并且一定数量的写入线程可以并发地修改 Map, 因此提高了并发访问的吞吐量.
并发容器增强了同步容器类, 它们提供的迭代器不会抛出 ConcurrentModificationException, 因此不需要在迭代过程中对容器加锁. 其迭代器具有弱一致性, 可以容忍并发的修改, 在创建迭代器时会遍历已有元素, 并可以 (但是不保证) 在迭代器被构造后将修改操作反映给容器. size(),isEmpty()等方法返回的是一个近似值.
由于 ConcurrentHashMap 与 Hashtable 和 synchronizedMap 有更多的优势, 因此大多数情况应该使用并发容器类, 至于当需要对整个容器加锁进行独占访问时, 才应该放弃使用并发容器.
注意, 此时不能再通过客户端加锁新建新的原子操作了, 客户端只能对并发容器自身加锁, 但并发容器内部使用的并不是自身锁.
CopyOnWriteArrayList
写入时复制容器, 在每次修改时都会加锁并创建和重新发布一个新的容器副本, 直接修改容器引用, 从而实现可见性. 写操作在一个复制的数组上进行, 读操作还是在原始数组中进行, 读写分离, 互不影响. 写操作需要加锁, 防止并发写入时导致写入数据丢失. 写操作结束之后需要把原始数组指向新的复制数组.
CopyOnWriteArrayList 在写操作的同时允许读操作, 大大提高了读操作的性能, 因此很适合读多写少的应用场景. 但是 CopyOnWriteArrayList 有其缺陷:
内存占用: 在写操作时需要复制一个新的数组, 使得内存占用为原来的两倍左右;
数据不一致: 读操作不能读取实时性的数据, 因为部分写操作的数据还未同步到读数组中.
阻塞队列
阻塞队列支持生产者 - 消费者模式. 简化了开发过程, 消除了生产者和消费者之间的代码依赖性. 阻塞队列简化了生产者 - 消费者设计的实现过程. 一种常见的生产者 - 消费者设计模式就是线程池与工作队列的组合.
阻塞队列提供了四种处理方法:
抛出异常, 使用 add(e)插入, remove()删除, element()查询. 当阻塞队列满时, 插入元素; 当队列空, 删除元素都会抛出异常.
返回特殊值, 使用 offer(e)插入, poll()删除, peek()查询. 插入时, 如果成功返回 true, 移除时, 如果没有对应的元素返回 null.
阻塞, 使用 put(e)插入, take()删除. 队列满, 插入元素时会阻塞; 队列空, 取元素会阻塞.
超时退出: 使用 offer(e,time,unit)插入, poll(time,unit)删除. 当队列满时, 会阻塞, 超过一定的时间, 线程会退出.
阻塞队列有多种实现.
ArrayBlokcingQueue 和 LinkedBlockingQueue 分别是数组和链表结构组成的有界的 FIFO 阻塞队列.
PriorityBlockingQueue 是一个支持优先级排序的无界阻塞队列.
SynchronousQueue 是一个不存储元素的阻塞队列, 它不会为队列中元素维护存储空间.
LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列.
LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列.
双端队列与工作密取
Java 6 提供了 Dqueue 和 BlockingDeque, 是双端队列, 实现了在队列头和队列尾的高效插入和移除. 双端队列适用于工作密取模式. 在工作密取中, 每个消费者都有各自的双端队列. 如果一个消费者完成了自己的双端队列的全部工作, 可以从其他消费者双端队列末尾秘密的获取工作. 因为工作者线程不会再单个共享的任务队列上发生竞争. 适用于既是生产者又是消费者问题.
阻塞方法与中断方法
线程会阻塞或暂停执行. 被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行. 当在代码中调用一个可以抛出 InterruptedException 的方法时, 自己的方法就编程了阻塞方法, 必须处理中断的响应. 如果这个方法被中断, 那么它将努力提前结束状态.
处理中断的响应有两种基本选择:
传递 InterruptedException, 把该异常抛出给方法的调用者.
恢复中断, 捕获异常, 并调用当前线程的 interrupt 方法恢复中断, 引发更高层的代码中断.
- public void run(){
- try{
- something();
- }catch(InterruptedException e){
- Thread.currentThread().interrupt();
- }
- }
同步工具类
同步工具类可以是任何一个对象, 只要它根据其自身的状态来协调线程的控制流. 包括阻塞队列, 信号量, 栅栏以及闭锁.
闭锁
闭锁用来确保某些活动直到其他活动都完成了才继续执行. 如果有多个线程, 其中一个线程需要等到其他所有线程活动结束后才继续执行, 使用闭锁.
CountDownLatch 是一种闭锁的实现, 可以使得一个或者多个线程等待一组事情发生. 包括一个计数器, 表示需要等待的事件数量; countDown 方法用来递减计数器, 表示有一个事件已经发生了; await 方法等待计数器为 0, 表示所有需要等待的事情已经发生.
- // 初始化闭锁, 并设置资源个数
- CountDownLatch latch = new CountDownLatch(2);
- Thread t1 = new Thread( new Runnable(){
- public void run(){
- // 加载资源 1
加载资源的代码......
- // 本资源加载完后, 闭锁 - 1
- latch.countDown();
- }
- } ).start();
- Thread t2 = new Thread( new Runnable(){
- public void run(){
- // 加载资源 2
资源加载代码......
- // 本资源加载完后, 闭锁 - 1
- latch.countDown();
- }
- } ).start();
- Thread t3 = new Thread( new Runnable(){
- public void run(){
- // 本线程必须等待所有资源加载完后才能执行
- latch.await();
- // 当闭锁数量为 0 时, await 返回, 执行接下来的任务
任务代码......
- }
- } ).start();
栅栏(同步屏障)
闭锁是一次性对象, 一旦进入终止状态, 就不能被重置. 栅栏类似于闭锁, 能阻塞一组进程直到某个时间发生. 栅栏与闭锁的区别在于, 所有线程必须同时到达栅栏位置, 才能继续执行.
若有多条线程, 他们到达屏障时将会被阻塞, 只有当所有线程都到达屏障时才能打开屏障, 所有线程同时执行, 若有这样的需求可以使用同步屏障. 此外, 当屏障打开的同时还能指定执行的任务.
闭锁只会阻塞一条线程, 目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程, 目的是为了让所有线程同时执行(实际上并不会同时执行, 而是尽量把线程启动的时间间隔降为最少).
- // 创建同步屏障对象, 并制定需要等待的线程个数 和 打开屏障时需要执行的任务
- CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
- public void run(){
- // 当所有线程准备完毕后触发此任务
- }
- });
- // 启动三条线程
- for( int i=0; i<3; i++ ){
- new Thread( new Runnable(){
- public void run(){
- // 等待,(每执行一次 barrier.await, 同步屏障数量 - 1, 直到为 0 时, 打开屏障)
- barrier.await();
- // 任务
任务代码......
- }
- } ).start();
- }
信号量
信号量用于控制同时访问某个特定资源的操作数量, 或者执行某个指定操作的数量. 计数信号量还可以用来实现某种资源池, 或者对容器施加边界.
信号量可以用于实现资源池, 也可以用于将容器变为有界阻塞容器. 信号量管理着一组虚拟的许可, 在执行操作时首先获取许可, 并在使用以后释放许可. 如果没有许可, 将阻塞直到有许可或被中断, 超时.
信号量的使用场景是, 有 m 个资源, n 个线程, 且 n>m, 同一时刻只能允许 m 条线程访问资源.
- // 创建信号量对象, 并给予 3 个资源
- Semaphore semaphore = new Semaphore(3);
- // 开启 10 条线程
- for ( int i=0; i<10; i++ ) {
- new Thread( new Runnbale(){
- public void run(){
- // 获取资源, 若此时资源被用光, 则阻塞, 直到有线程归还资源
- semaphore.acquire();
- // 任务代码
- ......
- // 释放资源
- semaphore.release();
- }
- } ).start();
- }
FutureTask
可以用作闭锁, 是一种可以生成结果的 Runnable, 可以处于以下三种状态: 等待运行, 正在运行和运行完成. 当 FutureTask 进入完成状态后, 它会停止在这个状态上.
FutureTask 在 Executor 框架中表示异步任务, 此外还可以用来表示一些时间较长的运算, 这些计算可以在使用计算结构之前启动.
实战: 构建缓存
首先, 使用 HashMap 和同步机制来初始化缓存.
- public interface Computable<A,V> {
- V compute(A arg) throws InterruptedException;
- }
- public class ExpensiveFunc implements Computable<String,BigInteger> {
- @Override
- public BigInteger compute(String arg) throws InterruptedException {
- return new BigInteger(arg);
- }
- }
- public class Memoizer1<A,V> implements Computable<A,V> {
- private final Map<A,V> cache=new HashMap<>();
- private final Computable<A,V> c;
- public Memoizer1(Computable<A,V> c){
- this.c=c;
- }
- @Override
- public synchronized V compute(A arg) throws InterruptedException {
- V result=cache.get(arg);
- if(result==null){
- result=c.compute(arg);
- cache.put(arg,result);
- }
- return result;
- }
- }
在这种实现方法中, 使用 HashMap 保存之前计算的结果. 首先检查需要的结果是否已经在缓存中, 如果存在则返回之前计算, 否则将计算结果缓存到 HashMap 再返回.
为了确保线程安全, 将整个 compute 方法进行同步. 但是这样伸缩性差, 缓存的性能并没有得到提升.
下面使用 ConcurrentHashMap 替换 HashMap. 但是, 这种方法存在一些不足, 当两个线程同时调用 compute 时, 可能会导致计算得到相同的值. 这样是低效的, 因为缓存的作用就是避免相同的数据被计算多次. 其问题在于, 如果某个线程启动了一个计算, 而其他线程并不知道这个计算正在进行, 很可能会重复这个计算.
针对如上问题, 我们考虑可以使用 FutureTask 来解决. 使用该类来表示计算的过程, 如果有结果可用, 则返回结果, 否则一直阻塞.
- public class Memo2 <A,V> implements Computable<A,V>{
- private final Map<A,Future<V>> cache=new ConcurrentHashMap<>();
- private final Computable<A,V>c;
- public Memo2(Computable<A,V>c){
- this.c=c;
- }
- @Override
- public V compute(A arg) throws InterruptedException {
- Future<V> future=cache.get(arg);
- if(future==null){
- Callable<V> eval=new Callable<V>() {
- @Override
- public V call() throws Exception {
- return c.compute(arg);
- }
- };
- FutureTask<V> ft=new FutureTask<>(eval);
- future=cache.putIfAbsent(arg,ft);
- if(future==null){
- future=ft;
- ft.run();
- }
- }
- try{
- return future.get();
- }catch (ExecutionException e){
- e.printStackTrace();
- }
- return null;
- }
- }
参考资料
Java 并发编程实战
https://www.infoq.cn/article/java-blocking-queue
来源: https://juejin.im/post/5c19aa876fb9a049fe3516f0