上篇文章我们介绍了synchronized关键字,使用它可以有效的解决我们多线程所带来的一些常见问题。例如:竞态条件,内存可见性等。并且,我们也说明了该关键字主要是一个加锁和释放锁的集成,所有为能获得锁的线程都将被阻塞在某个对象的阻塞队列上。而我们本篇将要介绍的线程间的协作则主要是对对象的另一个队列的使用(条件队列),所有因条件不满足而无法继续运行的线程都将在条件队列上进行等待。主要涉及内容如下:
- 理解wait/notify这两个方法
- 典型的生产者消费者问题
- 理解join方法的实现原理
一、理解wait/notify这两个方法
这两个方法是我们本篇文章的主角,它们被定义在根类Object中。
- public final void wait()
- public final native void wait(long timeout)
- public final native void notify();
- public final native void notifyAll();
两个wait方法,无参的wait相当于wait(0)表示无限期等待,有参数的wait方法则指定该线程等待多长时间。notify方法用于释放一个在条件队列上等待的线程,而notifyall方法则是用于释放所有在条件队列上进行等待的线程。那么究竟什么时候调用wait方法让线程到条件队列上去等待,什么时候调用notify释放条件队列上的线程呢?
我们说过一个对象有一把锁和两个队列,对于所有无法获取到锁的线程都将被阻塞在阻塞队列上,而对于获取到锁以后,于运行过程中由于缺少某些条件而不得不终止程序的线程将被阻塞在条件队列上并让出CPU。而且需要注意一点的是,线程被阻塞在阻塞队列上和条件队列上,所表现出的状态是不一样的。例如:
- /*定义一个线程类*/
- public class MyThread extends Thread {
- @Override public void run() {
- try {
- System.out.println(Thread.currentThread().getState());
- synchronized(this) {
- wait();
- }
- } catch(InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- /*启动线程*/
- public static void main(String[] args) throws InterruptedException {
- Thread thread = new MyThread();
- thread.start();
- Thread.sleep(1000);
- System.out.println(thread.getState());
- System.out.println("main is out");
- }
输出结果显示:
主函数中启动一个线程,该线程内部运行的时候先输出当前线程状态,然后调用wait方法将自己挂在当前线程对象的条件队列上并让出CPU,而我们在主函数中对该线程的状态进行再一次的输出, 从结果截图来看,程序并没有结束----说明子线程并没有正常结束,阻塞在条件队列上的线程的状态是waiting,这和阻塞在阻塞队列上的线程状态blocked是完全两种不同的状态。但是,当我们调用notify或者notifyall方法将某个线程从条件队列中释放的时候,该线程要和外面的其他线程一样去竞争对象的锁,如果不能获取到对象的锁,依然会被阻塞在该对象的阻塞队列上。
二、使用wait/notify解决生产者消费者问题
生产者消费者问题是我们操作系统中的一个经典的问题。生产者向仓库中源源不断的放入产品,消费者从仓库中源源不断的拿出产品,当仓库满的时候,生产者就不能继续往里面放入产品,当仓库空的时候,消费者就不能从仓库里取出产品。如何协调好生产者线程和消费者线程对仓库的操作就是这个问题的核心。
- public class Repository {
- private ArrayDeque < String > list = null;
- private int limit; //仓库容量
- public Repository(int limit) {
- this.limit = limit;
- list = new ArrayDeque < String > (this.limit);
- }
- //仓库提供给生产者存入操作
- public synchronized void addGoods(String data) throws InterruptedException {
- while (list.size() == limit) {
- //说明仓库已经满了
- wait();
- }
- list.add(data);
- System.out.println("i produce a product:" + data);
- notifyAll();
- }
- //仓库提供给消费者取出操作
- public synchronized String getGoods() throws InterruptedException {
- while (list.isEmpty()) {
- //说明仓库已经空了
- wait();
- }
- String result = list.poll();
- System.out.println("i consume a product:" + result);
- notifyAll();
- return result;
- }
- }
我们定义一个仓库类,该仓库提供给生产者投放的方法,提供给消费者取出的方法。我们使用双端队列实现对仓库的模拟,limit参数限定仓库容量。
生产者的投放方法,当生产者想要向仓库投放产品时,如果仓库已经满了,则将将当前线程阻塞在条件队列上,等待仓库有空余位置为止。而如果仓库没满,则向其中投入一个产品并唤醒被阻塞在条件队列上的所有线程(在本例中实际上就是消费者线程)。一旦消费者线程从条件队列上被释放,他将重新和生产者线程竞争对象锁,在获取到对象锁之后将回到上次因条件不足而被阻塞的程序位置。消费者的取出方法和生产者的投放方法类似,此处不再赘述。
- public class Producer extends Thread {
- //生产者线程不停的生产产品直到仓库满
- private Repository repository = null;
- public Producer(Repository r){
- this.repository = r;
- }
- int count = 0;
- @Override
- public void run(){
- while(true){
- try {
- repository.addGoods(String.valueOf(count));
- count++;
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
定义一个生产者类,生产者始终不停的生产产品,我们用count来模拟产品代号。
- public class Consumer extends Thread {
- //消费者线程不停的从仓库中取出产品直到仓库空
- private Repository repository = null;
- public Consumer(Repository r){
- this.repository = r;
- }
- @Override
- public void run(){
- while(true){
- try {
- String result = repository.getGoods();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
定义一个消费者类,消费者不停的从仓库中取出产品。
- public static void main(String[] args) throws InterruptedException {
- Repository repository = new Repository(20);
- Thread producer = new Producer(repository);
- Thread consumer = new Consumer(repository);
- producer.start();
- consumer.start();
- System.out.println("main thread is out");
- }
最后我们定义一个仓库并通过构造方法的传入使得生产者和消费者共同使用相同的仓库对象。分别启动两个线程,程序将死循环的输出生产者和消费者的生产和消费操作,以下是程序运行的部分结果:
我们可以看到生产者和消费者这两个线程交替的输出,偶尔会出现消费者滞后生产者的情况,但是消费者绝对不会超前生产者,因为只有生产者生产出产品之后,消费者才能取出。以上便是经典的生产者消费者问题,通过对该问题的实现,我们能够对wait/notify这两个操作有了一个更加深刻的认识。
三、join方法的实现原理
join方法的内部其实使用的还是我们上述介绍的wait/notify机制。
- public final void join() throws InterruptedException {
- join(0);
- }
- public final synchronized void join(long millis) throws InterruptedException {
- long base = System.currentTimeMillis();
- long now = 0;
- if (millis < 0) {
- throw new IllegalArgumentException("timeout value is negative");
- }
- if (millis == 0) {
- while (isAlive()) {
- wait(0);
- }
- } else {
- while (isAlive()) {
- long delay = millis - now;
- if (delay <= 0) {
- break;
- }
- wait(delay);
- now = System.currentTimeMillis() - base;
- }
- }
- }
两个方法,核心的还是这个带有参数的join方法。该方法大体上分为三种情况,如果millis小于0,抛出异常。如果millis等于0,就无限期等待,这一段代码不知道大家理解的如何:
- if (millis == 0) {
- while (isAlive()) {
- wait(0);
- }
- }
- Thread thread = new MyThread();
- thread.start();
- thread.join();
两小段代码,第一段代码是jdk中关于millis 等于0的一个实现,第二段代码则是我们调用join方法的一个基本格式。我们可以看到,由于join这个方法被synchronized关键字修饰,那么我们主线程在调用thread对象的该方法时就需要首先获得thread对象的锁。
进入到join方法的内部,当millis 等于0的时候,判断只要线程对象活着,也就是thread对象活着,就调用wait(0)方法将当前线程(main)线程挂起到thread对象的条件队列上。一旦thread线程对象执行结束,Java系统将调用notifyall来释放所有挂在该对象的条件队列上的线程,此时main线程将会被唤醒,从而实现了main线程等待thread线程执行结束的一个过程。至于millis 大于0的情况,只不过内部调用了wait(long timeout)方法,其他的实现原理基本类似,此处不再赘述。
本篇文章,我们主要介绍线程间的一种协作机制,使用wait/notify两个方法来协作不同的线程。通过实现经典的生产者消费者模型增加了对wait/notify这两个方法的理解,最后从源代码的角度对Thread下的join方法进行了学习,该方法的核心就是利用wait/notify协作主线程和分支线程来实现等待的一个操作。总结不到之处,望指出。