本文主要基于 RocketMQ 4.0.x 正式版
1. 概述
2. Producer 顺序发送
3. Consumer 严格顺序消费
3.1 获得 (锁定) 消息队列
3.2 移除消息队列
3.3 消费消息队列
3.1.1 消费消息
3.1.2 处理消费结果
3.13 消息处理队列核心方法
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
建议前置阅读内容:
RocketMQ 源码分析 Message 发送与接收
RocketMQ 源码分析 Message 拉取与消费(下)
当然对 Message 发送与消费已经有一定了解的同学, 可以选择跳过
RocketMQ 提供了两种顺序级别:
普通顺序消息 :Producer 将相关联的消息发送到相同的消息队列
完全严格顺序 : 在 普通顺序消息 的基础上, Consumer 严格顺序消费
绝大部分场景下只需要用到普通顺序消息
例如说: 给用户发送短信消息 + 发送推送消息, 将两条消息发送到不同的消息队列, 若其中一条消息队列消费较慢造成堵塞, 用户可能会收到两条消息会存在一定的时间差, 带来的体验会相对较差当然类似这种场景, 即使有一定的时间差, 不会产生系统逻辑上 BUG 另外, 普通顺序消息性能能更加好
那么什么时候使用使用完全严格顺序? 如下是来自官方文档的说明:
目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息, 其他应用绝大部分都可以容忍短暂乱序, 推荐使用普通的顺序消息
上代码!!!
2. Producer 顺序发送
官方发送顺序消息的例子:
- package org.apache.rocketmq.example.ordermessage;
- import java.io.UnsupportedEncodingException;
- import java.util.List;
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.MQProducer;
- import org.apache.rocketmq.client.producer.MessageQueueSelector;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageQueue;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- public class Producer {
- public static void main(String[] args) throws UnsupportedEncodingException {
- try {
- MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
- producer.start();
- String[] tags = new String[] {
- "TagA",
- "TagB",
- "TagC",
- "TagD",
- "TagE"
- };
- for (int i = 0; i < 100; i++) {
- int orderId = i % 10;
- Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
- SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Override public MessageQueue select(List mqs, Message msg, Object arg) {
- Integer id = (Integer) arg;
- int index = id % mqs.size();
- return mqs.get(index);
- }
- },
- orderId);
- System.out.printf("%s%n", sendResult);
- }
- producer.shutdown();
- } catch(MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
第 28 至 35 行 : 实现了根据 id % mqs.size() 来进行消息队列的选择当前例子, 我们传递 orderId 作为参数, 那么相同的 orderId 能够进入相同的消息队列
MessageQueueSelector
接口的源码:
- public interface MessageQueueSelector {
- /**
- * 选择消息队列
- *
- * @param mqs 消息队列
- * @param msg 消息
- * @param arg 参数
- * @return 消息队列
- */
- MessageQueue select(final List mqs, final Message msg, final Object arg);
- }
Producer 选择队列发送消息方法的源码:
- private SendResult sendSelectImpl( //
- Message msg, //
- MessageQueueSelector selector, //
- Object arg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, final long timeout //
- ) throws MQClientException,
- RemotingException,
- MQBrokerException,
- InterruptedException {
- this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- MessageQueue mq = null;
- try {
- mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
- } catch(Throwable e) {
- throw new MQClientException("select message queue throwed exception.", e);
- }
- if (mq != null) {
- return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
- } else {
- throw new MQClientException("select message queue return null.", null);
- }
- }
- throw new MQClientException("No route info for this topic," + msg.getTopic(), null);
- }
第 30 行 : 选择消息队列
第 36 行 : 发送消息
3. Consumer 严格顺序消费
Consumer 在严格顺序消费时, 通过 三 把锁保证严格顺序消费
Broker 消息队列锁(分布式锁) :
集群模式下, Consumer 从 Broker 获得该锁后, 才能进行消息拉取消费
广播模式下, Consumer 无需该锁
Consumer 消息队列锁(本地锁) :Consumer 获得该锁才能操作消息队列
Consumer 消息处理队列消费锁(本地锁) :Consumer 获得该锁才能消费消息队列
可能同学有疑问, 为什么有 Consumer 消息队列锁还需要有 Consumer 消息队列消费锁呢? 让我们带着疑问继续往下看
3.1 获得 (锁定) 消息队列
集群模式下, Consumer 更新属于自己的消息队列时, 会向 Broker 锁定该消息队列 (广播模式下不需要) 如果锁定失败, 则更新失败, 即该消息队列不属于自己, 不能进行消费核心代码如下:
- // RebalanceImpl.java
- private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet, final boolean isOrder) {
- // ..... 此处省略部分代码
- // 增加 不在 processQueueTable && 存在于 mqSet 里的消息队列
- List pullRequestList = new ArrayList<>(); // 拉消息请求数组
- for (MessageQueue mq : mqSet) {
- if (!this.processQueueTable.containsKey(mq)) {
- if (isOrder && !this.lock(mq)) { // 顺序消息锁定消息队列
- log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
- continue;
- }
- this.removeDirtyOffset(mq);
- ProcessQueue pq = new ProcessQueue();
- long nextOffset = this.computePullFromWhere(mq);
- if (nextOffset >= 0) {
- ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
- if (pre != null) {
- log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
- } else {
- log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
- PullRequest pullRequest = new PullRequest();
- pullRequest.setConsumerGroup(consumerGroup);
- pullRequest.setNextOffset(nextOffset);
- pullRequest.setMessageQueue(mq);
- pullRequest.setProcessQueue(pq);
- pullRequestList.add(pullRequest);
- changed = true;
- }
- } else {
- log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
- }
- }
- }
- // ..... 此处省略部分代码
- }
- // RebalanceImpl.java
- /**
- * 请求 Broker 获得指定消息队列的分布式锁
- *
- * @param mq 队列
- * @return 是否成功
- */
- public boolean lock(final MessageQueue mq) {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
- if (findBrokerResult != null) {
- LockBatchRequestBody requestBody = new LockBatchRequestBody();
- requestBody.setConsumerGroup(this.consumerGroup);
- requestBody.setClientId(this.mQClientFactory.getClientId());
- requestBody.getMqSet().add(mq);
- try {
- // 请求 Broker 获得指定消息队列的分布式锁
- Set lockedMq =
- this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
- // 设置消息处理队列锁定成功锁定消息队列成功, 可能本地没有消息处理队列, 设置锁定成功会在 lockAll()方法
- for (MessageQueue mmqq : lockedMq) {
- ProcessQueue processQueue = this.processQueueTable.get(mmqq);
- if (processQueue != null) {
- processQueue.setLocked(true);
- processQueue.setLastLockTimestamp(System.currentTimeMillis());
- }
- }
- boolean lockOK = lockedMq.contains(mq);
- log.info("the message queue lock {}, {} {}",
- lockOK ? "OK" : "Failed",
- this.consumerGroup,
- mq);
- return lockOK;
- } catch (Exception e) {
- log.error("lockBatchMQ exception," + mq, e);
- }
- }
- return false;
- }
第 8 至 11 行 : 顺序消费时, 锁定消息队列如果锁定失败, 新增消息处理队列失败
Broker 消息队列锁会过期, 默认配置 30s 因此, Consumer 需要不断向 Broker 刷新该锁过期时间, 默认配置 20s 刷新一次核心代码如下:
- // ConsumeMessageOrderlyService.java
- public void start() {
- if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- ConsumeMessageOrderlyService.this.lockMQPeriodically();
- }
- }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
- }
- }
3.2 移除消息队列
集群模式下, Consumer 移除自己的消息队列时, 会向 Broker 解锁该消息队列 (广播模式下不需要) 核心代码如下:
- // RebalancePushImpl.java
- /**
- * 移除不需要的队列相关的信息
- * 1. 持久化消费进度, 并移除之
- * 2. 顺序消费 & 集群模式, 解锁对该队列的锁定
- *
- * @param mq 消息队列
- * @param pq 消息处理队列
- * @return 是否移除成功
- */
- @Override
- public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
- // 同步队列的消费进度, 并移除之
- this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
- this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
- // 集群模式下, 顺序消费移除时, 解锁对队列的锁定
- if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
- && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
- try {
- if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
- try {
- return this.unlockDelay(mq, pq);
- } finally {
- pq.getLockConsume().unlock();
- }
- } else {
- log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
- mq, //
- pq.getTryUnlockTimes());
- pq.incTryUnlockTimes();
- }
- } catch (Exception e) {
- log.error("removeUnnecessaryMessageQueue Exception", e);
- }
- return false;
- }
- return true;
- }
- // RebalancePushImpl.java
- /**
- * 延迟解锁 Broker 消息队列锁
- * 当消息处理队列不存在消息, 则直接解锁
- *
- * @param mq 消息队列
- * @param pq 消息处理队列
- * @return 是否解锁成功
- */
- private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
- if (pq.hasTempMessage()) { // TODO 疑问: 为什么要延迟移除
- log.info("[{}]unlockDelay, begin {}", mq.hashCode(), mq);
- this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new Runnable() {
- @Override
- public void run() {
- log.info("[{}]unlockDelay, execute at once {}", mq.hashCode(), mq);
- RebalancePushImpl.this.unlock(mq, true);
- }
- }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
- } else {
- this.unlock(mq, true);
- }
- return true;
- }
第 20 至 32 行 : 获取消息队列消费锁, 避免和消息队列消费冲突如果获取锁失败, 则移除消息队列失败, 等待下次重新分配消费队列时, 再进行移除如果未获得锁而进行移除, 则可能出现另外的 Consumer 和当前 Consumer 同时消费该消息队列, 导致消息无法严格顺序消费
第 51 至 64 行 : 解锁 Broker 消息队列锁如果消息处理队列存在剩余消息, 则延迟解锁 Broker 消息队列锁为什么消息处理队列存在剩余消息不能直接解锁呢? 我也不知道, 百思不得其解如果有知道的同学麻烦教育下俺
3.3 消费消息队列
本节会类比并发消费消费队列, 建议对照 PushConsumer 并发消费消息 一起理解
3.1.1 消费消息
- // ConsumeMessageOrderlyService.java
- class ConsumeRequest implements Runnable {
- /**
- * 消息处理队列
- */
- private final ProcessQueue processQueue;
- /**
- * 消息队列
- */
- private final MessageQueue messageQueue;
- @Override
- public void run() {
- if (this.processQueue.isDropped()) {
- log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- return;
- }
- // 获得 Consumer 消息队列锁
- final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
- synchronized (objLock) {
- // (广播模式) 或者 (集群模式 && Broker 消息队列锁有效)
- if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
- final long beginTime = System.currentTimeMillis();
- // 循环
- for (boolean continueConsume = true; continueConsume; ) {
- if (this.processQueue.isDropped()) {
- log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- break;
- }
- // 消息队列分布式锁未锁定, 提交延迟获得锁并消费请求
- if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && !this.processQueue.isLocked()) {
- log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
- break;
- }
- // 消息队列分布式锁已经过期, 提交延迟获得锁并消费请求
- if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
- && this.processQueue.isLockExpired()) {
- log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
- break;
- }
- // 当前周期消费时间超过连续时长, 默认: 60s, 提交延迟消费请求默认情况下, 每消费 1 分钟休息 10ms
- long interval = System.currentTimeMillis() - beginTime;
- if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
- ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
- break;
- }
- // 获取消费消息此处和并发消息请求不同, 并发消息请求已经带了消费哪些消息
- final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
- List msgs = this.processQueue.takeMessags(consumeBatchSize);
- if (!msgs.isEmpty()) {
- final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
- ConsumeOrderlyStatus status = null;
- // .... 省略代码: Hook:before
- // 执行消费
- long beginTimestamp = System.currentTimeMillis();
- ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
- boolean hasException = false;
- try {
- this.processQueue.getLockConsume().lock(); // 锁定队列消费锁
- if (this.processQueue.isDropped()) {
- log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
- this.messageQueue);
- break;
- }
- status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
- } catch (Throwable e) {
- log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
- messageQueue);
- hasException = true;
- } finally {
- this.processQueue.getLockConsume().unlock(); // 锁定队列消费锁
- }
- // .... 省略代码: 解析消费结果状态
- // .... 省略代码: Hook:after
- ConsumeMessageOrderlyService.this.getConsumerStatsManager()
- .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
- // 处理消费结果
- continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
- } else {
- continueConsume = false;
- }
- }
- } else {
- if (this.processQueue.isDropped()) {
- log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
- return;
- }
- ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
- }
- }
- }
- }
第 20 行 : 获得 Consumer 消息队列锁
第 58 行 : 从消息处理队列顺序获得消息和并发消费获得消息不同并发消费请求在请求创建时, 已经设置好消费哪些消息
第 71 行 : 获得 Consumer 消息处理队列消费锁相比 Consumer 消息队列锁, 其粒度较小这就是上文提到的为什么有 Consumer 消息队列锁还需要有 Consumer 消息队列消费锁呢的原因
第 79 行 : 执行消费
第 99 行 : 处理消费结果
3.1.2 处理消费结果
顺序消费消息结果 (
ConsumeOrderlyStatus
) 有四种情况:
SUCCESS : 消费成功但不提交
ROLLBACK : 消费失败, 消费回滚
COMMIT : 消费成功提交并且提交
SUSPEND_CURRENT_QUEUE_A_MOMENT
: 消费失败, 挂起消费队列一会会, 稍后继续消费
考虑到 ROLLBACK COMMIT 暂时只使用在 MySQL binlog 场景, 官方将这两状态标记为 @Deprecated 当然, 相应的实现逻辑依然保留
在并发消费场景时, 如果消费失败, Consumer 会将消费失败消息发回到 Broker 重试队列, 跳过当前消息, 等待下次拉取该消息再进行消费
但是在完全严格顺序消费消费时, 这样做显然不行也因此, 消费失败的消息, 会挂起队列一会会, 稍后继续消费
不过消费失败的消息一直失败, 也不可能一直消费当超过消费重试上限时, Consumer 会将消费失败超过上限的消息发回到 Broker 死信队列
让我们来看看代码:
- // ConsumeMessageOrderlyService.java
- /**
- * 处理消费结果, 并返回是否继续消费
- *
- * @param msgs 消息
- * @param status 消费结果状态
- * @param context 消费 Context
- * @param consumeRequest 消费请求
- * @return 是否继续消费
- */
- public boolean processConsumeResult( //
- final List msgs, //
- final ConsumeOrderlyStatus status, //
- final ConsumeOrderlyContext context, //
- final ConsumeRequest consumeRequest //
- ) {
- boolean continueConsume = true;
- long commitOffset = -1L;
- if (context.isAutoCommit()) {
- switch (status) {
- case COMMIT:
- case ROLLBACK:
- log.warn("the message queue consume result is illegal, we think you want to ack these message {}", consumeRequest.getMessageQueue());
- case SUCCESS:
- // 提交消息已消费成功到消息处理队列
- commitOffset = consumeRequest.getProcessQueue().commit();
- // 统计
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- break;
- case SUSPEND_CURRENT_QUEUE_A_MOMENT:
- // 统计
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- if (checkReconsumeTimes(msgs)) { // 计算是否暂时挂起 (暂停) 消费 N 毫秒, 默认: 10ms
- // 设置消息重新消费
- consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
- // 提交延迟消费请求
- this.submitConsumeRequestLater( //
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
- continueConsume = false;
- } else {
- commitOffset = consumeRequest.getProcessQueue().commit();
- }
- break;
- default:
- break;
- }
- } else {
- switch (status) {
- case SUCCESS:
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- break;
- case COMMIT:
- // 提交消息已消费成功到消息处理队列
- commitOffset = consumeRequest.getProcessQueue().commit();
- break;
- case ROLLBACK:
- // 设置消息重新消费
- consumeRequest.getProcessQueue().rollback();
- this.submitConsumeRequestLater( //
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
- continueConsume = false;
- break;
- case SUSPEND_CURRENT_QUEUE_A_MOMENT:
- // 计算是否暂时挂起 (暂停) 消费 N 毫秒, 默认: 10ms
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
- if (checkReconsumeTimes(msgs)) {
- // 设置消息重新消费
- consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
- // 提交延迟消费请求
- this.submitConsumeRequestLater( //
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
- context.getSuspendCurrentQueueTimeMillis());
- continueConsume = false;
- }
- break;
- default:
- break;
- }
- }
- // 消息处理队列未 dropped, 提交有效消费进度
- if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
- this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
- }
- return continueConsume;
- }
- private int getMaxReconsumeTimes() {
- // default reconsume times: Integer.MAX_VALUE
- if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
- return Integer.MAX_VALUE;
- } else {
- return this.defaultMQPushConsumer.getMaxReconsumeTimes();
- }
- }
- /**
- * 计算是否要暂停消费
- * 不暂停条件: 存在消息都超过最大消费次数并且都发回 broker 成功
- *
- * @param msgs 消息
- * @return 是否要暂停
- */
- private boolean checkReconsumeTimes(List msgs) {
- boolean suspend = false;
- if (msgs != null && !msgs.isEmpty()) {
- for (MessageExt msg: msgs) {
- if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
- MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
- if (!sendMessageBack(msg)) { // 发回失败, 中断
- suspend = true;
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- }
- } else {
- suspend = true;
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- }
- }
- }
- return suspend;
- }
- /**
- * 发回消息
- * 消息发回 broker 后, 对应的消息队列是死信队列
- *
- * @param msg 消息
- * @return 是否发送成功
- */
- public boolean sendMessageBack(final MessageExt msg) {
- try {
- // max reconsume times exceeded then send to dead letter queue.
- Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
- String originMsgId = MessageAccessor.getOriginMessageId(msg);
- MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
- newMsg.setFlag(msg.getFlag());
- MessageAccessor.setProperties(newMsg, msg.getProperties());
- MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
- MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
- MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
- newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
- this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
- return true;
- } catch(Exception e) {
- log.error("sendMessageBack exception, group:" + this.consumerGroup + "msg:" + msg.toString(), e);
- }
- return false;
- }
第 21 至 29 行 : 消费成功在自动提交进度 ( AutoCommit ) 的情况下, COMMITROLLBACKSUCCESS 逻辑已经统一
第 30 至 45 行 : 消费失败当消息重试次数超过上限 (默认 :16 次) 时, 将消息发送到 Broker 死信队列, 跳过这些消息此时, 消息队列无需挂起, 继续消费后面的消息
第 85 至 88 行 : 提交消费进度
3.13 消息处理队列核心方法
涉及到的四个核心方法的源码:
- // ProcessQueue.java
- /**
- * 消息映射
- * key: 消息队列位置
- */
- private final TreeMap msgTreeMap = new TreeMap < >();
- /**
- * 消息映射临时存储(消费中的消息)
- */
- private final TreeMap msgTreeMapTemp = new TreeMap < >();
- /**
- * 回滚消费中的消息
- * 逻辑类似于{@link #makeMessageToCosumeAgain(List)}
- */
- public void rollback() {
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- this.msgTreeMap.putAll(this.msgTreeMapTemp);
- this.msgTreeMapTemp.clear();
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch(InterruptedException e) {
- log.error("rollback exception", e);
- }
- }
- /**
- * 提交消费中的消息已消费成功, 返回消费进度
- *
- * @return 消费进度
- */
- public long commit() {
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- // 消费进度
- Long offset = this.msgTreeMapTemp.lastKey();
- //
- msgCount.addAndGet(this.msgTreeMapTemp.size() * ( - 1));
- //
- this.msgTreeMapTemp.clear();
- // 返回消费进度
- if (offset != null) {
- return offset + 1;
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch(InterruptedException e) {
- log.error("commit exception", e);
- }
- return - 1;
- }
- /**
- * 指定消息重新消费
- * 逻辑类似于{@link #rollback()}
- *
- * @param msgs 消息
- */
- public void makeMessageToCosumeAgain(List msgs) {
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- try {
- for (MessageExt msg: msgs) {
- this.msgTreeMapTemp.remove(msg.getQueueOffset());
- this.msgTreeMap.put(msg.getQueueOffset(), msg);
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch(InterruptedException e) {
- log.error("makeMessageToCosumeAgain exception", e);
- }
- }
- /**
- * 获得持有消息前 N 条
- *
- * @param batchSize 条数
- * @return 消息
- */
- public List takeMessags(final int batchSize) {
- List result = new ArrayList < >(batchSize);
- final long now = System.currentTimeMillis();
- try {
- this.lockTreeMap.writeLock().lockInterruptibly();
- this.lastConsumeTimestamp = now;
- try {
- if (!this.msgTreeMap.isEmpty()) {
- for (int i = 0; i < batchSize; i++) {
- Map.Entry entry = this.msgTreeMap.pollFirstEntry();
- if (entry != null) {
- result.add(entry.getValue());
- msgTreeMapTemp.put(entry.getKey(), entry.getValue());
- } else {
- break;
- }
- }
- }
- if (result.isEmpty()) {
- consuming = false;
- }
- } finally {
- this.lockTreeMap.writeLock().unlock();
- }
- } catch(InterruptedException e) {
- log.error("take Messages exception", e);
- }
- return result;
- }
来源: http://www.suo.im/2XXIYB