一, 什么是生产者消费者模式?
生产者 / 消费者模式是为了解耦消费者和生产者而产生的, 其原理非常地简单. 总的来说就是生产者和消费者之间不直接通信, 而是借助一个第三方
(通常是阻塞队列)
, 第三方也成为临界资源, 同一时间只允许一条线程对其进行操作.
1, 当临界资源满了, 生产者必须阻塞等待;
2, 当临界资源为空, 消费者必须阻塞等待, 通知生产者生产;
二, 使用简单的 notify/wait 机制实现
所有的注释都写在代码中, 在这里我们模仿在水桶中存水和取水的过程:
- Main.java
- package com.wokao66.consumerProvider;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- /**
- * 测试
- * @author: huangjiawei
- * @since: 2018 年 4 月 3 日
- * @version: $Revision$ $Date$ $LastChangedBy$
- */
- public class Main {
- public static void main(String[] args) {
- /**
- * 新建一个水桶, 存放所有的水, 刚开始水桶是空的, 容量为 5L
- */
- List<Water> waterList = new ArrayList<>(5);
- ExecutorService executors = Executors.newFixedThreadPool(10);
- WaterProvider provider = new WaterProvider(waterList);
- WaterConsumer consumer = new WaterConsumer(waterList);
- executors.execute(provider);
- executors.execute(consumer);
- }
- }
- WaterProvider.java
- package com.wokao66.consumerProvider;
- import java.util.List;
- /**
- * 往桶里加水的生产者
- * @author: huangjiawei
- * @since: 2018 年 4 月 3 日
- * @version: $Revision$ $Date$ $LastChangedBy$
- */
- public class WaterProvider implements Runnable {
- /**
- * 这是我们的水桶 (10L)
- */
- private List<Water> waterList = null;
- /**
- * 初始化水桶, 也就是缓冲区
- */
- public WaterProvider(List<Water> waterList) {
- this.waterList = waterList;
- }
- @Override
- public void run() {
- /**
- * 循环任务, 也就是这个任务会执行多次, 没有明确的 break 语句或者异常, 该任务不会终止
- */
- while (true) {
- /**
- * 这里获得 waterList 的锁, 之前说过 notify,wait 的使用必须先获得锁
- */
- synchronized (waterList) {
- /**
- * 判断是不是满了, 满了就不生产了
- */
- while (waterList.size() == 5) {
- try {
- /**
- * 这里将所释放掉 waterList 的锁
- */
- waterList.wait();
- } catch (InterruptedException e) {}
- }
- /**
- * 如果还没有满, 那么就加 1L 水进去, 加进去之前
- */
- waterList.add(new Water());
- System.err.println("生产了 1L 水, 现在水桶有:" + waterList.size() + "L 水");
- try {
- /**
- * sleep 方法是不会释放锁的
- */
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- /**
- * 我通知所有的消费者来消费
- */
- waterList.notifyAll();
- }
- }
- }
- }
- WaterConsumer.java
- package com.wokao66.consumerProvider;
- import java.util.List;
- /**
- * 往桶里取水的消费者
- * @author: huangjiawei
- * @since: 2018 年 4 月 3 日
- * @version: $Revision$ $Date$ $LastChangedBy$
- */
- public class WaterConsumer implements Runnable {
- private List<Water> waterList;
- public WaterConsumer(List<Water> waterList) {
- this.waterList = waterList;
- }
- @Override
- public void run() {
- /**
- * 循环任务, 也就是这个任务会执行多次, 没有明确的 break 语句或者异常, 该任务不会终止
- */
- while (true) {
- /**
- * 获得锁
- */
- synchronized (waterList) {
- /**
- * 证明没有水可以消费
- */
- while (waterList.isEmpty()) {
- try {
- /**
- * 释放锁
- */
- waterList.wait();
- } catch (InterruptedException e) {}
- }
- /**
- * 每次我都移动第一个元素
- */
- waterList.remove(0);
- System.err.println("消费了 1L 水, 现在水桶有:" + waterList.size() + "L 水");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- /**
- * 通知生产者生产
- */
- waterList.notifyAll();
- }
- }
- }
- }
- Water.java
- package com.wokao66.consumerProvider;
- /**
- * 水这种类型
- * @author: huangjiawei
- * @since: 2018 年 4 月 3 日
- * @version: $Revision$ $Date$ $LastChangedBy$
- */
- public class Water {
- /**
- * 单位 L
- */
- private String unit;
- public String getUnit() {
- return unit;
- }
- public void setUnit(String unit) {
- this.unit = unit;
- }
- }
执行结果:
生产了 1L 水, 现在水桶有: 1L 水
生产了 1L 水, 现在水桶有: 2L 水
消费了 1L 水, 现在水桶有: 1L 水
消费了 1L 水, 现在水桶有: 0L 水
生产了 1L 水, 现在水桶有: 1L 水
生产了 1L 水, 现在水桶有: 2L 水
消费了 1L 水, 现在水桶有: 1L 水
消费了 1L 水, 现在水桶有: 0L 水
生产了 1L 水, 现在水桶有: 1L 水
消费了 1L 水, 现在水桶有: 0L 水
生产了 1L 水, 现在水桶有: 1L 水
生产了 1L 水, 现在水桶有: 2L 水
生产了 1L 水, 现在水桶有: 3L 水
生产了 1L 水, 现在水桶有: 4L 水
生产了 1L 水, 现在水桶有: 5L 水
消费了 1L 水, 现在水桶有: 4L 水
消费了 1L 水, 现在水桶有: 3L 水
消费了 1L 水, 现在水桶有: 2L 水
消费了 1L 水, 现在水桶有: 1L 水
执行结果可能会不一致, 但数据正确即可!
来源: https://juejin.im/post/5ac33ecc51882555635e836f