本文源码: GitHub. 点这里 https://github.com/cicadasmile/java-base-parent || GitEE. 点这里 https://gitee.com/cicadasmile/java-base-parent
一, 概念简介
1, 线程通信
在操作系统中, 线程是个独立的个体, 但是在线程执行过程中, 如果处理同一个业务逻辑, 可能会产生资源争抢, 导致并发问题, 通常使用互斥锁来控制该逻辑. 但是在还有这样一类场景, 任务执行是有顺序控制的, 例如常见的报表数据生成:
启动数据分析任务, 生成报表数据;
报表数据存入指定位置数据容器;
通知数据搬运任务, 把数据写入报表库;
该场景在相对复杂的系统中非常常见, 如果基于多线程来描述该过程, 则需要线程之间通信协作, 才能有条不紊的处理该场景业务.
2, 等待通知机制
如上的业务场景, 如果线程 A 生成数据过程中, 线程 B 一直在访问数据容器, 判断该过程的数据是否已经生成, 则会造成资源浪费. 正常的流程应该如图, 线程 A 和线程 B 同时启动, 线程 A 开始处理数据生成任务, 线程 B 尝试获取容器数据, 数据还没过来, 线程 B 则进入等待状态, 当线程 A 的任务处理完成, 则通知线程 B 去容器中获取数据, 这样基于线程等待和通知的机制来协作完成任务.
3, 基础方法
等待 / 通知机制的相关方法是 Java 中 Object 层级的基础方法, 任何对象都有该方法:
notify: 随机通知一个在该对象上等待的线程, 使其结束 wait 状态返回;
notifyAll: 唤醒在该对象上所有等待的线程, 进入对象锁争抢队列中;
wait: 线程进入 waiting 等待状态, 不会争抢锁对象, 也可以设置等待时间;
线程的等待通知机制, 就是基于这几个基础方法.
二, 等待通知原理
1, 基本原理
等待 / 通知机制, 该模式下指线程 A 在不满足任务执行的情况下调用对象 wait() 方法进入等待状态, 线程 B 修改了线程 A 的执行条件, 并调用对象 notify() 或者 notifyAll() 方法, 线程 A 收到通知后从 wait 状态返回, 进而执行后续操作. 两个线程通过基于对象提供的 wait()/notify()/notifyAll() 等方法完成等待和通知间交互, 提高程序的可伸缩性.
2, 实现案例
通过线程通信解决上述数据生成和存储任务的解耦流程.
- public class NotifyThread01 {
- static Object lock = new Object() ;
- static volatile List<String> dataList = new ArrayList<>();
- public static void main(String[] args) throws Exception {
- Thread saveThread = new Thread(new SaveData(),"SaveData");
- saveThread.start();
- TimeUnit.SECONDS.sleep(3);
- Thread dataThread = new Thread(new AnalyData(),"AnalyData");
- dataThread.start();
- }
- // 等待数据生成, 保存
- static class SaveData implements Runnable {
- @Override
- public void run() {
- synchronized (lock){
- while (dataList.size()==0){
- try {
- System.out.println(Thread.currentThread().getName()+"等待...");
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- System.out.println("SaveData .."+ dataList.get(0)+dataList.get(1));
- }
- }
- }
- // 生成数据, 通知保存
- static class AnalyData implements Runnable {
- @Override
- public void run() {
- synchronized (lock){
- dataList.add("hello,");
- dataList.add("java");
- lock.notify();
- System.out.println("AnalyData End...");
- }
- }
- }
- }
注意: 除了 dataList 满足写条件, 还要在 AnalyData 线程执行通知操作.
三, 管道流通信
1, 管道流简介
基本概念
管道流主要用于在不同线程间直接传送数据, 一个线程发送数据到输出管道, 另一个线程从输入管道中读取数据, 进而实现不同线程间的通信.
实现分类
管道字节流: PipedInputStream 和 PipedOutputStream;
管道字符流: PipedWriter 和 PipedReader;
新 IO 管道流: Pipe.SinkChannel 和 Pipe.SourceChannel;
2, 使用案例
- public class NotifyThread02 {
- public static void main(String[] args) throws Exception {
- PipedInputStream pis = new PipedInputStream();
- PipedOutputStream pos = new PipedOutputStream();
- // 链接输入流和输出流
- pos.connect(pis);
- // 写数据线程
- new Thread(new Runnable() {
- public void run() {
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
- // 将从键盘读取的数据写入管道流
- PrintStream ps = new PrintStream(pos);
- while (true) {
- try {
- System.out.print(Thread.currentThread().getName());
- ps.println(br.readLine());
- Thread.sleep(1000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }, "输入数据线程:").start();
- // 读数据线程
- new Thread(new Runnable() {
- public void run() {
- BufferedReader br = new BufferedReader(new InputStreamReader(pis));
- while (true) {
- try {
- System.out.println(Thread.currentThread().getName() + br.readLine());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }, "输出数据线程:").start();
- }
- }
写线程向管道流写入数据, 读线程读取数据, 完成基本通信流程.
四, 生产消费模式
1, 业务场景
基于线程等待通知机制: 实现工厂生产一件商品, 通知商店卖出一件商品的业务流程.
2, 代码实现
- public class NotifyThread03 {
- public static void main(String[] args) {
- Product product = new Product();
- ProductFactory productFactory = new ProductFactory(product);
- ProductShop productShop = new ProductShop(product);
- productFactory.start();
- productShop.start();
- }
- }
- // 产品
- class Product {
- public String name ;
- public double price ;
- // 产品是否生产完毕, 默认没有
- boolean flag ;
- }
- // 产品工厂: 生产
- class ProductFactory extends Thread {
- Product product ;
- public ProductFactory (Product product){
- this.product = product;
- }
- @Override
- public void run() {
- int i = 0 ;
- while (i < 20) {
- synchronized (product) {
- if (!product.flag){
- if (i%2 == 0){
- product.name = "鼠标";
- product.price = 79.99;
- } else {
- product.name = "键盘";
- product.price = 89.99;
- }
- System.out.println("产品:"+product.name+"[价格:"+product.price+"] 出厂...");
- product.flag = true ;
- i++;
- // 通知消费者
- product.notifyAll();
- } else {
- try {
- // 进入等待状态
- product.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
- // 产品商店: 销售
- class ProductShop extends Thread {
- Product product ;
- public ProductShop (Product product){
- this.product = product ;
- }
- @Override
- public void run() {
- while (true) {
- synchronized (product) {
- if (product.flag == true ){
- System.out.println("产品:"+product.name+"[价格"+(product.price*2)+"] 卖出...");
- product.flag = false ;
- product.notifyAll(); // 唤醒生产者
- } else {
- try {
- product.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }
流程描述: ProductFactory 生成一件商品, 通知商店售卖, 通过 flag 标识判断控制是否进入等待状态, 商店卖出商品后, 再次通知工厂生产商品.
五, 源代码地址
GitHub. 地址
https://github.com/cicadasmile/java-base-parent
GitEE. 地址
https://gitee.com/cicadasmile/java-base-parent
来源: https://www.cnblogs.com/cicada-smile/p/12899417.html