此篇博客所有源码均来自 JDK 1.8
说明
队列是比较常见的数据结构, 我们也经常使用到, BlockingQueue 常用于生产者消费者场景, 在 Java 的并发包中已经提供了 BlockingQueue 的实现 , 后面几篇会介绍几种主要的阻塞队列.
J.U.C 之 AQS 传送门:[死磕 Java 并发] --J.U.C 之 AQS(一篇就够了) https://mp.weixin.qq.com/s?__biz=MzU2NjIzNDk5NQ==&mid=2247484289&idx=1&sn=56ad0dd81fb2b4e4f86c5aaaaf3164a6&scene=21#wechat_redirect
DelayQueue 介绍
DelayQueue 是一个支持延时获取元素的无界阻塞队列. 里面的元素全部都是 "可延期" 的元素, 列头的元素是最先 "到期" 的元素, 如果队列里面没有元素到期, 是不能从列头获取元素的, 哪怕有元素也不行. 也就是说只有在延迟期到时才能够从队列中取元素.
DelayQueue 主要用于两个方面:- 缓存: 清掉缓存中超时的缓存数据 - 任务超时处理
DelayQueue
DelayQueue 实现的关键主要有如下几个:
可重入锁 ReentrantLock
用于阻塞和通知的 Condition 对象
根据 Delay 时间排序的优先级队列: PriorityQueue
用于优化阻塞通知的线程元素 leader
ReentrantLock,Condition 这两个对象就不需要阐述了, 他是实现整个 BlockingQueue 的核心. PriorityQueue 是一个支持优先级线程排序的队列 (参考 [死磕 Java 并发] -----J.U.C 之阻塞队列: PriorityBlockingQueue),leader 后面阐述. 这里我们先来了解 Delay, 他是实现延时操作的关键.
Delayed
Delayed 接口是用来标记那些应该在给定延迟时间之后执行的对象, 它定义了一个 long getDelay(TimeUnit unit) 方法, 该方法返回与此对象相关的的剩余时间. 同时实现该接口的对象必须定义一个 compareTo 方法, 该方法提供与此接口的 getDelay 方法一致的排序.
public interface Delayed extends Comparable<Delayed> { long getDelay(TimeUnit unit);}
如何使用该接口呢? 上面说的非常清楚了, 实现该接口的 getDelay() 方法, 同时定义 compareTo() 方法即可.
内部结构
先看 DelayQueue 的定义:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { /** 可重入锁 */ private final transient ReentrantLock lock = new ReentrantLock(); /** 支持优先级的 BlockingQueue */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** 用于优化阻塞 */ private Thread leader = null; /** Condition */ private final Condition available = lock.newCondition(); /** * 省略很多代码 */ }
看了 DelayQueue 的内部结构就对上面几个关键点一目了然了, 但是这里有一点需要注意, DelayQueue 的元素都必须继承 Delayed 接口. 同时也可以从这里初步理清楚 DelayQueue 内部实现的机制了: 以支持优先级无界队列的 PriorityQueue 作为一个容器, 容器里面的元素都应该实现 Delayed 接口, 在每次往优先级队列中添加元素时以元素的过期时间作为排序条件, 最先过期的元素放在优先级最高.
- offer()
- public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 向 PriorityQueue 中插入元素 q.offer(e); // 如果当前元素的对首元素 (优先级最高),leader 设置为空, 唤醒所有等待线程 if (q.peek() == e) { leader = null; available.signal(); } // 无界队列, 永远返回 true return true; } finally { lock.unlock(); } }
offer(E e) 就是往 PriorityQueue 中添加元素, 具体可以参考 ([死磕 Java 并发] -----J.U.C 之阻塞队列: PriorityBlockingQueue). 整个过程还是比较简单, 但是在判断当前元素是否为对首元素, 如果是的话则设置 leader=null, 这是非常关键的一个步骤, 后面阐述.
- take()
- public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 对首元素 E first = q.peek(); // 对首为空, 阻塞, 等待 off() 操作唤醒 if (first == null) available.await(); else { // 获取对首元素的超时时间 long delay = first.getDelay(NANOSECONDS); // <=0 表示已过期, 出对, return if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting // leader != null 证明有其他线程在操作, 阻塞 if (leader != null) available.await(); else { // 否则将 leader 设置为当前线程, 独占 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 超时阻塞 available.awaitNanos(delay); } finally { // 释放 leader if (leader == thisThread) leader = null; } } } } } finally { // 唤醒阻塞线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
首先是获取对首元素, 如果对首元素的延时时间 delay <= 0 , 则可以出对了, 直接 return 即可. 否则设置 first = null, 这里设置为 null 的主要目的是为了避免内存泄漏. 如果 leader != null 则表示当前有线程占用, 则阻塞, 否则设置 leader 为当前线程, 然后调用 awaitNanos() 方法超时等待.
first = null
这里为什么如果不设置 first = null, 则会引起内存泄漏呢? 线程 A 到达, 列首元素没有到期, 设置 leader = 线程 A, 这是线程 B 来了因为 leader != null, 则会阻塞, 线程 C 一样. 假如线程阻塞完毕了, 获取列首元素成功, 出列. 这个时候列首元素应该会被回收掉, 但是问题是它还被线程 B, 线程 C 持有着, 所以不会回收, 这里只有两个线程, 如果有线程 D, 线程 E... 呢? 这样会无限期的不能回收, 就会造成内存泄漏.
这个入队, 出对过程和其他的阻塞队列没有很大区别, 无非是在出对的时候增加了一个到期时间的判断. 同时通过 leader 来减少不必要阻塞.
来源: https://juejin.im/entry/5b154162e51d4506a020803b