条件队列是什么? 可能很多人和我一样答不出来, 不过今天终于搞清楚了!
什么是条件队列
条件队列: 当某个线程调用了 wait 方法, 或者通过 Condition 对象调用了 await 相关方法, 线程就会进入阻塞状态, 并加入到对应条件队列中.
- synchronized(对象){ // 获取锁失败, 线程会加入到同步队列中
- while(条件不满足){
对象. wait();// 调用 wait 方法当前线程加入到条件队列中
}
}
基于 synchcronized 的内置条件队列存在一些缺陷. 每个内置锁都只能有一个相关联的条件队列, 因而存在多个线程可能在同一个条件队列上等待不同的条件谓词, 并且在最常见的加锁模式下公开条件队列对象.
Java 中的锁的实现可以分为两种, 一种是基于 synchronized 的隐式锁, 它是基于 JVM 层面实现的; 而另一种则是基于 AQS 框架在代码层面实现的锁, 如 ReentrantLock 等, 在进行并发控制过程中, 很多情况下他们都可以相互替代.
其中同步队列和条件队列是 AQS 中两个比较核心的概念, 它们是代码层面实现锁的关键. 关于同步队列的内容, 我们已经在图解 AQS 的设计与实现, 手摸手带你实现一把互斥锁! 中进行了详细的介绍.
与 Object 配合 synchronized 相比, 基于 AQS 的 Lock&Condition 实现的等待唤醒模式更加灵活, 支持多个条件队列, 支持等待状态中不响应中断以及超时等待功能; 其次就是基于 AQS 实现的条件队列是 "肉眼可见" 的, 我们可以通过源代码进行 debug, 而 synchronized 则是完全隐式的.
同步队列和条件队列
与条件队列密不可分的类则是 ConditionObject, 是 AQS 中实现了 Condition 接口的内部类, 通常配合基于 AQS 实现的锁一同使用. 当线程获取到锁之后, 可以调用 await 方法进入条件队列并释放锁, 或者调用 singinal 方法唤醒对应条件队列中等待时间最久的线程并加入到等待队列中.
在 AQS 中, 线程会被封装成 Node 对象加入队列中, 而条件队列中则复用了同步队列中的 Node 对象.
Condition 相关方法和描述
Condition 接口一共定义了以下几个方法:
await(): 当前线程进入等待状态, 直到被通知 (siginal) 或中断[和 wait 方法语义相同] .
awaitUninterruptibly(): 当前线程进入等待状态, 直到被通知, 对中断不敏感.
awaitNanos(long timeout): 当前线程进入等待状态直到被通知(siginal), 中断或超时.
awaitUnitil(Date deadTime): 当前线程进入等待状态直到被通知(siginal), 中断或到达某个时间.
signal(): 唤醒一个等待在 Condition 上的线程, 该线程从等待方法返回前必须获得与 Condition 关联的锁[和 notify 方法语义相同]
signalAll(): 唤醒所有等待在 Condition 上的线程, 能够从等待方法返回的线程必须获得与 Condition 关联的锁[和 notifyAll 方法语义相同] .
条件队列入队操作
当线程获取到锁之后, Condition 对象调用 await 相关的方法, 线程会进入到对应的条件队列中.
- /**
- * 如果当前线程被终端, 抛出 InterruptedException 异常
- */
- public final void await() throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 添加当前线程到[条件队列]
- Node node = addConditionWaiter();
- // 释放已经获取的锁资源, 并返回释放前的同步状态
- int savedState = fullyRelease(node);
- int interruptMode = 0;
- // 如果当前节点不在[同步队列] 中, 线程进入阻塞状态, 等待被唤醒
- while (!isOnSyncQueue(node)) {
- LockSupport.park(this);
- if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
- break;
- }
- if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
- interruptMode = REINTERRUPT;
- if (node.nextWaiter != null) // clean up if cancelled
- unlinkCancelledWaiters();
- if (interruptMode != 0)
- reportInterruptAfterWait(interruptMode);
- }
条件队出队操作
Condition 对象调用 signal 或者 signalAll 方法时,
- /**
- * 将[条件队列] 中第一个有效的元素移除并且添加到[同步队列] 中
- * 所谓有效指的是非 null, 并且状态吗
- * @param first 条件队列中第一个非空的元素
- */
- private void doSignal(Node first) {
- do {
- if ( (firstWaiter = first.nextWaiter) == null)
- lastWaiter = null;
- first.nextWaiter = null;
- // 将条件队列中等待最久的那个有效元素添加到同步队列中
- } while (!transferForSignal(first) &&
- (first = firstWaiter) != null);
- }
- /**
- * 将条件队列中的节点转换到同步队列中
- */
- final boolean transferForSignal(Node node) {
- /*
- * If cannot change waitStatus, the node has been cancelled.
- * 如果节点的等待状态不能被修改, 说明当前线程已经被取消等待[多个线程执行 siginal 时会出现的情况]
- */
- if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
- return false;
- /*
- * 加入到[同步队列] 中, 并且尝试将前驱节点设置为可唤醒状态
- */
- Node p = enq(node); // 将 node 添加到同步队列中, 并返回它的前驱节点
- int ws = p.waitStatus;
- // 如果前驱节点不需要唤醒, 或者设置状态为'唤醒'失败, 则唤醒线程时期重新争夺同步状态
- if (ws> 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
- LockSupport.unpark(node.thread);
- return true;
- }
实现阻塞队列
自定义互斥锁升级, 增加获取 Condition 对象接口.
- /**
- * 自定义互斥锁
- *
- * @author cruder
- * @time 2019/11/29 9:43
- */
- public class MutexLock {
- private static final Sync STATE_HOLDER = new Sync();
- /**
- * 通过 Sync 内部类来持有同步状态, 当状态为 1 表示锁被持有, 0 表示锁处于空闲状态
- */
- private static class Sync extends AbstractQueuedSynchronizer {
- /**
- * 是否被独占, 有两种表示方式
- * 1. 可以根据状态, state=1 表示锁被占用, 0 表示空闲
- * 2. 可以根据当前独占锁的线程来判断, 即 getExclusiveOwnerThread()!=null 表示被独占
- */
- @Override
- protected boolean isHeldExclusively() {
- return getExclusiveOwnerThread() != null;
- }
- /**
- * 尝试获取锁, 将状态从 0 修改为 1, 操作成功则将当前线程设置为当前独占锁的线程
- */
- @Override
- protected boolean tryAcquire(int arg) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- /**
- * 释放锁, 将状态修改为 0
- */
- @Override
- protected boolean tryRelease(int arg) {
- if (getState() == 0) {
- throw new UnsupportedOperationException();
- }
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- //[新增代码]
- final ConditionObject newCondition() {
- return new ConditionObject();
- }
- }
- /**
- * 下面的实现 Lock 接口需要重写的方法, 基本是就是调用内部内 Sync 的方法
- */
- public void lock() {
- STATE_HOLDER.acquire(1);
- }
- public void unlock() {
- STATE_HOLDER.release(1);
- }
- // [新增代码] 获取条件队列
- public Condition newCondition(){
- return STATE_HOLDER.newCondition();
- }
- }
基于自定义互斥锁, 实现阻塞队列. 阻塞队列具有两个特点:
添加元素到队列中, 如果队列已满会使得当前线程阻塞[加入到条件队列 - 队列不满] , 直到队列不满为止
移除队列中的元素, 当队列为空时会使当前线程阻塞[加入到条件队列 - 队列不空] , 直到队列不为空为止
- /**
- * 有界阻塞阻塞队列
- *
- * @author Jann Lee
- * @date 2019-12-11 22:20
- **/
- public class BoundedBlockingQueue<T> {
- /**
- * list 作为底层存储结构
- */
- private List<T> dataList;
- /**
- * 队列的大小
- */
- private int size;
- /**
- * 锁, 和条件变量
- */
- private MutexLock lock;
- /**
- * 队列非空 条件变量
- */
- private Condition notEmpty;
- /**
- * 队列未满 条件变量
- */
- private Condition notFull;
- public BoundedBlockingQueue(int size) {
- dataList = new ArrayList<>();
- lock = new MutexLock();
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- this.size = size;
- }
- /**
- * 队列中添加元素 [只有队列未满时才可以添加, 否则需要等待队列变成未满状态]
- */
- public void add(T data) throws InterruptedException {
- lock.lock();
- try {
- // 如果队列已经满了, 需要在等待 "队列未满" 条件满足
- while (dataList.size() == size) {
- notFull.await();
- }
- dataList.add(data);
- Thread.sleep(2000);
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- /**
- * 移除队列的第一个元素[只有队列非空才可以移除, 否则需要等待变成队列非空状态]
- */
- public T remove() throws InterruptedException {
- lock.lock();
- try {
- // 如果为空, 需要在等待 "队列非空" 条件满足
- while (dataList.isEmpty()) {
- notEmpty.await();
- }
- T result = dataList.remove(0);
- notFull.signal();
- return result;
- } finally {
- lock.unlock();
- }
- }
- }
总结
条件队列和同步队列在 Java 中有两种实现, synchronized 关键字以及基于 AQS
每个 (基于 synchronized 的) 内置锁都只能有一个相关联的条件队列, 会存在多个线程可能在同一个条件队列上等待不同的条件谓词; 而 (基于 AQS 实现的) 显式锁支持多个条件队列
与 wait,notify,notifyAll 对应的方法时 Conditoin 接口中的 await,signal,signalAll, 他们具有相同的语义
最后敬上一个关注了也没有错的公众号~
来源: https://www.cnblogs.com/liqiangchn/p/12081838.html