Java 线程系列文章只是自己知识的总结梳理,都是最基础的玩意,已经掌握熟练的可以绕过.
一,从一个小 Demo 说起
上篇我们聊到了 Java 多线程的同步机制: Java 多线程同步问题: 一个小 Demo 完全搞懂. 这篇我们聊一下 java 多线程之间的通信机制.
上一篇探讨 java 同步机制的时候我们举得例子输出 log 现象是: 一段时间总是 A 线程输出而另一段时间总是 B 线程输出,有没有一种方式可以控制 A,B 线程交错输出呢?答案是当然可以了,这时候我们就要用到多线程的 wait/notify 机制了.
wait/notify 机制就是当线程 A 执行到某一对象的 wait() 方法时,就会进入等待状态,此时线程 A 放弃持有的锁,其余线程可以竞争锁的持有权.当有其余线程调用 notify() 或者 notifyAll() 方法的时候就可能(当有多个线程的时候 notify() 方法只会唤醒处于等待状态线程中的一个)唤醒线程 A,使其从 wait 状态醒来,继续向下执行业务逻辑.
接下来,我们通过一个小 demo 加以理解.
二,单生产者消费者模式 demo 很简单,就是开启两个线程,一个生产面包,另一个负责消费面包,并且生产一个就要消费一个,交替执行.首先看下 BreadFactory 类:
public class BreadFactory {
//生产面包个数计数器
private int count = 0;
//线程的锁
private Object o = new Object();
private boolean flag = false;
public void product() {
synchronized (o) {
if (flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"生产了第" + (++count) + "个面包");
flag = true;
o.notify();
}
}
public void consume() {
synchronized (o) {
if (!flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"消费第" + count + "个面包");
flag = false;
o.notify();
}
}
}
此类就是负责生产,消费面包,flag 主要用于控制线程之间的切换.
接下来我们看下 Producter,Consumer 类:
public class Producter extends Thread {
private BreadFactory mBreadFactory;
public Producter(BreadFactory mBreadFactory) {
super();
this.mBreadFactory = mBreadFactory;
}
@Override
public void run() {
//
while (true) {
mBreadFactory.product();
}
}
}
很简单,初始化的时候需要传递进来一个 BreadFactory 实例对象,线程启动的时候调用 BreadFactory 类中 product() 方法不停生产面包.
Consumer 类同理:
public class Consumer extends Thread {
private BreadFactory mBreadFactory;
public Consumer(BreadFactory mBreadFactory) {
super();
this.mBreadFactory = mBreadFactory;
}
@Override
public void run() {
//
while (true) {
mBreadFactory.consume();
}
}
}
最后看下 main 方法:
public static void main(String[] args) {
//
BreadFactory factory = new BreadFactory();
Producter p1 = new Producter(factory);
p1.start();
Consumer c1 = new Consumer(factory);
c1.start();
}
没什么要多说的,就是初始化并启动线程,运行程序,输出如下:
Thread-0生产了第1个面包
Thread-1消费第1个面包
Thread-0生产了第2个面包
Thread-1消费第2个面包
Thread-0生产了第3个面包
Thread-1消费第3个面包
Thread-0生产了第4个面包
Thread-1消费第4个面包
.....
三,多生产者消费者模式似乎很顺利的就实现了啊,但是实际需求中怎么可能只有一个生产者,一个消费者,生产者,消费者是有多个的,我们试下多个生产者,消费者是什么现象,修改 main 中逻辑:
public static void main(String[] args) {
//
BreadFactory factory = new BreadFactory();
Producter p1 = new Producter(factory);
p1.start();
Consumer c1 = new Consumer(factory);
c1.start();
Producter p2 = new Producter(factory);
p2.start();
Consumer c2 = new Consumer(factory);
c2.start();
}
我们就是只多添加了一个生产者和一个消费者,其余没任何变化.
运行程序,输出信息如下:
...
Thread-2生产了第4个面包
Thread-1消费第4个面包
Thread-2生产了第5个面包
Thread-1消费第5个面包
Thread-2生产了第6个面包
Thread-1消费第6个面包
Thread-3消费第6个面包
Thread-0生产了第7个面包
Thread-3消费第7个面包
...
咦?生产到第 6 个面包,竟然被消费了两次,这显然是不正常的,那是哪里出问题了呢?
四,多生产者消费者模式问题产生原因分析接下来,我们直接分析问题产生的原因,我们分析下 BreadFactory 中 product() 与 consume() 方法:
public void product() {
synchronized (o) {
if (flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"生产了第" + (++count) + "个面包");
flag = true;
o.notify();
}
}
public void consume() {
synchronized (o) {
if (!flag) {
try {
o.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"消费第" + count + "个面包");
flag = false;
o.notify();
}
}
从线程启动顺序以及打印信息可以看出线程 0, 线程 2 负责生产面包,线程 1,线程 3 负责消费面包.线程执行过程中,线程 1 消费掉第 5 个面包,此时 flag 置为 false,执行 notify() 方法唤醒其余线程争取锁获取执行权.此时线程 3 获取线程执行权,执行 consume()业务逻辑 flag 此时为 false,进入 if(!flag) 逻辑,执行 wait() 方法,此时线程 3 进入 wait 状态,停留在 25 行代码处.释放锁资源,其余线程可以争取执行权.此时线程 1 获取执行权,和线程 3 一样,最终停留在 25 行代码处.释放锁资源,其余线程可以争取执行权.注意:此时线程 1,线程 3 都停留在 25 行代码处,处于 wait 状态.接下来线程 2 获取执行权,执行生产业务,生产了第 6 个面包,然后释放锁资源,其余线程可以争取执行权.然后线程 1 又获取执行权,上面说了线程 1 停留在 25 行代码处,现在获取执行权从 25 行代码处开始执行,消费掉第 6 个面包没问题,flag 置为 false.然后释放锁资源,其余线程可以争取执行权.此时线程 3 又获取执行权,上面分析时说了线程 3 处于 25 行代码处 wait 状态,现在获取执行权从 25 行代码处开始执行,又消费了第 6 个面包,到这里面包 6 被消耗了两次.经过上面分析已经知道产生问题的原因了,线程获取执行权后直接从 wait 处开始继续执行,不在检查 if 条件是否成立,这里就是问题产生的原因了.那怎么修改的呢?很简单了,将 if 判断改为 while 条件判断就可以了,这样线程获取执行权后还会再次检查 while 条件判断是否成立.运行程序打印 Log 如下:
...
Thread-1消费第19个面包
Thread-0生产了第20个面包
Thread-1消费第20个面包
Thread-2生产了第21个面包
看输出 Log 上面问题是解决了,生产一个面包只会消费一次,但是发现程序运行自己终止了,上面生产到第 21 个面包程序似乎不运行了没 Log 输出了,这是什么原因呢? 五,notify() 通知丢失问题以及 notify() 与 notifyAll() 的区别
要想明白上述问题产生的原因我们就必须搞懂 notify() 与 notifyAll() 的区别.简单说就是 notify() 只会唤醒同一监视器处于 wait 状态的一个线程 (随机唤醒),而 notifyAll() 会唤醒同一监视器处于 wait 状态的所有线程.我们分析上面问题产生的原因:线程 0,线程 2 负责生产面包,线程 1,线程 3 负责消费面包,在程序运行过程存在如下情况: 线程 1,3 处于 consume() 中的 wait() 处,线程 0 处于 product() 中 wait()
处,此时线程 2 生产完第 21 个面包执行 notify() 方法,通知处于同一监视器下处于 wait 状态线程,此时处于 wait 状态线程为线程 1,线程 3 与线程 0,按理说我们是想唤醒一个线程 1,3 中一个线程来消费刚刚生产的面包,但是程序可不知道啊,调用 notify 方法随机唤醒一个线程,碰巧此时唤醒的还是生产线程 0,这就是 notify 通知丢失问题,线程 0 执 while 判断又处于 wait 状态了,到这里就出现了控制台没有 Log 输出现象了,经过上面分析我们该明白问题出现的原因就是 notify 通知丢失问题,通知了一个我们不想通知的线程,那怎么解决呢?很简单了,程序中 notify() 方法改为 notifyAll() 就可以了,改为 notifyAll() 方法上述线程 2 通知的时候会一起唤醒线程 0,1,3,也就是唤醒同一监视器处于 wait 状态的所有线程,到这里运行程序就没有什么问题了.
六,notify() 与 notifyAll() 性能问题也许有些同学有疑问了,既然 notify() 方法会产生问题,那我就用 notifyAll() 不就完了,直接屏蔽掉 notify() 方法.这样做当然是很 Low 的做法.
假设有 N 个线程在 wait 状态下,调用 notifyall 会唤醒所有线程,然后这 N 个线程竞争同一个锁,最后只有一个线程能够得到锁,其它线程又回到 wait 状态.这意味每一次唤醒操作可能带来大量的竞争锁的请求.这对于频繁的唤醒操作而言性能上可能是一种灾难.如果说总是只有一个线程被唤醒后能够拿到锁,这种情况下使用 notify 的性能是要高于 notifyall 的.
七,JDK1.5 中 Condition 通知机制 JDK1.5 中 Condition 通知机制这里就不详细讲解了,Condition 中 await(),signal(),signalAll() 相当于传统线程通信机制中 wait(),notify(),notifyAll() 方法.我们修改 BreadFactory 类如下,其余类均不变:
public class BreadFactory {
// 生产面包个数计数器
private int count = 0;
// 线程的锁
private Lock lock = new ReentrantLock();
private Condition consumeCon = lock.newCondition();
private Condition productCon = lock.newCondition();
private boolean flag = false;
public void product() {
lock.lock();
try {
while (flag) {
try {
productCon.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "生产了第"
+ (++count) + "个面包");
flag = true;
consumeCon.signal();
} finally {
//
lock.unlock();
}
}
public void consume() {
lock.lock();
try {
while (!flag) {
try {
consumeCon.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "消费第" + count
+ "个面包");
flag = false;
productCon.signal();
} finally {
//
lock.unlock();
}
}
}
其强大之处就在于代码中 6,7,15,28,40,53 行代码处,我们并没有调用 signalAll() 方法,而是调用的 signal() 方法.这样我们就可以控制在生产完一个面包去唤醒消费的线程来消费面包,而不用连同生产线程一起唤醒,这就是其强大之处,这里就不详细分析了,不太熟悉的同学可自行搜索其余博客学习一下,比较简单,但是很基础很重要的.关于线程间通信问题本篇到此就结束了,再说一次,多线程相关博客没什么新玩意,只是自己工作以来一次总结,虽然基础,枯燥,但是比较重要,希望本篇博客对您有用.
来源: https://www.cnblogs.com/leipDao/p/8310974.html