生产者消费者模型具体来讲, 就是在一个系统中, 存在生产者和消费者两种角色, 他们通过内存缓冲区进行通信 (解耦), 生产者将消费者需要的资源生产出来放到缓冲区, 消费者把从缓冲区把资源拿走消费.
在这个模型中, 最关键就是内存缓冲区为空的时候消费者必须等待, 而内存缓冲区满的时候, 生产者必须等待. 其他时候就是一边在生产一边在消费. 值得注意的是多线程对内存缓冲区的操作时必须保证线程安全, 所以需要设计锁的策略.
◆
使用 wait 和 notify 实现生产这消费者
◆
我们在 Hello,Thread 一文中提到了 wait 和 notify 来实现等待通知的功能, 本篇文章则继续使用它们实现一个生产者, 消费者模型.
首先我们定义一个资源的类, 资源类中初始时什么都没有, 最多允许存放 10 个资源.
当生产者调用 add 方法时, i+1, 即代表生产出了一件资源. 当生产了一个资源以后就使用 notifyAll 通知所有等待在此资源文件的线程. 如果当资源达到 10 个后则所有的生产者线程进入等待状态, 等待消费者线程唤醒.
当消费者调用 remove 方法时, i-1, 即代表消费了一件资源. 当消费了一个资源以后就使用 notifyAll 通知所有等待在此资源文件的线程. 如果当资源达到 0 个后则所有的消费者线程进入等待状态, 等待生产者线程唤醒.
- public class WaitNotifyResouce {
- private int i=0;
- private int size=10;
- public synchronized void add(){
- if(i0){
- i--;
- System.out.println(Thread.currentThread().getName()+"号线程拿走了一件资源, 当前资源"+i+"个");
- notifyAll();
- }else {
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
接下来我们创建 3 个生产者线程, 2 个消费者线程持续对资源进行生产和消费.
- public class WaitNotifyProducerConsumerDemo {
- static WaitNotifyResouce waitNotifyResouce = new WaitNotifyResouce();
- static class ProducerThreadDemo extends Thread {
- @Override
- public void run() {
- while (true) {
- try {
- sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- waitNotifyResouce.add();
- }
- }
- }
- static class ConsumerThreadDemo extends Thread {
- @Override
- public void run() {
- while (true) {
- try {
- sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- waitNotifyResouce.remove();
- }
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread p1 = new Thread(new ProducerThreadDemo(), "生产者 p1");
- Thread p2 = new Thread(new ProducerThreadDemo(), "生产者 p2");
- Thread p3 = new Thread(new ProducerThreadDemo(), "生产者 p3");
- p1.start();
- p2.start();
- p3.start();
- Thread c1 = new Thread(new ConsumerThreadDemo(), "消费者 c1");
- Thread c2 = new Thread(new ConsumerThreadDemo(), "消费者 c2");
- c1.start();
- c2.start();
- }
- }
接下来程序打印的结果就像预想中一样了:
生产者 p1 号线程生产一件资源, 当前资源 1 个
生产者 p2 号线程生产一件资源, 当前资源 2 个
生产者 p3 号线程生产一件资源, 当前资源 3 个
消费者 c1 号线程拿走了一件资源, 当前资源 2 个
消费者 c2 号线程拿走了一件资源, 当前资源 1 个
生产者 p1 号线程生产一件资源, 当前资源 2 个
生产者 p3 号线程生产一件资源, 当前资源 3 个
生产者 p2 号线程生产一件资源, 当前资源 4 个
...
◆
使用 Condition 实现生产者消费者模型
◆
在文章: 浅谈 Java 中的锁: Synchronized, 重入锁, 读写锁 中, 我们了解了 Lock 和 Condition, 现在我们使用它们配合实现一个生产者消费者模型
首先同样创建一个资源文件, 此资源文件所有的操作跟上方的资源文件是一样的, 只不过使用 Lock 和 Condition 的组合代替了 synchronize.
- public class LockConditionResouce {
- private int i = 0;
- private int size = 10;
- private Lock lock = new ReentrantLock();
- Condition condition = lock.newCondition();
- public void add() {
- lock.lock();
- try {
- if (i <size) {
- i++;
- System.out.println(Thread.currentThread().getName() + "号线程生产一件资源, 当前资源" + i + "个");
- condition.signalAll();
- } else {
- try {
- condition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } finally {
- lock.unlock();
- }
- }
- public void remove() {
- lock.lock();
- try {
- if (i> 0) {
- i--;
- System.out.println(Thread.currentThread().getName() + "号线程拿走了一件资源, 当前资源" + i + "个");
- condition.signalAll();
- } else {
- try {
- condition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- } finally {
- lock.unlock();
- }
- }
- }
接下来使用生产者消费者线程操作资源:
- public class LockConditionProducerConsumerDemo {
- static LockConditionResouce lockConditionResouce = new LockConditionResouce();
- static class ProducerThreadDemo extends Thread {
- @Override
- public void run() {
- while (true) {
- try {
- sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- lockConditionResouce.add();
- }
- }
- }
- static class ConsumerThreadDemo extends Thread {
- @Override
- public void run() {
- while (true) {
- try {
- sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- lockConditionResouce.remove();
- }
- }
- }
- public static void main(String[] args) throws InterruptedException {
- Thread p1 = new Thread(new ProducerThreadDemo(), "生产者 p1");
- Thread p2 = new Thread(new ProducerThreadDemo(), "生产者 p2");
- Thread p3 = new Thread(new ProducerThreadDemo(), "生产者 p3");
- p1.start();
- p2.start();
- p3.start();
- Thread c1 = new Thread(new ConsumerThreadDemo(), "消费者 c1");
- Thread c2 = new Thread(new ConsumerThreadDemo(), "消费者 c2");
- c1.start();
- c2.start();
- }
- }
来源: http://www.bubuko.com/infodetail-3006872.html