1. Overview
Recommended prior reading:
"RocketMQ Source Code Analysis -- Message Sending and Receiving"
"RocketMQ Source Code Analysis -- Message Pulling and Consumption (Part 2)"
Why cover scheduled messages and message retry together? Take a guess.
Guess whether I'm going to guess.
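2. Scheduled Messages
A scheduled (delayed) message is a message that, once it reaches the Broker, cannot be consumed by a Consumer right away; it becomes consumable only at a specific point in time or after a specific delay has elapsed.
The figure below shows the processing flow of scheduled messages: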
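2.1 Delay Levels
RocketMQ currently supports scheduled messages only at a fixed set of delay granularities (delay levels), not at arbitrary times. The official explanation is:
To support arbitrary time precision, the Broker would have to sort messages, and once persistence is involved as well, that sorting inevitably incurs a huge performance overhead.
The delay levels (default configuration):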
Delay level | Delay |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
The core source code is as follows:
```java
// [MessageStoreConfig.java]
/**
 * Delay level configuration string
 */
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

// [ScheduleMessageService.java]
/**
 * Parse the delay levels
 *
 * @return whether parsing succeeded
 */
public boolean parseDelayLevel() {
    // Time unit suffix -> milliseconds
    HashMap<String, Long> timeUnitTable = new HashMap<>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);
    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1); // unit suffix
            Long tu = timeUnitTable.get(ch);
            int level = i + 1; // levels are 1-based
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }
    return true;
}
```
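To make the parsing rule concrete, here is a minimal stdlib-only sketch of what `parseDelayLevel()` computes. `DelayLevelSketch` and `parse` are hypothetical names, not RocketMQ API. The sketch departs from the original in two ways. It throws on an unknown unit instead of logging and returning `false`. It also splits on any run of whitespace rather than a single space. It needs Java 9+ for `Map.of`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of parseDelayLevel(): "1s 5s ..." -> {level -> delay in ms}.
final class DelayLevelSketch {
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Maps each 1-based delay level to its delay in milliseconds. */
    static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> unitMillis = Map.of(
            "s", 1000L,
            "m", 60_000L,
            "h", 3_600_000L,
            "d", 86_400_000L);
        Map<Integer, Long> table = new TreeMap<>();
        String[] items = levelString.trim().split("\\s+");
        for (int i = 0; i < items.length; i++) {
            String item = items[i];
            String unit = item.substring(item.length() - 1); // last char is the unit
            Long millis = unitMillis.get(unit);
            if (millis == null) {
                throw new IllegalArgumentException("unknown time unit in: " + item);
            }
            long amount = Long.parseLong(item.substring(0, item.length() - 1));
            table.put(i + 1, amount * millis); // level = position + 1
        }
        return table;
    }
}
```

With the default string, `parse(DEFAULT_LEVELS)` yields 18 entries, so the maximum delay level is 18. Level 3 maps to 10000 ms and level 18 to 7200000 ms.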
2.2 Producer Sends Scheduled Messages
When sending, set the message's delay level:
```java
Message msg = new Message(...);
msg.setDelayTimeLevel(level); // e.g. level 3 = 10s with the default levels
```
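2.3 Broker Stores Scheduled Messages
When a delayed message is stored, it is redirected into the Topic `SCHEDULE_TOPIC_XXXX`, and its real Topic and queue id are saved as message properties.
Each delay level maps to a fixed queue: QueueId = DelayLevel - 1.
The core code is as follows: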
```java
// [CommitLog.java]
/**
 * Append a message and return the result
 *
 * @param msg message
 * @return result
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // ....(code omitted)
    // Scheduled message handling
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            // Clamp to the largest configured delay level
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Delayed messages are stored in the Topic `SCHEDULE_TOPIC_XXXX`
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // Fixed mapping from delay level to queue id
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // ....(code omitted)
}

// [ScheduleMessageService.java]
/**
 * Compute the queue id from the delay level
 * QueueId = DelayLevel - 1
 *
 * @param delayLevel delay level
 * @return queue id
 */
public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}
```
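The rewrite that `CommitLog.putMessage()` applies to a delayed message can be modeled on its own, as in the sketch below. The names `DelayRewriteSketch`, `Msg` and `rewriteIfDelayed` are hypothetical stand-ins for `MessageExtBrokerInner` and the surrounding store code. The `"REAL_TOPIC"`/`"REAL_QID"` keys stand in for `MessageConst.PROPERTY_REAL_TOPIC`/`PROPERTY_REAL_QUEUE_ID`. The transaction-type check is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the topic/queue rewrite done for delayed messages.
final class DelayRewriteSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String REAL_TOPIC = "REAL_TOPIC"; // stand-in property key
    static final String REAL_QID = "REAL_QID";     // stand-in property key

    /** Minimal mutable message, not RocketMQ's MessageExtBrokerInner. */
    static final class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    /** Redirects a delayed message into the schedule topic, remembering where it really belongs. */
    static void rewriteIfDelayed(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message: stored unchanged
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel; // clamp to the largest configured level
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel2QueueId(msg.delayLevel);
    }
}
```

For instance, take a message for `OrderTopic` queue 2 with delay level 20, under a maximum level of 18. It is clamped to level 18 and stored in `SCHEDULE_TOPIC_XXXX` queue 17. `OrderTopic` and `2` are kept in its properties so they can be restored at delivery time.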
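When the ConsumeQueue entries are generated, the tagsCode of each delayed message is set to its [planned consumption time], that is, its delivery timestamp. This way, `ScheduleMessageService` can use tagsCode to decide which entries are due while polling the ConsumeQueue.
The core code is as follows: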
```java
// [CommitLog.java]
/**
 * Check the message and return its size
 *
 * @return 0: reached the end of the file; > 0: a normal message; -1: message checksum failure
 */
public DispatchRequest checkMessageAndReturnSize(ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {
    try {
        // ....(code omitted)
        // 17 properties
        short propertiesLength = byteBuffer.getShort();
        if (propertiesLength > 0) {
            // ....(code omitted)
            String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
            if (tags != null && tags.length() > 0) {
                tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
            }
            // Timing message processing: in SCHEDULE_TOPIC_XXXX, tagsCode holds the delivery timestamp
            {
                String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
                    int delayLevel = Integer.parseInt(t);
                    if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                    }
                    if (delayLevel > 0) {
                        tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                            storeTimestamp);
                    }
                }
            }
        }
        // ....(code omitted)
        return new DispatchRequest(
            topic, // 1
            queueId, // 2
            physicOffset, // 3
            totalSize, // 4
            tagsCode, // 5
            storeTimestamp, // 6
            queueOffset, // 7
            keys, // 8
            uniqKey, // 9
            sysFlag, // 10
            preparedTransactionOffset // 11
        );
    } catch (Exception e) {
        // Parsing failed: fall through and return a failed DispatchRequest
    }
    return new DispatchRequest(-1, false /* success */);
}

// [ScheduleMessageService.java]
/**
 * Compute the delivery time [planned consumption time]
 *
 * @param delayLevel delay level
 * @param storeTimestamp store time
 * @return delivery time [planned consumption time]
 */
public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
    Long time = this.delayLevelTable.get(delayLevel);
    if (time != null) {
        return time + storeTimestamp;
    }
    return storeTimestamp + 1000; // unknown level: fall back to 1s
}
```
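The arithmetic can be checked by hand. A message stored at time T with delay level 3 (10s by default) gets tagsCode T + 10000. The sketch below restates `computeDeliverTimestamp()` as a pure static method. It also adds a hypothetical `isDue` helper that expresses the `countdown <= 0` test the delivery task applies to tagsCode. `DeliverTimeSketch` is not a RocketMQ class.

```java
import java.util.Map;

// Hypothetical restatement of computeDeliverTimestamp() plus the "is it due?" check.
final class DeliverTimeSketch {
    /** Store time plus the level's delay, or store time plus 1s for an unknown level. */
    static long computeDeliverTimestamp(Map<Integer, Long> delayLevelTable, int delayLevel, long storeTimestamp) {
        Long delay = delayLevelTable.get(delayLevel);
        if (delay != null) {
            return storeTimestamp + delay;
        }
        return storeTimestamp + 1000;
    }

    /** An entry whose tagsCode (delivery time) is not after now can be delivered. */
    static boolean isDue(long tagsCode, long now) {
        return tagsCode - now <= 0;
    }
}
```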
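2.4 Broker Delivers Scheduled Messages
Each consume queue of `SCHEDULE_TOPIC_XXXX` (one per delay level) is polled by its own timer task. The task delivers the messages whose delivery time [planned consumption time] has arrived.
The figure below shows the processing flow for delivering scheduled messages:
The implementation is as follows: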
```java
/**
 * Timer task that delivers (re-puts) delayed messages
 */
class DeliverDelayedMessageTimerTask extends TimerTask {
    /**
     * Delay level
     */
    private final int delayLevel;
    /**
     * Offset in this delay level's consume queue
     */
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            this.executeOnTimeup();
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    /**
     * Correct the delivery time.
     * The interval of a delay level can be reconfigured. If a delivery time lies further out than
     * now + the currently configured interval, deliver it now, so that later messages are not held back.
     *
     * @param now current time
     * @param deliverTimestamp delivery time
     * @return corrected delivery time
     */
    private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
        long result = deliverTimestamp;
        long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
        if (deliverTimestamp > maxTimestamp) {
            result = now;
        }
        return result;
    }

    public void executeOnTimeup() {
        ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
        long failScheduleOffset = offset;
        if (cq != null) {
            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
            if (bufferCQ != null) {
                try {
                    long nextOffset = offset;
                    int i = 0;
                    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                        long offsetPy = bufferCQ.getByteBuffer().getLong();
                        int sizePy = bufferCQ.getByteBuffer().getInt();
                        long tagsCode = bufferCQ.getByteBuffer().getLong();
                        long now = System.currentTimeMillis();
                        long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        long countdown = deliverTimestamp - now;
                        if (countdown <= 0) { // the message's delivery time has arrived
                            MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                            if (msgExt != null) {
                                try {
                                    // Re-put the message into its real topic
                                    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                    PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore.putMessage(msgInner);
                                    if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // delivered
                                        continue;
                                    } else { // delivery failed
                                        // XXX: warn and notify me
                                        log.error("ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId());
                                        // Schedule the next run
                                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
                                        // Update the progress
                                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                        return;
                                    }
                                } catch (Exception e) {
                                    // XXX: warn and notify me
                                    log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
                                }
                            }
                        } else {
                            // Not due yet: schedule the next run for when this message becomes due
                            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                            // Update the progress
                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                            return;
                        }
                    } // end of for
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    // Schedule the next run
                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                    // Update the progress
                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                    return;
                } finally {
                    bufferCQ.release();
                }
            } // end of if (bufferCQ != null)
            else { // Part of the consume queue has been deleted: jump to the smallest offset still present
                long cqMinOffset = cq.getMinOffsetInQueue();
                if (offset < cqMinOffset) {
                    failScheduleOffset = cqMinOffset;
                    log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                        + cqMinOffset + ", queueId=" + cq.getQueueId());
                }
            }
        } // end of if (cq != null)
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
    }

    /**
     * Build the message to deliver: copy its content, restore the real topic and queue id,
     * and clear the delay level
     *
     * @param msgExt message
     * @return message
     */
    private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
        long tagsCodeValue =
            MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
        msgInner.setTagsCode(tagsCodeValue);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(msgExt.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
        msgInner.setWaitStoreMsgOK(false);
        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
        String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
        int queueId = Integer.parseInt(queueIdStr);
        msgInner.setQueueId(queueId);
        return msgInner;
    }
}
```
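Stripped of storage details, one pass of `executeOnTimeup()` is a scan that stops at the first entry that is not yet due. The sketch below models one level's ConsumeQueue as a `List<Long>` of tagsCode values. `DeliverScanSketch`, `Pass` and `runOnce` are hypothetical names. `DELAY_FOR_A_WHILE = 100` ms is an assumed value for the idle re-poll interval. The failure paths (reput failure, dropped messages, truncated queue) are omitted.

```java
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical in-memory model of one DeliverDelayedMessageTimerTask pass.
final class DeliverScanSketch {
    static final long DELAY_FOR_A_WHILE = 100L; // assumed idle re-poll interval (ms)

    /** Where the next pass resumes and how long to wait before it. */
    static final class Pass {
        final long nextOffset;
        final long rescheduleAfterMillis;

        Pass(long nextOffset, long rescheduleAfterMillis) {
            this.nextOffset = nextOffset;
            this.rescheduleAfterMillis = rescheduleAfterMillis;
        }
    }

    /** Same rule as correctDeliverTimestamp(): anything beyond now + levelDelay is delivered now. */
    static long correctDeliverTimestamp(long now, long deliverTimestamp, long levelDelay) {
        long maxTimestamp = now + levelDelay;
        return deliverTimestamp > maxTimestamp ? now : deliverTimestamp;
    }

    /** Delivers due entries from startOffset on; stops at the first entry that is not yet due. */
    static Pass runOnce(List<Long> tagsCodes, int startOffset, long now, long levelDelay, LongConsumer deliver) {
        for (int offset = startOffset; offset < tagsCodes.size(); offset++) {
            long deliverAt = correctDeliverTimestamp(now, tagsCodes.get(offset), levelDelay);
            long countdown = deliverAt - now;
            if (countdown > 0) {
                return new Pass(offset, countdown); // wake up when this entry becomes due
            }
            deliver.accept(offset); // stands in for re-putting the message
        }
        return new Pass(tagsCodes.size(), DELAY_FOR_A_WHILE); // caught up: poll again shortly
    }
}
```

Stopping early is safe for a specific reason. Within one level, every entry gets the same delay added to a store time that grows with the queue offset. Delivery times are therefore (approximately) non-decreasing along the queue, so one timer per level is enough and no sorting is needed.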
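2.5 Broker Persists Scheduled Delivery Progress
The delivery progress of scheduled messages (one offset per delay level) is stored in the file `../config/delayOffset.json`.
It is persisted by a periodic task that first runs after 10s and then runs every `flushDelayOffsetInterval` (10s by default).
The core code is as follows: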
```java
// [ScheduleMessageService.java]
public void start() {
    // One delivery timer task per delay level, resuming from the persisted offset
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }
        if (timeDelay != null) {
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }
    // Periodically persist the delivery progress
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            } catch (Exception e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
```
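The sketch below applies the same persist-on-a-timer pattern, using `ScheduledExecutorService` in place of `java.util.Timer`. The class name, its JSON layout and the temp-file-then-move strategy are assumptions for illustration. RocketMQ serializes `offsetTable` with its own JSON helper, and its exact file contents may differ.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: periodically persist per-level delivery offsets to a JSON file.
final class DelayOffsetStoreSketch {
    final ConcurrentHashMap<Integer, Long> offsetTable = new ConcurrentHashMap<>();
    private final Path file;

    DelayOffsetStoreSketch(Path file) {
        this.file = file;
    }

    void updateOffset(int delayLevel, long offset) {
        offsetTable.put(delayLevel, offset);
    }

    /** Encodes as {"offsetTable":{"1":5,...}}, levels in ascending order (layout assumed). */
    String encode() {
        StringJoiner entries = new StringJoiner(",", "{\"offsetTable\":{", "}}");
        for (Map.Entry<Integer, Long> e : new TreeMap<>(offsetTable).entrySet()) {
            entries.add("\"" + e.getKey() + "\":" + e.getValue());
        }
        return entries.toString();
    }

    /** Writes a temp file, then moves it over the target, so a crash mid-write leaves the old copy intact. */
    void persist() {
        try {
            Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
            Files.write(tmp, encode().getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    String load() {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Like start(): first flush after 10 s, then every intervalMillis. */
    ScheduledExecutorService startFlushing(long intervalMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(() -> {
            try {
                persist();
            } catch (RuntimeException e) {
                e.printStackTrace(); // log and keep the periodic task alive
            }
        }, 10_000, intervalMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

Catching the exception inside the periodic task matters in both designs. With `ScheduledExecutorService`, an escaping exception suppresses all later runs. With `java.util.Timer`, it terminates the timer thread, and because `ScheduleMessageService` shares one timer, that would also stop every delivery task.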
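3. Message Retry
When a Consumer fails to consume a message, a retry mechanism is needed so that the message gets consumed again.
The Consumer sends the failed message back to the Broker, where it enters the delayed-message queue. In other words, a message that failed to be consumed is not re-consumed immediately; it comes back only after the delay of its delay level.
The core code is as follows: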
```java
// [SendMessageProcessor.java]
/**
 * Handle a message sent back by a consumer
 *
 * @param ctx ctx
 * @param request request
 * @return response
 * @throws RemotingCommandException on a remoting error
 */
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    // ....(code omitted)
    // Handle delayLevel (specific to sending messages back)
    int delayLevel = requestHeader.getDelayLevel();
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    }
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        /* ....(code omitted) */) {
        // ....(code omitted)
    } else {
        // A delayLevel of 0 lets the broker choose: 3 + number of previous retries
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }
    // ....(code omitted)
    return response;
}
```
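The retry schedule follows directly from `3 + reconsumeTimes`. With the default delay table, the first retry waits 10s (level 3) and the second 30s (level 4), followed by 1m, 2m and so on. With a limit of 16, reconsumeTimes 0 to 15 map to levels 3 to 18, so the 16th retry waits 2h. The sketch restates that decision. `RetryLevelSketch`, `GIVE_UP` and `brokerChosenDelayMillis` are hypothetical names, and `GIVE_UP` only marks the branch whose code is omitted above. Levels above the maximum are clamped by `CommitLog.putMessage()`, as in 2.3.

```java
import java.util.Map;

// Hypothetical sketch of the delay-level decision in consumerSendMsgBack().
final class RetryLevelSketch {
    static final int GIVE_UP = -1; // hypothetical marker for the omitted branch

    /**
     * requestedLevel: delayLevel from the request header (0 = let the broker choose).
     * Returns the delay level for the retried message, or GIVE_UP once the retry limit is reached.
     */
    static int retryDelayLevel(int requestedLevel, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes) {
            return GIVE_UP;
        }
        return requestedLevel == 0 ? 3 + reconsumeTimes : requestedLevel;
    }

    /** Delay for a broker-chosen level after the clamp to maxDelayLevel in CommitLog. */
    static long brokerChosenDelayMillis(Map<Integer, Long> delayLevelTable, int reconsumeTimes, int maxDelayLevel) {
        int level = Math.min(3 + reconsumeTimes, maxDelayLevel);
        return delayLevelTable.get(level);
    }
}
```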
Source: https://juejin.im/entry/5bcf9a03e51d457ab729cad1