零, 时间轮定义
简单说说时间轮吧, 它是一个高效的延时队列, 或者说定时器. 实际上现在网上对于时间轮算法的解释很多, 定义也很全, 这里引用一下朱小厮博客里出现的定义:
参考下图, Kafka 中的时间轮 (TimingWheel) 是一个存储定时任务的环形队列, 底层采用数组实现, 数组中的每个元素可以存放一个定时任务列表(TimerTaskList).TimerTaskList 是一个环形的双向链表, 链表中的每一项表示的都是定时任务项(TimerTaskEntry), 其中封装了真正的定时任务 TimerTask.
如果你理解了上面的定义, 那么就不必往下看了. 但如果你第一次看到和我一样懵比, 并且有不少疑问, 那么这篇博文将带你进一步了解时间轮, 甚至理解时间轮算法.
如果有兴趣, 可以去看看其他的定时器 你真的了解延时队列吗. 博主认为, 时间轮定时器最大的优点:
是任务的添加与移除, 都是 O(1)级的复杂度;
不会占用大量的资源;
只需要有一个线程去推进时间轮就可以工作了.
我们将对时间轮做层层推进的解析:
一, 为什么使用环形队列
假设我们现在有一个很大的数组, 专门用于存放延时任务. 它的精度达到了毫秒级! 那么我们的延迟任务实际上需要将定时的那个时间简单转换为毫秒即可, 然后将定时任务存入其中:
比如说当前的时间是 2018/10/24 19:43:45, 那么就将任务存入 Task[1540381425000],value 则是定时任务的内容.
private Task[很长] tasks;
- public List<Task> getTaskList(long timestamp) {
- return task.get(timestamp)
- }
- // 假装这里真的能一毫秒一个循环
- public void run(){
- while (true){
getTaskList(System.currentTimeMillis()). 后台执行()
- Thread.sleep(1);
- }
- }
假如这个数组长度达到了亿亿级, 我们确实可以这么干. 那如果将精度缩减到秒级呢? 我们也需要一个百亿级长度的数组.
先不说内存够不够, 显然你的定时器要这么大的内存显然很浪费.
当然如果我们自己写一个 map, 并保证它不存在 hash 冲突问题, 那也是完全可行的.(我不确定我的想法是否正确, 如果错误, 请指出)
- /* 一个精度为秒级的延时任务管理类 */
- private Map<Long, Task> taskMap;
- public List<Task> getTaskList(long timestamp) {
- return taskMap.get(timestamp - timestamp % 1000)
- }
- // 新增一个任务
- public void addTask(long timestamp, Task task) {
- List<Task> taskList = getTaskList(timestamp - timestamp % 1000);
- if (taskList == null){
- taskList = new ArrayList();
- }
- taskList.add(task);
- }
- // 假装这里真的能一秒一个循环
- public void run(){
- while (true){
getTaskList(System.currentTimeMillis()). 后台执行()
- Thread.sleep(1000);
- }
- }
其实时间轮就是一个不存在 hash 冲突的数据结构
抛开其他疑问, 我们看看手腕上的手表(如果没有去找个钟表, 或者想象一个), 是不是无论当前是什么时间, 总能用我们的表盘去表示它(忽略精度)
就拿秒表来说, 它总是落在 0 - 59 秒, 每走一圈, 又会重新开始.
用伪代码模拟一下我们这个秒表:
- private Bucket[60] buckets;// 表示 60 秒
- public void addTask(long timestamp, Task task) {
- Bucket bucket = buckets[timestamp / 1000 % 60];
- bucket.add(task);
- }
- public Bucket getBucket(long timestamp) {
- return buckets[timestamp / 1000 % 60];
- }
- // 假装这里真的能一秒一个循环
- public void run(){
- while (true){
getBucket(System.currentTimeMillis()). 后台执行()
- Thread.sleep(1000);
- }
- }
这样, 我们的时间总能落在 0 - 59 任意一个 bucket 上, 就如同我们的秒钟总是落在 0 - 59 刻度上一样, 这便是时间轮的环形队列.
二, 表示的时间有限
但是细心的小伙伴也会发现这么一个问题: 如果只能表示 60 秒内的定时任务应该怎么存储与取出, 那是不是太有局限性了? 如果想要加入一小时后的延迟任务, 该怎么办?
其实还是可以看一看钟表, 对于只有三个指针的表 (一般的表) 来说, 最大能表示 12 个小时, 超过了 12 小时这个范围, 时间就会产生歧义. 如果我们加多几个指针呢? 比如说我们有秒针, 分针, 时针, 上下午针, 天针, 月针, 年针...... 那不就能表示很长很长的一段时间了? 而且, 它并不需要占用很大的内存.
比如说秒针我们可以用一个长度为 60 的数组来表示, 分针也同样可以用一个长度为 60 的数组来表示, 时针可以用一个长度为 24 的数组来表示. 那么表示一天内的所有时间, 只需要三个数组即可.
动手来做吧, 我们将这个数据结构称作时间轮, tickMs 表示一个刻度, 比如说上面说的一秒. wheelSize 表示一圈有多少个刻度, 即上面说的 60.interval 表示一圈能表示多少时间, 即 tickMs * wheelSize = 60 秒.
overflowWheel 表示上一层的时间轮, 比如说, 对于秒钟来说, overflowWheel 就表示分钟, 以此类推.
- public class TimeWheel {
- /** 一个时间槽的时间 */
- private long tickMs;
- /** 时间轮大小 */
- private int wheelSize;
- /** 时间跨度 */
- private long interval;
- /** 槽 */
- private Bucket[] buckets;
- /** 时间轮指针 */
- private long currentTimestamp;
- /** 上层时间轮 */
- private volatile TimeWheel overflowWheel;
- public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
- this.currentTimestamp = currentTimestamp;
- this.tickMs = tickMs;
- this.wheelSize = wheelSize;
- this.interval = tickMs * wheelSize;
- this.buckets = new Bucket[wheelSize];
- this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);
- for (int i = 0; i <wheelSize; i++) {
- buckets[i] = new Bucket();
- }
- }
- }
将任务添加到时间轮中十分简单, 对于每个时间轮来说, 比如说秒级时间轮, 和分级时间轮, 都有它自己的过期槽. 也就是 delayMs < tickMs 的时候.
添加延时任务的时候一共就这几种情况:
#### 一, 时间到期
1)比如说有一个任务要在 16:29:07 执行, 从秒级时间轮中来看, 当我们的当前时间走到 16:29:06 的时候, 则表示这个任务已经过期了. 因为它的 delayMs = 1000ms, 小于了我们的秒级时间轮的 tickMs(1000ms).
比如说有一个任务要在 16:41:25 执行, 从分级时间轮中来看, 当我们的当前时间走到 16:41 的时候(分级时间轮没有秒针! 它的最小精度是分钟(一定要理解这一点)), 则表示这个任务已经到期, 因为它的 delayMs = 25000ms, 小于了我们的分级时间轮的 tickMs(60000ms).
二, 时间未到期, 且 delayMs 小于 interval.
对于秒级时间轮来说, 就是延迟时间小于 60s, 那么肯定能找到一个秒钟槽扔进去.
三, 时间未到期, 且 delayMs 大于 interval.
对于妙级时间轮来说, 就是延迟时间大于等于 60s, 这时候就需要借助上层时间轮的力量了, 很简单的代码实现, 就是拿到上层时间轮, 然后类似递归一样, 把它扔进去.
比如说一个有一个延时为一年后的定时任务, 就会在这个递归中不断创建更上层的时间轮, 直到找到满足 delayMs 小于 interval 的那个时间轮.
这里为了不把代码写的那么复杂, 我们每一层时间轮的刻度都一样, 也就是秒级时间轮表示 60 秒, 上面则表示 60 分钟, 再上面则表示 60 小时, 再上层则表示 60 个 60 小时, 再上层则表示 60 个 60 个 60 小时 = 216000 小时.
也就是如果将最底层时间轮的 tickMs(精度)设置为 1000ms.wheelSize 设置为 60. 那么只需要 5 层时间轮, 可表示的时间跨度已经长达 24 年(216000 小时).
- /**
- * 添加任务到某个时间轮
- */
- public boolean addTask(TimedTask timedTask) {
- long expireTimestamp = timedTask.getExpireTimestamp();
- long delayMs = expireTimestamp - currentTimestamp;
- if (delayMs < tickMs) {// 到期了
- return false;
- } else {
- // 扔进当前时间轮的某个槽中, 只有时间[大于某个槽] , 才会放进去
- if (delayMs < interval) {
- int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);
- Bucket bucket = buckets[bucketIndex];
- bucket.addTask(timedTask);
- } else {
- // 当 maybeInThisBucket 大于等于 wheelSize 时, 需要将它扔到上一层的时间轮
- TimeWheel timeWheel = getOverflowWheel();
- timeWheel.addTask(timedTask);
- }
- }
- return true;
- }
- /**
- * 获取或创建一个上层时间轮
- */
- private TimeWheel getOverflowWheel() {
- if (overflowWheel == null) {
- synchronized (this) {
- if (overflowWheel == null) {
- overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
- }
- }
- }
- return overflowWheel;
- }
当然我们的时间轮还需要一个指针的推进机制, 总不能让时间永远停留在当前吧? 推进的时候, 同时类似递归, 去推进一下上一层的时间轮.
注意: 要强调一点的是, 我们这个时间轮更像是电子表, 它不存在时间的中间状态, 也就是精度这个概念一定要理解好. 比如说, 对于秒级时间轮来说, 它的精度只能保证到 1 秒, 小于 1 秒的, 都会当成是已到期
对于分级时间轮来说, 它的精度只能保证到 1 分, 小于 1 分的, 都会当成是已到期
- /**
- * 尝试推进一下指针
- */
- public void advanceClock(long timestamp) {
- if (timestamp>= currentTimestamp + tickMs) {
- currentTimestamp = timestamp - (timestamp % tickMs);
- if (overflowWheel != null) {
- this.getOverflowWheel()
- .advanceClock(timestamp);
- }
- }
- }
三, 对于高层时间轮来说, 精度越来越不准, 会不会有影响?
上面说到, 分级时间轮, 精度只有分钟级, 总不能延迟 1 秒的定时任务和延迟 59 秒的定时任务同时执行吧?
有这个疑问的同学很好! 实际上很好解决, 只需再入时间轮即可. 比如说, 对于分钟级时间轮来说, delayMs 为 1 秒和 delayMs 为 59 秒的都已经过期, 我们将其取出, 再扔进底层的时间轮不就可以了?
1 秒的会被扔到秒级时间轮的下一个执行槽中, 而 59 秒的会被扔到秒级时间轮的后 59 个时间槽中.
细心的同学会发现, 我们的添加任务方法, 返回的是一个 bool
public boolean addTask(TimedTask timedTask)
再倒回去好好看看, 添加到最底层时间轮失败的(我们只能直接操作最底层的时间轮, 不能直接操作上层的时间轮), 是不是会直接返回 flase? 对于再入失败的任务, 我们直接执行即可.
- /**
- * 将任务添加到时间轮
- */
- public void addOrSubmitTask(TimedTask timedTask) {
- if (!timeWheel.addTask(timedTask)) {
- taskExecutor.submit(timedTask.getTask());
- }
- }
四, 如何知道一个任务已经过期?
记得我们将任务存储在槽中嘛? 比如说秒级时间轮中, 有 60 个槽, 那么一共有 60 个槽. 如果时间轮共有两层, 也仅仅只有 120 个槽. 我们只需将槽扔进一个 delayedQueue 之中即可.
我们轮询地从 delayedQueue 取出已经过期的槽即可.(前面的所有代码, 为了简单说明, 并没有引入这个 DelayQueue 的概念, 所以不用去上面翻了, 并没有. 博主觉得... 已经看到这里了, 应该很明白这个 DelayQueue 的意义了.)
其实简单来说, 实际上定时任务单单使用 DelayQueue 来实现, 也是可以的, 但是一旦任务的数量多了起来, 达到了百万级, 千万级, 针对这个 delayQueue 的增删, 将非常的慢.
** 一, 面向槽的 delayQueue**
而对于时间轮来说, 它只需要往 delayQueue 里面扔各种槽即可, 比如我们的定时任务长短不一, 最长的跨度到了 24 年, 这个 delayQueue 也仅仅只有 300 个元素.
** 二, 处理过期的槽 **
而这个槽到期后, 也就是被我们从 delayQueue 中 poll 出来后, 我们只需要将槽中的所有任务循环一次, 重新加到新的槽中 (添加失败则直接执行) 即可.
- /**
- * 推进一下时间轮的指针, 并且将 delayQueue 中的任务取出来再重新扔进去
- */
- public void advanceClock(long timeout) {
- try {
- Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
- if (bucket != null) {
- timeWheel.advanceClock(bucket.getExpire());
- bucket.flush(this::addTask);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
完整的时间轮 GitHub, 其实就是半抄半自己撸的 Kafka 时间轮简化版 https://github.com/anurnomeru/Solution Timer#main 中模拟了六百万个简单的延时任务, 执行的效率很高 ~
来源: https://juejin.im/entry/5bd6c3a1e51d4564211b7be5