Tips
书中的源代码地址:
注意, 书中的有些代码里方法是基于 Java 9 API 中的, 所以 JDK 最好下载 JDK 9 以上的版本.
81. 优先使用并发实用程序替代 wait 和 notify
本书的第一版专门用一个条目来介绍正确使用 wait 和 notify 方法[Bloch01,Item 50]. 它的建议仍然有效, 并在本条目末尾进行了总结, 但这个建议远不如以前那么重要了. 这是因为没有太多理由再使用 wait 和 notify 了. 从 Java 5 开始, 该平台提供了更高级别的并发实用程序, 可以执行以前必须在 wait 和 notify 时手动编写代码的各种操作. 鉴于正确使用 wait 和 notify 的困难, 应该使用更高级别的并发实用程序.
java.util.concurrent 包中的高级实用程序分为三类: Executor Framework, 在条目 80 中简要介绍了它; 并发集合(concurrent collections) 和同步器(synchronizers). 本条目简要介绍后两者.
并发集合是标准集合接口 (如 List,Queue 和 Map) 的高性能并发实现. 为了提供高并发性, 这些实现在内部管理自己的同步(条目 79). 因此, 不可能从并发集合中排除并发活动; 锁定它只会使程序变慢.
因为不能排除并发集合上的并发活动, 所以也不能以原子方式组合对它们的方法调用. 因此, 并发集合接口配备了依赖于状态的修改操作, 这些操作将几个基本操作组合成单个原子操作. 事实证明, 这些操作对并发集合非常有用, 它们使用默认方法 (条目 21) 添加到 Java 8 中相应的集合接口中.
例如, Map 的 putIfAbsent(key, value)方法插入键的映射 (如果不存在) 并返回与键关联的之前的值, 如果没有则返回 null.
这样可以轻松实现线程安全的规范化 Map. 此方法模拟 String.intern` 方法的行为:
- // Concurrent canonicalizing map atop ConcurrentMap - not optimal
- private static final ConcurrentMap<String, String> map =
- new ConcurrentHashMap<>();
- public static String intern(String s) {
- String previousValue = map.putIfAbsent(s, s);
- return previousValue == null ? s : previousValue;
- }
事实上, 你可以做得更好. ConcurrentHashMap 针对 get 等检索操作进行了优化. 因此, 只有在 get 表明有必要时, 才首先调用 get 并调用 putIfAbsent 方法:
- // Concurrent canonicalizing map atop ConcurrentMap - faster!
- public static String intern(String s) {
- String result = map.get(s);
- if (result == null) {
- result = map.putIfAbsent(s, s);
- if (result == null)
- result = s;
- }
- return result;
- }
除了提供出色的并发性外, ConcurrentHashMap 非常快. 在我的机器上, 上面的 intern 方法比 String.intern 快 6 倍(但请记住, String.intern 必须采用一些策略来防止在长期运行的应用程序中泄漏内存). 并发集合使基于同步的集合在很大程度上已经过时了. 例如, 使用 ConcurrentHashMap 优先于 Collections.synchronizedMap. 简单地用并发 Map 替换同步 Map 以显着提高并发应用程序的性能.
一些集合接口使用阻塞操作进行扩展, 这些操作等待 (或阻塞) 直到可以成功执行. 例如, BlockingQueue 扩展了 Queue 并添加了几个方法, 包括 take, 它从队列中删除并返回 head 元素, 等待队列为空. 这允许阻塞队列用于工作队列 (也称为生产者 -- 消费者队列), 一个或多个生产者线程将工作项入队, 并且一个或多个消费者线程从哪个队列变为可用时出队并处理项目. 正如所期望的那样, 大多数 ExecutorService 实现(包括 ThreadPoolExecutor) 都使用 BlockingQueue(条目 80).
同步器是使线程能够彼此等待的对象, 允许它们协调各自的活动. 最常用的同步器是 CountDownLatch 和 Semaphore. 不太常用的是 CyclicBarrier 和 Exchanger. 最强大的同步器是 Phaser.
倒计时锁存器 (CountDownLatch) 是一次性使用的屏障, 允许一个或多个线程等待一个或多个其他线程执行某些操作. CountDownLatch 的唯一构造方法接受一个 int 类型的参数, 它是在允许所有等待的线程继续之前, 必须在 latch 上调用 countDown 方法的次数.
在这个简单的原语上构建有用的东西非常容易. 例如, 假设想要构建一个简单的框架来为一个操作的并发执行计时. 这个框架由一个方法组成, 该方法使用一个执行器 executor 来执行操作, 一个表示要并发执行的操作数量并发级别, 以及一个表示该操作的 runnable 组成. 所有工作线程都准备在计时器线程启动时钟之前运行操作. 当最后一个工作线程准备好运行该操作时, 计时器线程 "发号施令(fires the starting gun)", 允许工作线程执行该操作. 一旦最后一个工作线程完成该操作, 计时器线程就停止计时. 在 wait 和 notify 的基础上直接实现这种逻辑至少会有点麻烦, 但是在 CountDownLatch 的基础上实现起来却非常简单:
- // Simple framework for timing concurrent execution
- public static long time(Executor executor, int concurrency,
- Runnable action) throws InterruptedException {
- CountDownLatch ready = new CountDownLatch(concurrency);
- CountDownLatch start = new CountDownLatch(1);
- CountDownLatch done = new CountDownLatch(concurrency);
- for (int i = 0; i <concurrency; i++) {
- executor.execute(() -> {
- ready.countDown(); // Tell timer we're ready
- try {
- start.await(); // Wait till peers are ready
- action.run();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- done.countDown(); // Tell timer we're done
- }
- });
- }
- ready.await(); // Wait for all workers to be ready
- long startNanos = System.nanoTime();
- start.countDown(); // And they're off!
- done.await(); // Wait for all workers to finish
- return System.nanoTime() - startNanos;
- }
请注意, 该方法使用三个倒计时锁存器. 第一个 ready, 由工作线程来告诉计时器线程何时准备就绪. 工作线程然后等待第二个锁存器, 即 start. 当最后一个工作线程调用 ready.countDown 时, 计时器线程记录开始时间并调用 start.countDown, 允许所有工作线程继续. 然后, 计时器线程等待第三个锁存器完成, 直到最后一个工作线程完成运行并调用 done.countDown. 一旦发生这种情况, 计时器线程就会唤醒并记录结束时间.
还有一些细节值得注意. 传递给 time 方法的 executor 必须允许创建至少与给定并发级别相同数量的线程, 否则测试将永远不会结束. 这被称为线程饥饿死锁 (thread starvation deadlock)[Goetz06, 8.1.1]. 如果工作线程捕捉到 InterruptedException 异常, 它使用习惯用法 thread.currentthread ().interrupt() 重新断言中断, 并从它的 run 方法返回. 这允许执行程序按照它认为合适的方式处理中断. System.nanoTime 用于计算活动的时间.** 对于间隔计时, 请始终使用 System.nanoTime 而不是 System.currentTimeMillis. System.nanoTime 更准确, 更精确, 不受系统实时时钟调整的影响. 最后, 请注意, 本例中的代码不会产生准确的计时, 除非 action 做了相当多的工作, 比如一秒钟或更长时间. 准确的微基准测试是非常困难的, 最好是借助诸如 jmh [JMH]这样的专业框架来完成.
这个条目只涉及使用并发实用程序做一些皮毛的事情. 例如, 前一个示例中的三个倒计时锁存器可以由单个 CyclicBarrier 或 Phaser 实例替换. 结果代码会更简洁, 但可能更难理解.
虽然应该始终优先使用并发实用程序来等替换 wait 和 notify 方法, 但你可能必须维护使用 wait 和 notify 的旧代码. wait 方法用于使线程等待某些条件. 必须在同步区域内调用它, 该区域锁定调用它的对象. 下面是使用 wait 方法的标准习惯用法:
- // The standard idiom for using the wait method
- synchronized (obj) {
- while (<condition does not hold>)
- obj.wait(); // (Releases lock, and reacquires on wakeup)
- ... // Perform action appropriate to condition
- }
始终要在循环中调用 wait 方法; 永远不要在循环之外调用它. 循环用于测试 wait 前后的条件.
如果条件已经存在, 则在 wait 之前测试条件并跳过等待以确保活性 (liveness). 如果条件已经存在并且在线程等待之前已经调用了 notify(或 notifyAll) 方法, 则无法保证线程将从等待中唤醒.
为了确保安全, 需要在等待之后再测试条件, 如果条件不成立, 则再次等待. 如果线程在条件不成立的情况下继续执行该操作, 它可能会破坏由锁保护的不变式(invariant). 当条件不成立时, 以下几个原因可以把线程唤醒:
另一个线程可以获得锁并在线程调用 notify 和等待线程醒来之间改变了保护状态.
当条件不成立时, 另一个线程可能意外地或恶意地调用 notify 方法. 类通过等待公共可访问的对象来暴露自己. 公共可访问对象的同步方法中的任何 wait 方法都容易受到这个问题的影响.
通知线程在唤醒等待线程时可能过于 "慷慨". 例如, 即使只有一些等待线程的满足条件, 通知线程也可能调用 notifyAll.
在没有通知的情况下, 等待的线程可能 (很少) 被唤醒. 这被称为虚假的唤醒(spurious wakeup)[POSIX, 11.4.3.6.1;Java9-API].
一个相关的问题是, 为了唤醒等待的线程, 是使用 notify 还是 notifyAll.(回想一下 notify 唤醒一个等待线程, 假设存在这样一个线程, notifyAll 唤醒所有等待线程). 有时人们会说, 应该始终使用 notifyAll. 这是合理的, 保守的建议. 它总是会产生正确的结果, 因为它保证唤醒所有需要被唤醒的线程. 可能还会唤醒其他一些线程, 但这不会影响程序的正确性. 这些线程将检查它们正在等待的条件, 如果发现为 false, 将继续等待.
作为一种优化, 如果所有线程都在等待相同的条件, 并且每次只有一个线程可以从条件变为 true 中唤醒, 那么可以选择调用 notify 而不是 notifyAll.
即使满足了这些先决条件, 也可能有理由使用 notifyAll 来代替 notify. 正如将 wait 方法调用放在循环中可以防止公共访问对象上的意外或恶意通知一样, 使用 notifyAll 代替 notify 可以防止不相关线程的意外或恶意等待. 否则, 这样的等待可能会 "吞下" 一个关键通知, 让预期的接收者无限期地等待.
总之, 与 java.util.concurrent 提供的高级语言相比, 直接使用 wait 和 notify 就像在 "并发汇编语言" 中编程一样. 在新代码中基本上不存在使用 wait 和 notify 的理由. 如果正在维护使用 wait 和 notify 的代码, 请确保它始终使用标准惯用法在 while 循环内调用 wait 方法. 通常应优先使用 notifyAll 方法进行通知. 如果使用 notify, 必须非常小心以确保程序的活性.
来源: https://www.cnblogs.com/IcanFixIt/p/10640314.html