本篇文章主要介绍了 Java 线程通信问题, 线程通信用来保证线程协调运行, 有需要的朋友可以了解一下
线程通信用来保证线程协调运行, 一般在做线程同步的时候才需要考虑线程通信的问题
1 传统的线程通信
通常利用 Objeclt 类提供的三个方法:
wait() 导致当前线程等待, 并释放该同步监视器的锁定, 直到其它线程调用该同步监视器的 notify()或者 notifyAll()方法唤醒线程
notify(), 唤醒在此同步监视器上等待的线程, 如果有多个会任意选择一个唤醒
notifyAll() 唤醒在此同步监视器上等待的所有线程, 这些线程通过调度竞争资源后, 某个线程获取此同步监视器的锁, 然后得以运行
这三个方法必须由同步监视器对象调用, 分为两张情况:
同步方法时, 由于同步监视器为 this 对象, 所以可以直接调用这三个方法
示例如下:
- public class SyncMethodThreadCommunication {
- static class DataWrap{
- int data = 0;
- boolean flag = false;
- public synchronized void addThreadA(){
- if (flag) {
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- data++;
- System.out.println(Thread.currentThread().getName() + " " + data);
- flag = true;
- notify();
- }
- public synchronized void addThreadB() {
- if (!flag) {
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- data++;
- System.out.println(Thread.currentThread().getName() + " " + data);
- flag = false;
- notify();
- }
- }
- static class ThreadA extends Thread {
- private DataWrap data;
- public ThreadA(DataWrap dataWrap) {
- this.data = dataWrap;
- }
- @Override
- public void run() {
- for (int i = 0; i < 10; i++) {
- data.addThreadA();
- }
- }
- }
- static class ThreadB extends Thread {
- private DataWrap data;
- public ThreadB(DataWrap dataWrap) {
- this.data = dataWrap;
- }
- @Override
- public void run() {
- for (int i = 0; i < 10; i++) {
- data.addThreadB();
- }
- }
- }
- public static void main(String[] args) {
- // 实现两个线程轮流对数据进行加一操作
- DataWrap dataWrap = new DataWrap();
- new ThreadA(dataWrap).start();
- new ThreadB(dataWrap).start();
- }
- }
同步代码块时, 需要使用监视器对象调用这三个方法
示例如下:
- public class SyncBlockThreadComminication {
- static class DataWrap{
- boolean flag;
- int data;
- }
- static class ThreadA extends Thread{
- DataWrap dataWrap;
- public ThreadA(DataWrap dataWrap){
- this.dataWrap = dataWrap;
- }
- @Override
- public void run() {
- for(int i = 0 ; i < 10; i++) {
- synchronized (dataWrap) {
- if (dataWrap.flag) {
- try {
- dataWrap.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- dataWrap.data++;
- System.out.println(getName() + " " + dataWrap.data);
- dataWrap.flag = true;
- dataWrap.notify();
- }
- }
- }
- }
- static class ThreadB extends Thread{
- DataWrap dataWrap;
- public ThreadB(DataWrap dataWrap){
- this.dataWrap = dataWrap;
- }
- @Override
- public void run() {
- for (int i = 0; i < 10; i++) {
- synchronized (dataWrap) {
- if (!dataWrap.flag) {
- try {
- dataWrap.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- dataWrap.data++;
- System.out.println(getName() + " " + dataWrap.data);
- dataWrap.flag = false;
- dataWrap.notify();
- }
- }
- }
- }
- public static void main(String[] args) {
- // 实现两个线程轮流对数据进行加一操作
- DataWrap dataWrap = new DataWrap();
- new ThreadA(dataWrap).start();
- new ThreadB(dataWrap).start();
- }
- }
2 使用 Condition 控制线程通信
当使用 Lock 对象保证同步时, 则使用 Condition 对象来保证协调
示例如下:
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- import com.sun.media.sound.RIFFInvalidDataException;
- import javafx.scene.chart.PieChart.Data;
- public class SyncLockThreadCommunication {
- static class DataWrap {
- int data;
- boolean flag;
- private final Lock lock = new ReentrantLock();
- private final Condition condition = lock.newCondition();
- public void addThreadA() {
- lock.lock();
- try {
- if (flag) {
- try {
- condition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- data++;
- System.out.println(Thread.currentThread().getName() + " " + data);
- flag = true;
- condition.signal();
- } finally {
- lock.unlock();
- }
- }
- public void addThreadB() {
- lock.lock();
- try {
- if (!flag) {
- try {
- condition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- data++;
- System.out.println(Thread.currentThread().getName() + " " + data);
- flag = false;
- condition.signal();
- } finally {
- lock.unlock();
- }
- }
- }
- static class ThreadA extends Thread{
- DataWrap dataWrap;
- public ThreadA(DataWrap dataWrap) {
- this.dataWrap = dataWrap;
- }
- @Override
- public void run() {
- for (int i = 0; i < 10; i++) {
- dataWrap.addThreadA();
- }
- }
- }
- static class ThreadB extends Thread{
- DataWrap dataWrap;
- public ThreadB(DataWrap dataWrap) {
- this.dataWrap = dataWrap;
- }
- @Override
- public void run() {
- for (int i = 0; i < 10; i++) {
- dataWrap.addThreadB();
- }
- }
- }
- public static void main(String[] args) {
- // 实现两个线程轮流对数据进行加一操作
- DataWrap dataWrap = new DataWrap();
- new ThreadA(dataWrap).start();
- new ThreadB(dataWrap).start();
- }
- }
其中 Condition 对象的 await(), singal(),singalAll()分别对应 wait(),notify()和 notifyAll()方法
3 使用阻塞队列 BlockingQueue 控制线程通信
BlockingQueue 是 Queue 接口的子接口, 主要用来做线程通信使用, 它具有一个特征: 当生产者线程试图向 BlockingQueue 中放入元素时, 如果队列已满, 则该线程被阻塞; 当消费者线程试图从 BlockingQueue 中取出元素时, 如果队列已空, 则该线程被阻塞这两个特征分别对应两个支持阻塞的方法, put(E e)和 take()
示例如下:
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- public class BlockingQueueThreadComminication {
- static class DataWrap{
- int data;
- }
- static class ThreadA extends Thread{
- private BlockingQueue<DataWrap> blockingQueue;
- public ThreadA(BlockingQueue<DataWrap> blockingQueue, String name) {
- super(name);
- this.blockingQueue = blockingQueue;
- }
- @Override
- public void run() {
- for (int i = 0; i < 100; i++) {
- try {
- DataWrap dataWrap = blockingQueue.take();
- dataWrap.data++;
- System.out.println(getName() + " " + dataWrap.data);
- sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- static class ThreadB extends Thread{
- private BlockingQueue<DataWrap> blockingQueue;
- private DataWrap dataWrap;
- public ThreadB(BlockingQueue<DataWrap> blockingQueue, DataWrap dataWrap, String name) {
- super(name);
- this.blockingQueue = blockingQueue;
- this.dataWrap = dataWrap;
- }
- @Override
- public void run() {
- for (int i = 0; i < 100; i++) {
- try {
- dataWrap.data++;
- System.out.println(getName() + " " + dataWrap.data);
- blockingQueue.put(dataWrap);
- sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- public static void main(String[] args) {
- /// 实现两个线程轮流对数据进行加一操作
- DataWrap dataWrap = new DataWrap();
- BlockingQueue<DataWrap> blockingQueue = new ArrayBlockingQueue<>(1);
- new ThreadA(blockingQueue, "Consumer").start();
- new ThreadB(blockingQueue, dataWrap, "Producer").start();
- }
- }
BlockingQueue 共有五个实现类:
ArrayBlockingQueue 基于数组实现的 BlockingQueue 队列
LinkedBlockingQueue 基于链表实现的 BlockingQueue 队列
PriorityBlockingQueue 中元素需实现 Comparable 接口, 其中元素的排序是按照 Comparator 进行的定制排序
SynchronousQueue 同步队列, 要求对该队列的存取操作必须是交替进行
DelayQueue 集合元素必须实现 Delay 接口, 队列中元素排序按照 Delay 接口方法 getDelay()的返回值进行排序
来源: http://www.phperz.com/article/18/0224/359616.html