介绍了多线程之间竞争访问同一个资源的问题及解决方案 synchronized,我们提到,多线程之间除了竞争,还经常需要相互协作,本节就来介绍 Java 中多线程协作的基本机制 wait/notify。
都有哪些场景需要协作?wait/notify 是什么?如何使用?实现原理是什么?协作的核心是什么?如何实现各种典型的协作场景?由于内容较多,我们分为上下两节来介绍。
我们先来看看都有哪些协作的场景。
协作的场景
多线程之间需要协作的场景有很多,比如说:
我们会探讨如何实现这些协作场景,在此之前,我们先来了解协作的基本方法 wait/notify。
wait/notify
我们知道,Java 的根父类是 Object,Java 在 Object 类而非 Thread 类中,定义了一些线程协作的基本方法,使得每个对象都可以调用这些方法,这些方法有两类,一类是 wait,另一类是 notify。
主要有两个 wait 方法:
- public final void wait() throws InterruptedException
- public final native void wait(long timeout) throws InterruptedException;
一个带时间参数,单位是毫秒,表示最多等待这么长时间,参数为 0 表示无限期等待。一个不带时间参数,表示无限期等待,实际就是调用 wait(0)。在等待期间都可以被中断,如果被中断,会抛出 InterruptedException,关于中断及中断处理,我们在下节介绍,本节暂时忽略该异常。
wait 实际上做了什么呢?它在等待什么?上节我们说过,每个对象都有一把锁和等待队列,一个线程在进入 synchronized 代码块时,会尝试获取锁,获取不到的话会把当前线程加入等待队列中,其实,除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用 wait 就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用 Object 的 notify 方法:
- public final native void notify();
- public final native void notifyAll();
notify 做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll 和 notify 的区别是,它会移除条件队列中所有的线程并全部唤醒。
我们来看个简单的例子,一个线程启动后,在执行一项操作前,它需要等待主线程给它指令,收到指令后才执行,代码如下:
- public class WaitThread extends Thread {
- private volatile boolean fire = false;
- @Override
- public void run() {
- try {
- synchronized (this) {
- while (!fire) {
- wait();
- }
- }
- System.out.println("fired");
- } catch (InterruptedException e) {
- }
- }
- public synchronized void fire() {
- this.fire = true;
- notify();
- }
- public static void main(String[] args) throws InterruptedException {
- WaitThread waitThread = new WaitThread();
- waitThread.start();
- Thread.sleep(1000);
- System.out.println("fire");
- waitThread.fire();
- }
- }
示例代码中有两个线程,一个是主线程,一个是 WaitThread,协作的条件变量是 fire,WaitThread 等待该变量变为 true,在不为 true 的时候调用 wait,主线程设置该变量并调用 notify。
两个线程都要访问协作的变量 fire,容易出现竞态条件,所以相关代码都需要被 synchronized 保护。实际上,wait/notify 方法只能在 synchronized 代码块内被调用,如果调用 wait/notify 方法时,当前线程没有持有对象锁,会抛出异常 java.lang.IllegalMonitorStateException。
你可能会有疑问,如果 wait 必须被 synchronzied 保护,那一个线程在 wait 时,另一个线程怎么可能调用同样被 synchronzied 保护的 notify 方法呢?它不需要等待锁吗?我们需要进一步理解 wait 的内部过程,虽然是在 synchronzied 方法内,但调用 wait 时,线程会释放对象锁,wait 的具体过程是:
线程从 wait 调用中返回后,不代表其等待的条件就一定成立了,它需要重新检查其等待的条件,一般的调用模式是:
- synchronized (obj) {
- while (条件不成立)
- obj.wait();
- ... // 执行条件满足后的操作
- }
比如,上例中的代码是:
- synchronized (this) {
- while (!fire) {
- wait();
- }
- }
调用 notify 会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁,也就是说,只有在包含 notify 的 synchronzied 代码块执行完后,等待的线程才会从 wait 调用中返回。
简单总结一下,wait/notify 方法看上去很简单,但往往难以理解 wait 等的到底是什么,而 notify 通知的又是什么,我们需要知道,它们与一个共享的条件变量有关,这个条件变量是程序自己维护的,当条件不成立时,线程调用 wait 进入条件等待队列,另一个线程修改了条件变量后调用 notify,调用 wait 的线程唤醒后需要重新检查条件变量。从多线程的角度看,它们围绕共享变量进行协作,从调用 wait 的线程角度看,它阻塞等待一个条件的成立。我们在设计多线程协作时,需要想清楚协作的共享变量和条件是什么,这是协作的核心。接下来,我们通过一些场景来进一步理解 wait/notify 的应用,本节只介绍生产者 / 消费者模式,下节介绍更多模式。
生产者 / 消费者模式
在生产者 / 消费者模式中,协作的共享变量是队列,生产者往队列上放数据,如果满了就 wait,而消费者从队列上取数据,如果队列为空也 wait。我们将队列作为单独的类进行设计,代码如下:
- static class MyBlockingQueue {
- private Queue queue = null;
- private int limit;
- public MyBlockingQueue(int limit) {
- this.limit = limit;
- queue = new ArrayDeque<>(limit);
- }
- public synchronized void put(E e) throws InterruptedException {
- while (queue.size() == limit) {
- wait();
- }
- queue.add(e);
- notifyAll();
- }
- public synchronized E take() throws InterruptedException {
- while (queue.isEmpty()) {
- wait();
- }
- E e = queue.poll();
- notifyAll();
- return e;
- }
- }
MyBlockingQueue 是一个长度有限的队列,长度通过构造方法的参数进行传递,有两个方法 put 和 take。put 是给生产者使用的,往队列上放数据,满了就 wait,放完之后调用 notifyAll,通知可能的消费者。take 是给消费者使用的,从队列中取数据,如果为空就 wait,取完之后调用 notifyAll,通知可能的生产者。
我们看到,put 和 take 都调用了 wait,但它们的目的是不同的,或者说,它们等待的条件是不一样的,put 等待的是队列不为满,而 take 等待的是队列不为空,但它们都会加入相同的条件等待队列。由于条件不同但又使用相同的等待队列,所以要调用 notifyAll 而不能调用 notify,因为 notify 只能唤醒一个线程,如果唤醒的是同类线程就起不到协调的作用。
只能有一个条件等待队列,这是 Java wait/notify 机制的局限性,这使得对于等待条件的分析变得复杂,后续章节我们会介绍显式的锁和条件,它可以解决该问题。
一个简单的生产者代码如下所示:
- static class Producer extends Thread {
- MyBlockingQueue queue;
- public Producer(MyBlockingQueue queue) {
- this.queue = queue;
- }
- @Override
- public void run() {
- int num = 0;
- try {
- while (true) {
- String task = String.valueOf(num);
- queue.put(task);
- System.out.println("produce task " + task);
- num++;
- Thread.sleep((int) (Math.random() * 100));
- }
- } catch (InterruptedException e) {
- }
- }
- }
Producer 向共享队列中插入模拟的任务数据。一个简单的示例消费者代码如下所示:
- static class Consumer extends Thread {
- MyBlockingQueue queue;
- public Consumer(MyBlockingQueue queue) {
- this.queue = queue;
- }
- @Override
- public void run() {
- try {
- while (true) {
- String task = queue.take();
- System.out.println("handle task " + task);
- Thread.sleep((int)(Math.random()*100));
- }
- } catch (InterruptedException e) {
- }
- }
- }
主程序的示例代码如下所示:
- public static void main(String[] args) {
- MyBlockingQueue queue = new MyBlockingQueue<>(10);
- new Producer(queue).start();
- new Consumer(queue).start();
- }
运行该程序,会看到生产者和消费者线程的输出交替出现。
我们实现的 MyBlockingQueue 主要用于演示,Java 提供了专门的阻塞队列实现,包括:
我们会在后续章节介绍这些类,在实际系统中,应该考虑使用这些类。
小结
本节介绍了 Java 中线程间协作的基本机制 wait/notify,协作关键要想清楚协作的共享变量和条件是什么,为进一步理解,本节针对生产者 / 消费者模式演示了 wait/notify 的用法。
下一节,我们来继续探讨其他协作模式。
(与其他章节一样,本节所有代码位于 https://github.com/swiftma/program-logic)
----------------
未完待续,查看最新文章,敬请关注微信公众号 "老马说编程"(扫描下方二维码),从入门到高级,深入浅出,老马和你一起探索 Java 编程及计算机技术的本质。用心原创,保留所有版权。
来源: http://www.cnblogs.com/swiftma/p/6421803.html