定时消息与延迟消息在代码配置上存在一些差异,但是最终达到的效果相同:消息在发送到 MQ 服务端后并不会立马投递,而是根据消息中的属性延迟固定时间后才投递给消费者。
上图是阿里云上对业界 MQ 功能的对比,其中开源产品中只有阿里的 RocketMQ 支持延迟消息,且是固定的 18 个 Level。
固定 Level 的含义是延迟是特定级别的,比如支持 3 秒、5 秒的 Level,那么用户只能发送 3 秒延迟或者 5 秒延迟,不能发送 8 秒延迟的消息。
消息队列 RocketMQ 的阿里云版本(收费版本)才支持到精确到秒级别的延迟消息(没有特定 Level 的限制)。
上图是 CMQ 中对 MQ 功能的对比,其中标明腾讯的 CMQ 支持延迟消息,但是没有具体写明支持到什么精度,支持任意时间还是特定的 Level。
通过腾讯云上 CMQ 的 API 文档可以看到有一个秒级别的 delaySeconds,应该是支持任意级别的延迟,即和收费版本的 RocketMQ 一致。
总结
(真是有钱能使鬼推磨啊,有钱就能发任意延迟的消息了,没钱最多只能发特定 Level 了)
开源版本没有支持任意延迟的消息,我想可能有以下几个原因:
需求不强
对支持任意延迟的需求确实不强,因为:
在我司,MQ 上线一年多后才有业务方希望我能支持延迟消息,且不要求任意延迟,只要求和 RocketMQ 开源版本一致,支持一些业务上的级别即可。
不愿意开源
为了差异化(好在云上卖钱),只能降开源版本的功能进行阉割,所以开源版本的 RocketMQ 变成了只支持特定 Level 的延迟。
难点在哪里?
既然业务有需求,我们肯定也要去支持。
首先,我们先划清楚定义和边界:
在我们的系统范围内,支持任意延迟的消息指的是:
- 精度支持到秒级别
- 最大支持 30 天的延迟
本着对自己的高要求,我们并不满足于开源 RocketMQ 的 18 个 Level 的方案。那么,如果我们自己要去实现一个支持任意延迟的消息队列,难点在哪里呢?
首先,支持任意延迟意味着消息是需要在服务端进行排序的。
比如用户先发了一条延迟 1 分钟的消息,一秒后发了一条延迟 3 秒的消息,显然延迟 3 秒的消息需要先被投递出去。那么服务端在收到消息后需要对消息进行排序后再投递出去。
在 MQ 中,为了保证可靠性,消息是需要落盘的,且对性能和延迟的要求,决定了在服务端对消息进行排序是完全不可接受的。
其次,目前 MQ 的方案中都是基于 WAL 的方式实现的(RocketMQ、Kafka),日志文件会被过期删除,一般会保留最近一段时间的数据。支持任意级别的延迟,那么需要保存最近 30 天的消息。
阿里内部 1000+ 核心应用使用,每天流转几千亿条消息,经过双 11 交易、商品等核心链路真实场景的验证,稳定可靠。
考虑一下一天几千亿的消息,保存 30 天的话需要堆多少服务器,显然是无法做到的。
虽然决定自己做,但是依旧需要先了解开源的实现,那么就只能看看 RocketMQ 开源版本中,支持 18 个 Level 是怎么实现的,希望能从中得到一些灵感。
上图是通过 RocketMQ 源码分析后简化一个实现原理方案示意图。
分为两个部分:
消息写入中:
Schedule 过程中:
回顾一下这个方案,最大的优点就是没有了排序:
但是这个方案也有一些问题:
总结 RocketMQ 的方案,通过划分 Level 的方式,将排序操作转换为了 O(1) 的 ConsumeQueue 的 append 操作。
我们去支持任意延迟的消息,必然也需要通过类似的方式避免掉排序。
此时我们想到了 TimeWheel:《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility 》
Netty 中也是用 TimeWheel 来优化 I/O 超时的操作。
TimeWheel 的大致原理如下:
如每次 tick 为 1 秒,ticksPerWheel 为 60,那么这就和现实中的秒针走动完全一致。
无论定时消息还是延迟消息,最终都是投递后延迟一段时间对用户可见。
假设这个延迟时间为 X 秒,那么 X%(ticksPerWheel * tick) 可以计算出 X 所属的 TimeWheel 中位置。
这里存在一个问题,以上图为例,TimeWheel 的 size 为 8,那么延迟 1 秒和 9 秒的消息都处在一个链表中。如果用户先发了延迟 9 秒的消息再发了延迟 1 秒的消息,他们在一个链表中所以延迟 1 秒的消息会需要等待延迟 9 秒的消息先投递。显然这是不能接受的,那么如何解决这个问题?
排序
显然,如果对 TimeWheel 一个 tick 中的元素进行排序显然就解决了上面的问题。但是显而易见的是排序是不可能的。
扩大时间轮
最直观的方式,我们能不能通过扩大时间轮的方式避免延迟 9 和延迟 1 落到一个 tick 位置上?
假设支持 30 天,精度为 1 秒,那么 ticksPerWheel=30 * 24 * 60 * 60,这样每一个 tick 上的延迟都是一致的,不存在上述的问题(类似于将 RocketMQ 的 Level 提升到了 30 * 24 * 60 * 60 个)。但是 TimeWheel 需要被加载到内存操作,这显然是无法接受的。
多级时间轮
单个 TimeWheel 无法支持,那么能否显示中的时针、分针的形式,构建多级时间轮来解决呢?
多级时间轮解决了上述的问题,但是又引入了新的问题:
延迟加载
多级定时轮的问题在于需要加载大量数据到内存,那么能否优化一下将这里的数据延迟加载到内存来解决内存开销的问题呢?在多级定时轮的方案中,显然对于未来一小时或者未来一天的数据可以不加载到内存,而可以只加载延迟时间临近的消息。
进一步优化,可以将数据按照固定延迟间隔划分,那么每次加载的数据量是大致相同的,不会出 tick 约大的定时轮需要加载越多的数据,那么方案如下:
基于上述的方案,那么 TimeWheel 中存储未来 30 分钟需要投递的消息的索引,索引为一个 long 型,那么数据量为:30 * 60 * 8 * TPS,相对来说内存开销是可以接受的,比如 TPS 为 1w 那么大概开销为 200M+。
之后的数据按照每 30 分钟一个块的形式写入文件,那么每个整点时的操作就是计算一下将 30 分钟的消息 Hash 到对应的 TimeWheel 上,那么排序问题就解决了。
到此为止就只剩下一个问题,如何保存 30 天的数据?
CommitLog 是有时效性的,比如在我们只保存最近 7 天的消息,过期数据将被删除。对于延迟消息,可能需要 30 天之后投递,显然是不能被删除的。
那么我们怎么保存延迟消息呢?
直观的方法就是将延迟消息从 CommitLog 中剥离出来,独立存储以保存更长的时间。
通过 DispatchService 将 WAL 中的延迟消息写入到独立的文件中。这些文件按照延迟时间组成一个链表。
链表长度为最大延迟时间 / 每个文件保存的时间长度。
那么 WAL 可以按照正常的策略进行过期删除,Delay Msg File 则在一个文件投递完之后进行删除。
唯一的问题是这里会有 Delay Msg File 带来的随机写问题,但是这个对系统整体性能不会有很大影响,在可接受范围内。
结合 TimeWheel 和 CommitLog 保存超长延迟数据的方案,加上一些优化手段,基本就完成了支持任意延迟时间的方案:
通过这个方案解决了最初提出来的任意延迟消息的两个难点:
本文从延迟消息的概念出发,了解业界的支持情况,确定延迟消息的难点和支持边界,最后通过一步步推导完成了一个相对来说从内存开销和性能上都可以满足期望的方案。
对本文有任何问题欢迎通过公公众号留言或添加我的微信交流。
来源: https://www.cnblogs.com/hzmark/p/mq-delay-msg.html