上一节消息重试里面提到了重试的消息可以被延时消费, 其实除此之外, 用户发送的消息也可以指定延时时间(更准确的说是延时等级), 然后在指定延时时间之后投递消息, 然后被 consumer 消费. 阿里云的 ons 还支持定时消息, 而且延时消息是直接指定延时时间, 其实阿里云的延时消息也是定时消息的另一种表述方式, 都是通过设置消息被投递的时间来实现的, 但是 Apache RocketMQ 在版本 4.2.0 中尚不支持指定时间的延时, 只能通过配置延时等级和延时等级对应的时间来实现延时.
一个延时消息被发出到消费成功经历以下几个过程:
设置消息的延时级别 delayLevel
producer 发送消息
broker 收到消息在准备将消息写入存储的时候, 判断是延时消息则更改 Message 的 topic 为延时消息队列的 topic, 也就是将消息投递到延时消息队列
有定时任务从延时队列中读取消息, 拿到消息后判断是否达到延时时间, 如果到了则修改 topic 为原始 topic. 并将消息投递到原始 topic 的队列
consumer 像消费其他消息一样从 broker 拉取消息进行消费
注意: 批量消息是不支持延时消息的
tips: 下文中说到的延时队列可以理解为一个 ConsumeQueue
producer 发送延时消息
在 producer 中发送消息的时候, 设置 Message 的 delayLevel
- // org.apache.rocketmq.common.message.Message
- public void setDelayTimeLevel(int level) {
- this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
- }
调用上面的方法设置延时等级的时候, 会向 message 添加 "DELAY" 属性, 后面 broker 处理延时消息就是依赖该属性进行特别的处理.
接下来发送消息的流程和正常发送消息的流程基本一致, 只是会将该消息标记为延时消息类型
- // org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
- if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
- context.setMsgType(MessageType.Delay_Msg);
- }
broker 处理延时消息
broker 收到延时消息和正常消息在前置的处理流程是一致的, 对于延时消息的特殊处理体现在将消息写入存储 (内存或文件) 的时候
- // org.apache.rocketmq.store.CommitLog#putMessage
- public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
- // 省略中间代码...
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
- // 拿到原始 topic 和对应的 queueId
- String topic = msg.getTopic();
- int queueId = msg.getQueueId();
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- // 非事务消息和事务的 commit 消息才会进一步判断 delayLevel
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- // Delay Delivery
- if (msg.getDelayTimeLevel()> 0) {
- // 纠正设置过大的 level, 就是 delayLevel 设置都大于延时时间等级的最大级
- if (msg.getDelayTimeLevel()> this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
- msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
- // 设置为延时队列的 topic
- topic = ScheduleMessageService.SCHEDULE_TOPIC;
- // 每一个延时等级一个 queue,queueId = delayLevel - 1
- queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
- // Backup real topic, queueId
- // 备份原始的 topic 和 queueId
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- // 更新 properties
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
- msg.setTopic(topic);
- msg.setQueueId(queueId);
- }
- }
- // 省略中间代码...
- }
上面的 SCHEDULE_TOPIC 是:
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
这个 topic 是一个特殊的 topic, 和正常的 topic 不同的地方是:
不会创建 TopicConfig, 因为也不需要 consumer 直接消费这个 topic 下的消息
不会将 topic 注册到 namesrv
这个 topic 的队列个数和延时等级的个数是相同的
后面消息写入的过程和普通的又是一致的.
上面将消息写入延时队列中了, 接下来就是处理延时队列中的消息, 然后重新发送回原始 topic 的队列中.
在此之前先说明下至今还有疑问的一个个概念 --delayLevel. 这个概念和我们接下要需要用到的的类 org.apache.rocketmq.store.schedule.ScheduleMessageService 有关, 这个类的字段 delayLevelTable 里面保存了具体的延时等级
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
看下这个字段的初始化过程
- // org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel
- public boolean parseDelayLevel() {
- HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
- // 每个延时等级延时时间的单位对应的 ms 数
- timeUnitTable.put("s", 1000L);
- timeUnitTable.put("m", 1000L * 60);
- timeUnitTable.put("h", 1000L * 60 * 60);
- timeUnitTable.put("d", 1000L * 60 * 60 * 24);
- // 延时等级在 MessageStoreConfig 中配置
- // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
- try {
- // 根据空格将配置分隔出每个等级
- String[] levelArray = levelString.split(" ");
- for (int i = 0; i <levelArray.length; i++) {
- String value = levelArray[i];
- String ch = value.substring(value.length() - 1);
- // 时间单位对应的 ms 数
- Long tu = timeUnitTable.get(ch);
- // 延时等级从 1 开始
- int level = i + 1;
- if (level> this.maxDelayLevel) {
- // 找出最大的延时等级
- this.maxDelayLevel = level;
- }
- long num = Long.parseLong(value.substring(0, value.length() - 1));
- long delayTimeMillis = tu * num;
- this.delayLevelTable.put(level, delayTimeMillis);
- // 省略部分代码...
- }
上面这个 load 方法在 broker 启动的时候 DefaultMessageStore 会调用来初始化延时等级.
接下来就应该解决怎么处理延时消息队列中的消息的问题了. 处理延时消息的服务是: ScheduleMessageService.
还是 broker 启动的时候 DefaultMessageStore 会调用 org.apache.rocketmq.store.schedule.ScheduleMessageService#start 来启动处理延时消息队列的服务:
- public void start() {
- for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
- Integer level = entry.getKey();
- Long timeDelay = entry.getValue();
- // 记录队列的处理进度
- Long offset = this.offsetTable.get(level);
- if (null == offset) {
- offset = 0L;
- }
- if (timeDelay != null) {
- // 每个延时队列启动一个定时任务来处理该队列的延时消息
- this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
- }
- }
- this.timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- // 持久化 offsetTable(保存了每个延时队列对应的处理进度 offset)
- ScheduleMessageService.this.persist();
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate flush exception", e);
- }
- }
- }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
- }
DeliverDelayedMessageTimerTask 是一个 TimerTask, 启动以后不断处理延时队列中的消息, 直到出现异常则终止该线程重新启动一个新的 TimerTask
- public void executeOnTimeup() {
- // 找到该延时等级对应的 ConsumeQueue
- ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
- delayLevel2QueueId(delayLevel));
- // 记录异常情况下一次启动 TimerTask 开始处理的 offset
- long failScheduleOffset = offset;
- if (cq != null) {
- // 找到 offset 所处的 MappedFile 中 offset 后面的 buffer
- SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
- if (bufferCQ != null) {
- try {
- long nextOffset = offset;
- int i = 0;
- ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
- for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
- // 下面三个字段信息是 ConsumeQueue 物理存储的信息
- long offsetPy = bufferCQ.getByteBuffer().getLong();
- int sizePy = bufferCQ.getByteBuffer().getInt();
- // 注意这个 tagCode, 不再是普通的 tag 的 hashCode, 而是该延时消息到期的时间
- long tagsCode = bufferCQ.getByteBuffer().getLong();
- // 省略中间代码....
- long now = System.currentTimeMillis();
- // 计算应该投递该消息的时间, 如果已经超时则立即投递
- long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
- // 计算下一个消息的开始位置, 用来寻找下一个消息位置(如果有的话)
- nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
- // 判断延时消息是否到期
- long countdown = deliverTimestamp - now;
- if (countdown <= 0) {
- MessageExt msgExt =
- ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
- offsetPy, sizePy);
- if (msgExt != null) {
- try {
- // 将消息恢复到原始消息的格式, 恢复 topic,queueId,tagCode 等, 清除属性 "DELAY"
- MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
- PutMessageResult putMessageResult =
- ScheduleMessageService.this.defaultMessageStore
- .putMessage(msgInner);
- if (putMessageResult != null
- && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
- // 投递成功, 处理下一个
- continue;
- } else {
- // XXX: warn and notify me
- log.error(
- "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
- msgExt.getTopic(), msgExt.getMsgId());
- // 投递失败, 结束当前 task, 重新启动 TimerTask, 从下一个消息开始处理, 也就是说当前消息丢弃
- // 更新 offsetTable 中当前队列的 offset 为下一个消息的 offset
- ScheduleMessageService.this.timer.schedule(
- new DeliverDelayedMessageTimerTask(this.delayLevel,
- nextOffset), DELAY_FOR_A_PERIOD);
- ScheduleMessageService.this.updateOffset(this.delayLevel,
- nextOffset);
- return;
- }
- } catch (Exception e) {
- // 重新投递期间出现任何异常, 结束当前 task, 重新启动 TimerTask, 从当前消息开始重试
- /*
- * XXX: warn and notify me
- */
- log.error(
- "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
- + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
- + offsetPy + ",sizePy=" + sizePy, e);
- }
- }
- } else {
- ScheduleMessageService.this.timer.schedule(
- new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
- countdown);
- ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
- return;
- }
- } // end of for
- // 处理完当前 MappedFile 中的消息后, 重新启动 TimerTask, 从下一个消息开始处理
- // 更新 offsetTable 中当前队列的 offset 为下一个消息的 offset
- nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
- ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
- this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
- ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
- return;
- } finally {
- bufferCQ.release();
- }
- } // end of if (bufferCQ != null)
- else {
- // 如果根据 offsetTable 中的 offset 没有找到对应的消息(可能被删除了), 则按照当前 ConsumeQueue 的最小 offset 开始处理
- long cqMinOffset = cq.getMinOffsetInQueue();
- if (offset < cqMinOffset) {
- failScheduleOffset = cqMinOffset;
- log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
- + cqMinOffset + ", queueId=" + cq.getQueueId());
- }
- }
- } // end of if (cq != null)
- ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
- failScheduleOffset), DELAY_FOR_A_WHILE);
- }
对于上面的 tagCode 做一下特别说明, 延时消息的 tagCode 和普通消息不一样:
延时消息的 tagCode: 存储的是消息到期的时间
非延时消息的 tagCode:tags 字符串的 hashCode
对延时消息的 tagCode 的特别处理是在下面这个方法中完成的, 也就是在 build ConsumeQueue 信息的时候
org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)
总结
以上就是 RocketMQ 延时消息的实现方式, 上面没有详说的是重试消息的延时是怎么实现的, 其实就是在 consumer 将延时消息发送回 broker 的时候设置了 (用户可以自己设置, 如果没有自己设置默认是 0)delayLevel, 到了 broker 处理重试消息的时候如果 delayLevel 是 0(也就是说是默认的延时等级) 的时候会在原来的基础上加 3, 后面的处理就和上面说的延时消息一样了, 存储的时候将消息投递到延时队列, 等待延时到期后再重新投递到原始 topic 队列中等到 consumer 消费.
来源: https://www.cnblogs.com/sunshine-2015/p/9017426.html