问题描述
在 IT 技术面试过程中, 我们经常会遇到生产者消费者问题 (Producer-consumer problem), 这是多线程并发协作问题的经典案例. 场景中包含三个对象, 生产者(Producer), 消费者(Consumer) 以及一个固定大小的缓冲区(Buffer). 生产者的主要作用是不断生成数据放到缓冲区, 消费者则从缓冲区不断消耗数据. 该问题的关键是如何线程安全的操作共享数据块, 保证生产者线程和消费者线程可以正确的更新数据块, 主要考虑 1. 生产者不会在缓冲区满时加入数据. 2. 消费者应当停止在缓冲区时消耗数据. 3. 在同一时间应当只允许一个生产者或者消费者访问共享缓冲区(这一点是对于互斥操作访问共享区块的要求).
解决方案
解决问题以上问题通常有信号量, wait & notify, 管道或者阻塞队列等几种思路. 本文以 Java 语言为例一一进行举例讲解.
信号量
信号量 (Semaphore) 也称信号灯, 是用来控制资源被同时访问的个数, 比如控制访问数据库最大连接数的数量, 线程通过 acquire()获得连接许可, 完成数据操作后, 通过 release()释放许可. 对于生产者消费者问题来说, 为了满足线程安全操作的要求, 同一时间我们只允许一个线程访问共享数据区, 因此需要一个大小为 1 的信号量 mutex 来控制互斥操作. 注意到我们还定义了 notFull 和 notEmpty 信号量, notFull 用于标识当前可用区块的空间大小, 当 notFull size 大于 0 时表明 "not full", producer 可以继续生产, 等于 0 时表示空间已满, 无法继续生产; 同样, 对于 notEmpty 信号量来说, 大于 0 时表明 "not empty", consumer 可以继续消耗, 等于 0 时表明没有产品, 无法继续消耗. notFull 初始 size 为 5 (5 个 available 空间可供生产),notEmpty 初始为 0(没有产品可供消耗).
- /***
- 数据仓储 class, 所有的 producer 和 consumer 共享这个 class 对象
- **/
- static class DataWareHouse {
- // 共享数据区
- private final Queue<String> data = new LinkedList();
- // 非满锁
- private final Semaphore notFull;
- // 非空锁
- private final Semaphore notEmpty;
- // 互斥锁
- private final Semaphore mutex;
- public DataWareHouse(int capacity) {
- this.notFull = new Semaphore(capacity);
- this.notEmpty = new Semaphore(0);
- mutex = new Semaphore(1);
- }
- public void offer(String x) throws InterruptedException {
- notFull.acquire(); //producer 获取信号, notFull 信号量减一
- mutex.acquire(); // 当前进程获得信号, mutex 信号量减 1, 其他线程被阻塞操作共享区块 data
- data.add(x);
- mutex.release(); //mutex 信号量 + 1, 其他线程可以继续信号操作共享区块 data
- notEmpty.release(); // 成功生产数据, notEmpty 信号量加 1
- }
- public String poll() throws InterruptedException {
- notEmpty.acquire(); //notEmpty 信号减一
- mutex.acquire();
- String result = data.poll();
- mutex.release();
- notFull.release(); // 成功消耗数据, notFull 信号量加 1
- return result;
- }
- }
- /**Producer 线程 **/
- static class Producer implements Runnable {
- private final DataWareHouse dataWareHouse;
- public Producer(final DataWareHouse dataWareHouse) {
- this.dataWareHouse = dataWareHouse;
- }
- @Override
- public void run() {
- while (true) {
- try {
- Thread.sleep(100); // 生产的速度慢于消耗的速率
- String s = UUID.randomUUID().toString();
- System.out.println("put data" + s);
- dataWareHouse.offer(s);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- /**Consumer 线程 **/
- static class Consumer implements Runnable {
- private final DataWareHouse dataWareHouse;
- public Consumer(final DataWareHouse dataWareHouse) {
- this.dataWareHouse = dataWareHouse;
- }
- @Override
- public void run() {
- while (true) {
- while (true) {
- try {
- System.out.println("get data" + dataWareHouse.poll());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- // 测试代码
- public static void main(String[] args) {
- final DataWareHouse dataWareHouse = new DataWareHouse(5);
- // 三个 producer 持续生产
- for (int i = 0; i <3; i++) {
- Thread t = new Thread(new Producer(dataWareHouse));
- t.start();
- }
- // 三个 consumer 持续消耗
- for (int i = 0; i < 3; i++) {
- Thread t = new Thread(new Consumer(dataWareHouse));
- t.start();
- }
- }
Wait 和 Notify 机制
Java Object 对象类中包含三个 final methods 来允许线程之间进行通信, 告知资源的状态. 它们分别是 wait(), notify(), 和 notifyAll().
wait(): 顾名思义告诉当前线程释放锁, 陷入休眠状态(waiting 状态), 等待资源. wait 方法本身是一个 native method, 它在 Java 中的使用语法如下所示:
- synchronized(lockObject )
- {
- while( ! condition )
- {
- lockObject.wait();
- }
- //take the action here;
- }
notify(): 用于唤醒 waiting 状态的线程, 同时释放锁, 被唤醒的线程可以重新获得锁访问资源. 它的基本语法 如下
- synchronized(lockObject)
- {
- //establish_the_condition;
- lockObject.notify();
- //any additional code if needed
- }
notifyAll(): 不同于 notify(), 它用于唤醒所有处于 waiting 状态的线程. 语法如下:
- synchronized(lockObject)
- {
- establish_the_condition;
- lockObject.notifyAll();
- }
说完了这三个方法, 来看下如何使用 wait & notify(All) 来解决我们的问题. 新的 DataWareHouse 类如下所示:
- //producer 类和 consumer 共享对象
- static class DataWareHouse {
- // 共享数据区
- private final Queue<String> data = new LinkedList();
- private int capacity;
- private int size = 0;
- public DataWareHouse(int capacity) {
- this.capacity = capacity;
- }
- public synchronized void offer(String x) throws InterruptedException {
- while (size == capacity) { // 当 buffer 满时, producer 进入 waiting 状态
- this.wait(); // 使用 this 对象来加锁
- }
- data.add(x);
- size++;
- notifyAll(); // 当 buffer 有数据时, 唤醒所有等待的 consumer 线程
- }
- public synchronized String poll() throws InterruptedException {
- while (size == 0) {// 当 buffer 为空时, consumer 进入等待状态
- this.wait();
- }
- String result = data.poll();
- size--;
- notifyAll(); // 当数据被消耗, 空间被释放, 通知所有等待的 producer.
- return result;
- }
- }
Note: 在方法上使用 synchronized 等价于在方法体内使用 synchronized(this), 两者都是使用 this 对象作为锁.
生产者和消费者类, 以及测试代码和 信号量 section 相同, 不做重复列举了.
管道
管道 Pipe 是实现进程或者线程 (线程之间通常通过共享内存实现通讯, 而进程则通过 scoket, 管道, 消息队列等技术) 之间通信常用方式, 它连接输入流和输出流, 基于生产者 - 消费者模式构建的一种技术. 具体实现可以通过创建一个管道输入流对象和管道输出流对象, 然后将输入流和输出流就行链接, 生产者通过往管道中写入数据, 而消费者在管道数据流中读取数据, 通过这种方式就实现了线程之间的互相通讯.
具体实现代码如下所示
- public class PipeSolution {
- static class DataWareHouse implements Closeable {
- private final PipedInputStream pis;
- private final PipedOutputStream pos;
- public DataWareHouse() throws IOException {
- pis = new PipedInputStream();
- pos = new PipedOutputStream();
- pis.connect(pos); // 连接管道
- }
- // 向管道中写入数据
- public void offer(int val) throws IOException {
- pos.write(val);
- pos.flush();
- }
- // 从管道中取数据.
- public int poll() throws IOException {
- // 当管道中没有数据, 方法阻塞
- return pis.read();
- }
- // 关闭管道
- @Override
- public void close() throws IOException {
- if (pis != null) {
- pis.close();
- }
- if (pos != null) {
- pos.close();
- }
- }
- }
- //consumer 类
- static class Consumer implements Runnable {
- private final DataWareHouse dataWareHouse;
- Consumer(DataWareHouse dataWareHouse) {
- this.dataWareHouse = dataWareHouse;
- }
- @Override
- public void run() {
- try {
- // 消费者不断从管道中读取数据
- while (true) {
- int num = dataWareHouse.poll();
- System.out.println("get data +" + num);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- static class Producer implements Runnable {
- private final DataWareHouse dataWareHouse;
- private final Random random = new Random();
- Producer(DataWareHouse dataWareHouse) {
- this.dataWareHouse = dataWareHouse;
- }
- @Override
- public void run() {
- try {
- // 生产者不断向管道中写入数据
- while (true) {
- int num = random.nextInt(256);
- dataWareHouse.offer(num);
- System.out.println("put data +" + num);
- Thread.sleep(1000);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- public static void main(String[] args) throws IOException {
- DataWareHouse dataWareHouse = new DataWareHouse();
- new Thread(new Producer(dataWareHouse)).start();
- new Thread(new Consumer(dataWareHouse)).start();
- }
- }
阻塞队列
阻塞队列(BlockingQueue), 具有 1. 当队列满了的时候阻塞入队列操作 2. 当队列空了的时候阻塞出队列操作 3. 线程安全 的特性, 因而阻塞队列通常被视为实现生产消费者模式最便捷的工具, 其中 DataWareHouse 类实现代码如下:
- static class DataWareHouse {
- // 共享数据区
- private final BlockingQueue<String> blockingQueue;
- public DataWareHouse(int capacity) {
- this.blockingQueue = new ArrayBlockingQueue<>(capacity);
- }
- public void offer(String x) {
- blockingQueue.offer(x);
- }
- public String poll() {
- return blockingQueue.poll();
- }
- }
生产者和消费者类, 以及测试代码和 信号量 section 相同, 在此不做重复列举了.
总结
生产者消费者问题是面试中经常会遇到的题目, 本文总结了几种常见的实现方式, 面试过程中通常不必要向面试官描述过多实现细节, 说出每种实现方式的特点即可. 希望能给大家带来帮助.
Reference
来源: https://www.cnblogs.com/jun-ma/p/11843394.html