一, 前言
这种模式在生活是最常见的, 那么它的场景是什么样的呢? 下面是我假象的, 假设有一个仓库, 仓库有一个生产者和一个消费者, 消费者过来消费的时候会检测仓库中是否有库存, 如果没有了则等待生产, 如果有就先消费直至消费完成; 而生产者每天的工作就是先检测仓库是否有库存, 如果没有就开始生产, 满仓了就停止生产等待消费, 直至工作结束. 下图是根据假象画的流程图:
那么在程序中怎么才能达到这样的效果呢? 下面介绍三种方式实现.
二, 使用 notify() 和 wait()实现
相信大家这两个方法都不陌生, 它是 Object 类中的两个方法, 具体请看源码中的解释. 提醒一点就是使用 notify()和 wait()方法时必须拥有对象锁.
根据上面假象我这定义一下明确场景: 仓库库存有个最大值, 如果仓库库存已经达到最大值那么就停止生产, 小于就需要生产; 如果库存等于 0 则需要等待生产停止消费. 另外生产者有个生产目标, 当它生产了目标数后就结束生产; 消费者也是, 当消费一定的数据后就结束消费, 否则等待消费.
见下面代码:
- package com.yuanfy.jmm.threads;
- import com.yuanfy.util.SleepUtils;
- import java.util.concurrent.TimeUnit;
- public class Factory {
- // 当前库存大小
- private int size;
- // 库存容量(最大库存值)
- private int capacity;
- public Factory(int capacity) {
- this.capacity = capacity;
- }
- public synchronized void produce(int num) {
- try {
- System.out.println("+++++ 生产者[" + Thread.currentThread().getName()
- + "] , 他的任务是生产" + num + "件产品.");
- // 当生产完成就停止
- while (num> 0) {
- // 如果当前库存大小大于或等于库存容量值了, 则停止生产等待消费.
- if (size>= capacity) {
- System.out.println("+++++" + Thread.currentThread().getName() +
- "检测库存已满, 停止生产等待消费...");
- // 等待消费
- wait();
- System.out.println("+++++" + Thread.currentThread().getName() + "开始生产...");
- }
- // 否则继续生产
- int inc = (num + size)> capacity ? (capacity - size) : num;
- num -= inc;
- size += inc;
- SleepUtils.second(1);
- System.out.println("+++++" + Thread.currentThread().getName() + "生产了" + inc + "件, 当前库存有" + size + "件.");
- // 生产后唤醒消费者
- notify();
- }
- System.out.println("+++++ 生产者[" + Thread.currentThread().getName()
- + "] 生产结束.");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public synchronized void consume(int num) {
- try {
- System.out.println("----- 消费者[" + Thread.currentThread().getName()
- + "] , 他需要消费" + num + "件产品.");
- // 当消费完成则停止
- while (num> 0) {
- // 如果当前库存大小小于等于 0, 则停止消费等待生产.
- if (size <= 0) {
- System.out.println("-----" + Thread.currentThread().getName() + "检测库存已空, 停止消费等待生产...");
- // 等待生产
- wait();
- System.out.println("-----" + Thread.currentThread().getName() + "开始消费...");
- }
- // 否则继续消费
- int dec = (size - num)> 0 ? num : size;
- num -= dec;
- size -= dec;
- SleepUtils.second(1);
- System.out.println("-----" + Thread.currentThread().getName() + "消费了" + dec + "件, 当前有" + size + "件.");
- // 消费后唤醒生产者继续生产
- notify();
- }
- System.out.println("----- 消费者[" + Thread.currentThread().getName()
- + "] 消费结束.");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
上面是工厂 (仓库) 类, 主要包含两个任务一个是生产一个是消费, 接下来创建两个线程去调用它, 如下:
- package com.yuanfy.jmm.threads;
- /**
- * 生产线程
- */
- class Produce {
- private Factory factory;
- public Produce(Factory factory) {
- this.factory = factory;
- }
- public void produce(String name, final int num) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- factory.produce(num);
- }
- }, name).start();
- }
- }
- /**
- * 消费线程
- */
- class Consume {
- private Factory factory;
- public Consume(Factory factory) {
- this.factory = factory;
- }
- public void consume(String name, final int num) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- factory.consume(num);
- }
- }, name).start();
- }
- }
- public class ProduceConsumeDemo {
- public static void main(String[] args) {
- Factory f = new Factory(500);
- Consume consume = new Consume(f);
- consume.consume("消费线程",600);
- Produce produce = new Produce(f);
- produce.produce("生产线程",800);
- }
- }
注意上方, 消费线程和生产线程都是拥有同一个工厂对象, 然后进行生产和消费模式. 那么我们直接运行, 结果如下:
三, 使用锁中的 Condition 对象进行控制
这种方式估计用的比较少, 因为使用 Condition 必须先使用锁 Lock. 这里我只介绍怎么用 Condition 对象进行控制实现生产者与消费者模式的实现.
其实它跟上面那种方法有点类似, Condition 对象中 await()方法表示等待, signal()方法表示唤醒(看了 AQS 源码的应该都知道有这个对象且了解过这两个方法). 下面看下具体怎么实现:
- public class Factory {
- // 当前大小
- private int size;
- // 总容量
- private int capacity;
- private Lock lock;
- // 已满的条件
- private Condition fullCondition;
- // 已空的条件
- private Condition emptyCondition;
- public Factory(int capacity) {
- this.capacity = capacity;
- lock = new ReentrantLock();
- fullCondition = lock.newCondition();
- emptyCondition = lock.newCondition();
- }
- public void produce(int no) {
- lock.lock();
- try {
- while (no> 0) {
- while (size>= capacity) {
- System.out.println(Thread.currentThread().getName() + "报告仓库已满, 等待快递员取件...");
- fullCondition.await();
- System.out.println(Thread.currentThread().getName() + "报告开始进货...");
- }
- int inc = (no + size)> capacity ? (capacity - size) : no;
- no -= inc;
- size += inc;
- System.out.println(Thread.currentThread().getName() +
- "报告进货了:" + inc + "件, 当前库存数:" + size);
- emptyCondition.signal();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- public void consume(int no) {
- lock.lock();
- try {
- while (no> 0) {
- while (size <= 0) {
- System.out.println(Thread.currentThread().getName() + "报告仓库已空, 等待仓库管理员进货");
- emptyCondition.await();
- System.out.println(Thread.currentThread().getName() + "报告开始取件...");
- }
- int dec = (size - no)> 0 ? no : size;
- no -= dec;
- size -= dec;
- System.out.println(Thread.currentThread().getName() +
- "报告取件:" + dec + ", 当前库存数:" + size);
- fullCondition.signal();
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- lock.unlock();
- }
- }
- }
看了上面工厂类的代码后是不是跟使用 Object 中 wait()和 notify()方法类似呢. 主要区别就是拥有对象的方式不一样, 这里使用的 lock 进行且需要手动释放, 而第一种是需要 Synchronized 进行控制.
四, 使用阻塞队列进行实现
这个就很简单了, 它已经封装好等待和唤醒的操作, 所以不进行案例分享了. 其中涉及到两个重要方法 put() 和 take
来源: https://www.cnblogs.com/yuanfy008/p/9574509.html