RocketMQ 简介
RocketMQ 是一款开源的分布式消息系统, 基于高可用分布式集群技术, 提供低延时的, 高可靠, 万亿级容量, 灵活可伸缩的消息发布与订阅服务.
它前身是 MetaQ, 是阿里基于 Kafka 的设计使用 Java 进行自主研发的. 在 2012 年, 阿里将其开源, 在 2016 年, 阿里将其捐献给 Apache 软件基金会 (Apache Software Foundation, 简称为 ASF), 正式成为孵化项目. 2017 年, Apache 软件基金会宣布 RocketMQ 已孵化成为 Apache 顶级项目 (Top Level Project, 简称为 TLP ), 是国内首个互联网中间件在 Apache 上的顶级项目.
延迟消息
生产者把消息发送到消息队列中以后, 并不期望被立即消费, 而是等待指定时间后才可以被消费者消费, 这类消息通常被称为延迟消息.
在 RocketMQ 中, 支持延迟消息, 但是不支持任意时间精度的延迟消息, 只支持特定级别的延迟消息. 如果要支持任意时间精度, 不能避免在 Broker 层面做消息排序, 再涉及到持久化的考量, 那么消息排序就不可避免产生巨大的性能开销.
消息延迟级别分别为 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, 共 18 个级别. 在发送消息时, 设置消息延迟级别即可, 设置消息延迟级别时有以下 3 种情况:
设置消息延迟级别等于 0 时, 则该消息为非延迟消息.
设置消息延迟级别大于等于 1 并且小于等于 18 时, 消息延迟特定时间, 如: 设置消息延迟级别等于 1, 则延迟 1s; 设置消息延迟级别等于 2, 则延迟 5s, 以此类推.
设置消息延迟级别大于 18 时, 则该消息延迟级别为 18, 如: 设置消息延迟级别等于 20, 则延迟 2h.
文章持续更新, 微信搜索「万猫学社第一时间阅读, 关注后回复「电子书」, 免费获取 12 本 Java 必读技术书籍.
延迟消息示例
首先, 写一个消费者, 用于消费延迟消息:
- public class Consumer {
- public static void main(String[] args) throws MQClientException {
- SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
- // 实例化消费者
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OneMoreGroup");
- // 设置 NameServer 的地址
- consumer.setNamesrvAddr("localhost:9876");
- // 订阅一个或者多个 Topic, 以及 Tag 来过滤需要消费的消息
- consumer.subscribe("OneMoreTopic", "*");
- // 注册回调实现类来处理从 broker 拉取回来的消息
- consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
- System.out.printf("%s %s Receive New Messages:%n"
- , sdf.format(new Date())
- , Thread.currentThread().getName());
- for (MessageExt msg : msgs) {
- System.out.printf("\tMsg Id: %s%n", msg.getMsgId());
- System.out.printf("\tBody: %s%n", new String(msg.getBody()));
- }
- // 标记该消息已经被成功消费
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- // 启动消费者实例
- consumer.start();
- System.out.println("Consumer Started.");
- }
- }
再写一个延迟消息的生产者, 用于发送延迟消息:
- public class DelayProducer {
- public static void main(String[] args) throws Exception {
- SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
- // 实例化消息生产者 Producer
- DefaultMQProducer producer = new DefaultMQProducer("OneMoreGroup");
- // 设置 NameServer 的地址
- producer.setNamesrvAddr("localhost:9876");
- // 启动 Producer 实例
- producer.start();
- Message msg = new Message("OneMoreTopic"
- , "DelayMessage", "This is a delay message.".getBytes());
- //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
- // 设置消息延迟级别为 3, 也就是延迟 10s.
- msg.setDelayTimeLevel(3);
- // 发送消息到一个 Broker
- SendResult sendResult = producer.send(msg);
- // 通过 sendResult 返回消息是否成功送达
- System.out.printf("%s Send Status: %s, Msg Id: %s %n"
- , sdf.format(new Date())
- , sendResult.getSendStatus()
- , sendResult.getMsgId());
- // 如果不再发送消息, 关闭 Producer 实例.
- producer.shutdown();
- }
- }
运行生产者以后, 就会发送一条延迟消息:
10:37:14.992 Send Status: SEND_OK, Msg Id: C0A8006D5AB018B4AAC216E0DB690000
10 秒钟后, 消费者收到的这条延迟消息:
- 10:37:25.026 ConsumeMessageThread_1 Receive New Messages:
- Msg Id: C0A8006D5AB018B4AAC216E0DB690000
- Body: This is a delay message.
文章持续更新, 微信搜索「万猫学社第一时间阅读, 关注后回复「电子书」, 免费获取 12 本 Java 必读技术书籍.
延迟消息的原理分析
以下分析的 RocketMQ 源码的版本号是 4.7.1, 版本不同源码略有差别.
CommitLog
在 org.apache.rocketmq.store.CommitLog 中, 针对延迟消息做了一些处理:
- // 延迟级别大于 0, 就是延时消息
- if (msg.getDelayTimeLevel()> 0) {
- // 判断当前延迟级别, 如果大于最大延迟级别,
- // 就设置当前延迟级别为最大延迟级别.
- if (msg.getDelayTimeLevel()> this.defaultMessageStore
- .getScheduleMessageService().getMaxDelayLevel()) {
- msg.setDelayTimeLevel(this.defaultMessageStore
- .getScheduleMessageService().getMaxDelayLevel());
- }
- // 获取延迟消息的主题,
- // 其中 RMQ_SYS_SCHEDULE_TOPIC 的值为 SCHEDULE_TOPIC_XXXX
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- // 根据延迟级别获取延迟消息的队列 Id,
- // 队列 Id 其实就是延迟级别减 1
- queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
- // 备份真正的主题和队列 Id
- MessageAccessor.putProperty(msg
- , MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg
- , MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
- // 设置延时消息的主题和队列 Id
- msg.setTopic(topic);
- msg.setQueueId(queueId);
- }
可以看到, 每一个延迟消息的主题都被暂时更改为 SCHEDULE_TOPIC_XXXX, 并且根据延迟级别延迟消息变更了新的队列 Id. 接下来, 处理延迟消息的就是 org.apache.rocketmq.store.schedule.ScheduleMessageService.
ScheduleMessageService
ScheduleMessageService 是由 org.apache.rocketmq.store.DefaultMessageStore 进行初始化的, 初始化包括构造对象和调用 load 方法. 最后, 再执行 ScheduleMessageService 的 start 方法:
- public void start() {
- // 使用 AtomicBoolean 确保 start 方法仅有效执行一次
- if (started.compareAndSet(false, true)) {
- this.timer = new Timer("ScheduleMessageTimerThread", true);
- // 遍历所有延迟级别
- for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
- // key 为延迟级别
- Integer level = entry.getKey();
- // value 为延迟级别对应的毫秒数
- Long timeDelay = entry.getValue();
- // 根据延迟级别获得对应队列的偏移量
- Long offset = this.offsetTable.get(level);
- // 如果偏移量为 null, 则设置为 0
- if (null == offset) {
- offset = 0L;
- }
- if (timeDelay != null) {
- // 为每个延迟级别创建定时任务,
- // 第一次启动任务延迟为 FIRST_DELAY_TIME, 也就是 1 秒
- this.timer.schedule(
- new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
- }
- }
- // 延迟 10 秒后每隔 flushDelayOffsetInterval 执行一次任务,
- // 其中, flushDelayOffsetInterval 默认配置也为 10 秒
- this.timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- // 持久化每个队列消费的偏移量
- if (started.get()) ScheduleMessageService.this.persist();
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate flush exception", e);
- }
- }
- }, 10000, this.defaultMessageStore
- .getMessageStoreConfig().getFlushDelayOffsetInterval());
- }
- }
遍历所有延迟级别, 根据延迟级别获得对应队列的偏移量, 如果偏移量不存在, 则设置为 0. 然后为每个延迟级别创建定时任务, 第一次启动任务延迟为 1 秒, 第二次及以后的启动任务延迟才是延迟级别相应的延迟时间.
然后, 又创建了一个定时任务, 用于持久化每个队列消费的偏移量. 持久化的频率由 flushDelayOffsetInterval 属性进行配置, 默认为 10 秒.
文章持续更新, 微信搜索「万猫学社第一时间阅读, 关注后回复「电子书」, 免费获取 12 本 Java 必读技术书籍.
定时任务
ScheduleMessageService 的 start 方法执行之后, 每个延迟级别都创建自己的定时任务, 这里的定时任务的具体实现就在 DeliverDelayedMessageTimerTask 类之中, 它核心代码是 executeOnTimeup 方法之中, 我们来看一下主要部分:
- // 根据主题和队列 Id 获取消息队列
- ConsumeQueue cq =
- ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
- TopicValidator.RMQ_SYS_SCHEDULE_TOPIC
- , delayLevel2QueueId(delayLevel));
如果没有获取到对应的消息队列, 则在 DELAY_FOR_A_WHILE(默认为 100) 毫秒后再执行任务. 如果获取到了, 就继续执行下面操作:
- // 根据消费偏移量从消息队列中获取所有有效消息
- SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
如果没有获取到有效消息, 则在 DELAY_FOR_A_WHILE(默认为 100) 毫秒后再执行任务. 如果获取到了, 就继续执行下面操作:
- // 遍历所有消息
- for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
- // 获取消息的物理偏移量
- long offsetPy = bufferCQ.getByteBuffer().getLong();
- // 获取消息的物理长度
- int sizePy = bufferCQ.getByteBuffer().getInt();
- long tagsCode = bufferCQ.getByteBuffer().getLong();
- // 省略部分代码...
- long now = System.currentTimeMillis();
- // 计算消息应该被消费的时间
- long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
- // 计算下一条消息的偏移量
- nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)
- long countdown = deliverTimestamp - now;
- // 省略部分代码...
- }
如果当前消息不到消费的时间, 则在 countdown 毫秒后再执行任务. 如果到消费的时间, 就继续执行下面操作:
- // 根据消息的物理偏移量和大小获取消息
- MessageExt msgExt =
- ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
- offsetPy, sizePy);
如果获取到消息, 则继续执行下面操作:
- // 重新构建新的消息, 包括:
- // 1. 清除消息的延迟级别
- // 2. 恢复真正的消息主题和队列 Id
- MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
- if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
- log.error("[BUG] the real topic of schedule msg is {},"
- + "discard the msg. msg={}",
- msgInner.getTopic(), msgInner);
- continue;
- }
- // 重新把消息发送到真正的消息队列上
- PutMessageResult putMessageResult =
- ScheduleMessageService.this.writeMessageStore
- .putMessage(msgInner);
清除了消息的延迟级别, 并且恢复了真正的消息主题和队列 Id, 重新把消息发送到真正的消息队列上以后, 消费者就可以立即消费了.
文章持续更新, 微信搜索「万猫学社第一时间阅读, 关注后回复「电子书」, 免费获取 12 本 Java 必读技术书籍.
总结
经过以上对源码的分析, 可以总结出延迟消息的实现步骤:
如果消息的延迟级别大于 0, 则表示该消息为延迟消息, 修改该消息的主题为 SCHEDULE_TOPIC_XXXX, 队列 Id 为延迟级别减 1.
消息进入 SCHEDULE_TOPIC_XXXX 的队列中.
定时任务根据上次拉取的偏移量不断从队列中取出所有消息.
根据消息的物理偏移量和大小再次获取消息.
根据消息属性重新创建消息, 清除延迟级别, 恢复原主题和队列 Id.
重新发送消息到原主题的队列中, 供消费者进行消费.
来源: https://www.cnblogs.com/heihaozi/p/13259125.html