This article is based mainly on the RocketMQ 4.0.x release.
1 Overview
2 Consumer
3 PushConsumer Overview
4 PushConsumer Subscription
- DefaultMQPushConsumerImpl#subscribe(...)
- FilterAPI.buildSubscriptionData(...)
- DefaultMQPushConsumer#registerMessageListener(...)
5 PushConsumer Message Queue Allocation
- RebalanceService
- MQClientInstance#doRebalance(...)
- DefaultMQPushConsumerImpl#doRebalance(...)
- RebalanceImpl#doRebalance(...)
- RebalanceImpl#rebalanceByTopic(...)
- RebalanceImpl#removeUnnecessaryMessageQueue(...)
- RebalancePushImpl#removeUnnecessaryMessageQueue(...)
- [PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(...)
- RebalancePushImpl#dispatchPullRequest(...)
- DefaultMQPushConsumerImpl#executePullRequestImmediately(...)
- AllocateMessageQueueStrategy
- AllocateMessageQueueAveragely
- AllocateMessageQueueByMachineRoom
- AllocateMessageQueueAveragelyByCircle
- AllocateMessageQueueByConfig
5 PushConsumer Reading Consume Progress
- RebalancePushImpl#computePullFromWhere(...)
- [PullConsumer] RebalancePullImpl#computePullFromWhere(...)
6 PushConsumer Pulling Messages
- PullMessageService
- DefaultMQPushConsumerImpl#pullMessage(...)
- PullAPIWrapper#pullKernelImpl(...)
- PullAPIWrapper#recalculatePullFromWhichNode(...)
- MQClientInstance#findBrokerAddressInSubscribe(...)
- PullAPIWrapper#processPullResult(...)
- ProcessQueue#putMessage(...)
Summary
6 PushConsumer Consuming Messages
ConsumeMessageConcurrentlyService Submitting Consume Requests
- ConsumeMessageConcurrentlyService#submitConsumeRequest(...)
- ConsumeMessageConcurrentlyService#submitConsumeRequestLater
- ConsumeRequest
- ConsumeMessageConcurrentlyService#processConsumeResult(...)
- ProcessQueue#removeMessage(...)
- ConsumeMessageConcurrentlyService#cleanExpireMsg(...)
- ProcessQueue#cleanExpiredMsg(...)
7 PushConsumer Sending Back Failed Messages
- DefaultMQPushConsumerImpl#sendMessageBack(...)
- MQClientAPIImpl#consumerSendMessageBack(...)
8 Consumer Consume Progress
- OffsetStore
- OffsetStore#load(...)
- LocalFileOffsetStore#load(...)
- OffsetSerializeWrapper
- RemoteBrokerOffsetStore#load(...)
- OffsetStore#readOffset(...)
- LocalFileOffsetStore#readOffset(...)
- RemoteBrokerOffsetStore#readOffset(...)
- OffsetStore#updateOffset(...)
- OffsetStore#persistAll(...)
- LocalFileOffsetStore#persistAll(...)
- RemoteBrokerOffsetStore#persistAll(...)
- MQClientInstance#persistAllConsumerOffset(...)
9 Conclusion
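1 Overview
This article continues from: RocketMQ Source Code Analysis: Message Pulling and Consumption (Part 1).
It mainly analyzes the source code involved in the Consumer's consumption logic.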
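2 Consumer
MQ provides two kinds of consumers:
PushConsumer:
Used in most scenarios.
Despite the Push in its name, it is actually implemented with Pull: it polls the Broker continuously for messages. When there are no new messages, the Broker suspends the request until new messages arrive, then releases it and returns them. This gets close to the real-time behavior of the Broker actively pushing (with some loss of timeliness, of course). The principle is similar to long polling (Long-Polling).
PullConsumer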
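The long-polling behavior described for PushConsumer can be sketched in a few lines. This is a minimal illustration, not RocketMQ code: `LongPollingSketch`, `holdPull` and `publish` are hypothetical names, and an in-memory BlockingQueue stands in for the Broker. The consumer's "push" loop is just repeated pulls, and each pull is held until a message arrives or a timeout elapses.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical long-polling sketch; not RocketMQ's broker or client code. */
public class LongPollingSketch {
    // Stand-in for a broker-side message queue.
    private final BlockingQueue<String> brokerQueue = new LinkedBlockingQueue<>();

    /** Producer side: publish a message to the "broker". */
    public void publish(String msg) {
        brokerQueue.offer(msg);
    }

    /**
     * Broker side: hold (suspend) the pull until a message arrives or the
     * timeout elapses. Returns null when the hold times out.
     */
    public String holdPull(long timeoutMillis) {
        try {
            return brokerQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LongPollingSketch broker = new LongPollingSketch();
        // Publish a message 200 ms from now, while the consumer's pull is being held.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException ignored) {
                return;
            }
            broker.publish("hello");
        });
        producer.start();
        // The consumer's "push" loop: keep pulling; each pull waits up to 1 s.
        String msg = null;
        while (msg == null) {
            msg = broker.holdPull(1000);
        }
        System.out.println("received " + msg);
        producer.join();
    }
}
```

Holding the request on the server side is what keeps latency low without the consumer hammering the Broker with empty polls.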
This article mainly covers PushConsumer, covers PullConsumer in part, and skips ordered consumption.
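3 PushConsumer Overview
First, a diagram of the components a PushConsumer contains and the interactions between them:
RebalanceService: the message queue rebalancing service; it allocates the message queues (MessageQueue) the current Consumer may consume, and reallocates them whenever a Consumer joins or leaves.
PullMessageService: the message pulling service; it continuously pulls messages from the Broker and submits consume tasks to ConsumeMessageService.
ConsumeMessageService: the message consuming service; it continuously consumes messages and processes the consume results.
RemoteBrokerOffsetStore: manages the Consumer's consume progress; it fetches consume progress from the Broker and syncs consume progress back to the Broker.
ProcessQueue: the message processing queue.
MQClientInstance: wraps the API calls to Namesrv and Broker, for use by Producers and Consumers.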
4 PushConsumer Subscription
- DefaultMQPushConsumerImpl#subscribe(...)
- public void subscribe(String topic, String subExpression) throws MQClientException {
- try {
- // Create the subscription data
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- topic, subExpression);
- this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
- // Sync the Consumer's information to the Broker via heartbeat
- if (this.mQClientFactory != null) {
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
Explanation: subscribe to a Topic.
Lines 3 to 6: create the subscription data. For details, see: FilterAPI.buildSubscriptionData(...)
Lines 7 to 10: sync the Consumer's information to the Broker via heartbeat.
- FilterAPI.buildSubscriptionData(...)
- public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
- String subString) throws Exception {
- SubscriptionData subscriptionData = new SubscriptionData();
- subscriptionData.setTopic(topic);
- subscriptionData.setSubString(subString);
- // Parse the subscription expression
- if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
- subscriptionData.setSubString(SubscriptionData.SUB_ALL);
- } else {
- String[] tags = subString.split("\\|\\|");
- if (tags.length > 0) {
- for (String tag : tags) {
- if (tag.length() > 0) {
- String trimString = tag.trim();
- if (trimString.length() > 0) {
- subscriptionData.getTagsSet().add(trimString);
- subscriptionData.getCodeSet().add(trimString.hashCode());
- }
- }
- }
- } else {
- throw new Exception("subString split error");
- }
- }
- return subscriptionData;
- }
Explanation: create the subscription data from the Topic and the subscription expression.
- subscriptionData.subVersion = System.currentTimeMillis()
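The tag parsing can be tried in isolation. Below is a minimal sketch that mirrors the splitting logic of FilterAPI.buildSubscriptionData(...); `SubscriptionSketch` and `parse` are hypothetical names, the `"*"` constant is assumed to equal SubscriptionData.SUB_ALL, and the "subString split error" branch is omitted.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in for the tag parsing in FilterAPI.buildSubscriptionData(...). */
public class SubscriptionSketch {
    static final String SUB_ALL = "*"; // assumed to equal SubscriptionData.SUB_ALL

    String subString;
    final Set<String> tagsSet = new LinkedHashSet<>();
    final Set<Integer> codeSet = new LinkedHashSet<>();

    static SubscriptionSketch parse(String subString) {
        SubscriptionSketch data = new SubscriptionSketch();
        if (subString == null || subString.isEmpty() || subString.equals(SUB_ALL)) {
            data.subString = SUB_ALL; // subscribe to all tags
            return data;
        }
        data.subString = subString;
        // Tags are separated by "||"; whitespace around each tag is trimmed.
        for (String tag : subString.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                data.tagsSet.add(trimmed);
                data.codeSet.add(trimmed.hashCode()); // tag hash codes are what the Broker compares when filtering
            }
        }
        return data;
    }

    public static void main(String[] args) {
        SubscriptionSketch d = parse("TagA || TagB ||  ");
        System.out.println(d.tagsSet); // [TagA, TagB]
    }
}
```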
- DefaultMQPushConsumer#registerMessageListener(...)
- public void registerMessageListener(MessageListenerConcurrently messageListener) {
- this.messageListener = messageListener;
- this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
- }
Explanation: register the message listener.
5 PushConsumer Message Queue Allocation
- RebalanceService
- public class RebalanceService extends ServiceThread {
- /**
- * Wait interval, in milliseconds
- */
- private static long waitInterval =
- Long.parseLong(System.getProperty(
- "rocketmq.client.rebalance.waitInterval", "20000"));
- private final Logger log = ClientLogger.getLog();
- /**
- * MQClient instance
- */
- private final MQClientInstance mqClientFactory;
- public RebalanceService(MQClientInstance mqClientFactory) {
- this.mqClientFactory = mqClientFactory;
- }
- @Override
- public void run() {
- log.info(this.getServiceName() + " service started");
- while (!this.isStopped()) {
- this.waitForRunning(waitInterval);
- this.mqClientFactory.doRebalance();
- }
- log.info(this.getServiceName() + " service end");
- }
- @Override
- public String getServiceName() {
- return RebalanceService.class.getSimpleName();
- }
- }
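Explanation: the message queue rebalancing service; it allocates the message queues (MessageQueue) that the current Consumer may consume.
Line 21: call MQClientInstance#doRebalance(...).
Queue allocation is currently triggered in three situations:
When the wait in line 20 times out, i.e. once every 20s by default (the interval is read from the rocketmq.client.rebalance.waitInterval system property).
When the PushConsumer starts, it triggers allocation by calling rebalanceService#wakeup(...).
When the Broker notifies the Consumer that a Consumer has joined or left, the Consumer responds by calling rebalanceService#wakeup(...).
For details, see: MQClientInstance#doRebalance(...)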
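All three triggers reduce to one pattern: wait for up to waitInterval, but return early when someone calls wakeup(). Below is a minimal sketch of that pattern with a ReentrantLock and Condition. `RebalanceLoopSketch` and its members are hypothetical; RocketMQ's ServiceThread implements waitForRunning/wakeup with its own primitives, so this only mimics the behavior described above. The pending `notified` flag makes a wakeup() that arrives before the thread starts waiting still count.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of the RebalanceService loop; not RocketMQ's ServiceThread. */
public class RebalanceLoopSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private boolean notified = false; // pending wakeup, guarded by lock
    private volatile boolean stopped = false;
    final AtomicInteger rebalanceCount = new AtomicInteger();

    /** Trigger a rebalance early, e.g. on consumer start or a Broker notification. */
    public void wakeup() {
        lock.lock();
        try {
            notified = true;
            cond.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if woken by wakeup(), false if the interval elapsed. */
    boolean waitForRunning(long intervalMillis) {
        lock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
            while (!notified) {
                if (nanos <= 0) {
                    return false; // periodic trigger: the wait timed out
                }
                nanos = cond.awaitNanos(nanos);
            }
            notified = false; // consume the pending wakeup
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    void runLoop(long intervalMillis) {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            waitForRunning(intervalMillis);
            if (!stopped) {
                rebalanceCount.incrementAndGet(); // stand-in for mqClientFactory.doRebalance()
            }
        }
    }

    public void shutdown() {
        stopped = true;
        wakeup();
    }

    public static void main(String[] args) throws InterruptedException {
        RebalanceLoopSketch service = new RebalanceLoopSketch();
        Thread t = new Thread(() -> service.runLoop(20_000)); // 20 s, like the default waitInterval
        t.start();
        service.wakeup(); // e.g. the PushConsumer has just started
        Thread.sleep(100);
        System.out.println("rebalances so far: " + service.rebalanceCount.get());
        service.shutdown();
        t.join();
    }
}
```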
- MQClientInstance#doRebalance(...)
- public void doRebalance() {
- for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
- MQConsumerInner impl = entry.getValue();
- if (impl != null) {
- try {
- impl.doRebalance();
- } catch (Throwable e) {
- log.error("doRebalance exception", e);
- }
- }
- }
- }
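Explanation: iterate over the consumerTable (the set of Consumers) held by the current Client and run queue allocation for each of them.
Question: in my debugging, consumerTable only ever contained the Consumer itself. If any reader knows the answer, please let me know.
Line 6: call MQConsumerInner#doRebalance(...) to allocate the queues. Both DefaultMQPushConsumerImpl and DefaultMQPullConsumerImpl implement this interface method.
For details of the push implementation, see: DefaultMQPushConsumerImpl#doRebalance(...)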
- DefaultMQPushConsumerImpl#doRebalance(...)
- public void doRebalance() {
- if (!this.pause) {
- this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
- }
- }
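Explanation: perform message queue allocation (skipped while the consumer is paused).
Line 3: call RebalanceImpl#doRebalance(...) to allocate the queues. For details, see: RebalanceImpl#doRebalance(...)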
- RebalanceImpl#doRebalance(...)
- /**
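- * Perform message queue allocation
- *
- * @param isOrder whether messages are consumed in order
- */
- public void doRebalance(final boolean isOrder) {
- // Allocate the message queues of each topic
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
- if (subTable != null) {
- for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {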
- final String topic = entry.getKey();
- try {
- this.rebalanceByTopic(topic, isOrder);
- } catch (Throwable e) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("rebalanceByTopic Exception", e);
- }
- }
- }
- }
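- // Remove the message queues of topics that are no longer subscribed
- this.truncateMessageQueueNotMyTopic();
- }
- /**
- * Remove message queues that are no longer subscribed
- */
- private void truncateMessageQueueNotMyTopic() {
- Map<String, SubscriptionData> subTable = this.getSubscriptionInner();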
- for (MessageQueue mq : this.processQueueTable.keySet()) {
- if (!subTable.containsKey(mq.getTopic())) {
- ProcessQueue pq = this.processQueueTable.remove(mq);
- if (pq != null) {
- pq.setDropped(true);
- log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
- }
- }
- }
- }
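- #doRebalance(...) Explanation: perform message queue allocation.
Lines 7 to 20: loop over the subscribed topics (subscriptionInner) and allocate the message queues of each Topic.
Line 22: remove the message queues of Topics that are no longer subscribed.
#truncateMessageQueueNotMyTopic(...)
Explanation: remove message queues that are no longer subscribed. Calling DefaultMQPushConsumer#unsubscribe(topic) only removes the topic from the subscription collection (subscriptionInner); the corresponding message queues are removed in this method.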
- RebalanceImpl#rebalanceByTopic(...)
- private void rebalanceByTopic(final String topic, final boolean isOrder) {
- switch (messageModel) {
- case BROADCASTING: {
- Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
- if (mqSet != null) {
- boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
- if (changed) {
- this.messageQueueChanged(topic, mqSet, mqSet);
- log.info("messageQueueChanged {} {} {} {}", //
- consumerGroup, //
- topic, //
- mqSet, //
- mqSet);
- }
- } else {
- log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
- }
- break;
- }
- case CLUSTERING: {
- // Get the message queues and consumer ids of the topic
- Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
- List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
- if (null == mqSet) {
- if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
- }
- }
- if (null == cidAll) {
- log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
- }
- if (mqSet != null && cidAll != null) {
- // Sort the message queues and consumer ids: allocation runs locally on each Client, so sorting keeps the order identical across Clients
- List<MessageQueue> mqAll = new ArrayList<>();
- mqAll.addAll(mqSet);
- Collections.sort(mqAll);
- Collections.sort(cidAll);
- AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
- // Allocate the message queues using the allocation strategy
- List<MessageQueue> allocateResult;
- try {
- allocateResult = strategy.allocate(//
- this.consumerGroup, //
- this.mQClientFactory.getClientId(), //
- mqAll, //
- cidAll);
- } catch (Throwable e) {
- log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
- return;
- }
- Set<MessageQueue> allocateResultSet = new HashSet<>();
- if (allocateResult != null) {
- allocateResultSet.addAll(allocateResult);
- }
- // Update the process queue table
- boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
- if (changed) {
- log.info(
- "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
- strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
- allocateResultSet.size(), allocateResultSet);
- this.messageQueueChanged(topic, mqSet, allocateResultSet);
- }
- }
- break;
- }
- default:
- break;
- }
- }
- /**
- * During rebalancing, update the message process queues
- * - remove message queues that are in processQueueTable but not in mqSet
- * - add message queues that are in mqSet but not in processQueueTable
- *
- * @param topic Topic
- * @param mqSet the message queues allocated by this rebalance
- * @param isOrder whether consumption is ordered
- * @return whether anything changed
- */
- private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
- boolean changed = false;
- // Remove message queues that are in processQueueTable but not in mqSet
- Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
- while (it.hasNext()) { // TODO: to be read
- Entry<MessageQueue, ProcessQueue> next = it.next();
- MessageQueue mq = next.getKey();
- ProcessQueue pq = next.getValue();
- if (mq.getTopic().equals(topic)) {
- if (!mqSet.contains(mq)) { // queue no longer allocated to this consumer
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
- }
- } else if (pq.isPullExpired()) { // pulling has stalled for too long: clean up the queue
- switch (this.consumeType()) {
- case CONSUME_ACTIVELY:
- break;
- case CONSUME_PASSIVELY:
- pq.setDropped(true);
- if (this.removeUnnecessaryMessageQueue(mq, pq)) {
- it.remove();
- changed = true;
- log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
- consumerGroup, mq);
- }
- break;
- default:
- break;
- }
- }
- }
- }
- // Add message queues that are in mqSet but not in processQueueTable
- List<PullRequest> pullRequestList = new ArrayList<>(); // pull requests to dispatch
- for (MessageQueue mq : mqSet) {
- if (!this.processQueueTable.containsKey(mq)) {
- if (isOrder && !this.lock(mq)) {
- log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
- continue;
- }
- this.removeDirtyOffset(mq);
- ProcessQueue pq = new ProcessQueue();
- long nextOffset = this.computePullFromWhere(mq);
- if (nextOffset >= 0) {
- ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
- if (pre != null) {
- log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
- } else {
- log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
- PullRequest pullRequest = new PullRequest();
- pullRequest.setConsumerGroup(consumerGroup);
- pullRequest.setNextOffset(nextOffset);
- pullRequest.setMessageQueue(mq);
- pullRequest.setProcessQueue(pq);
- pullRequestList.add(pullRequest);
- changed = true;
- }
- } else {
- log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
- }
- }
- }
- // Dispatch the pull requests
- this.dispatchPullRequest(pullRequestList);
- return changed;
- }
- #rebalanceByTopic(...)
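Explanation: allocate the message queues of a Topic.
Lines 3 to 19: in broadcasting mode (BROADCASTING), allocate all message queues of the Topic.
Lines 20 to 67: in clustering mode (CLUSTERING), allocate a subset of the Topic's message queues.
Lines 21 to 37: get the Topic's message queues and consumers, and sort them. Each Consumer allocates queues locally, so only after sorting is the order consistent across Consumers.
Lines 38 to 55: allocate the message queues according to the allocation strategy (AllocateMessageQueueStrategy). For details, see: AllocateMessageQueueStrategy
Lines 56 to 64: update the Topic's message queues.
#updateProcessQueueTableInRebalance(...)
Explanation: when allocating queues, update the Topic's message queues and return whether anything changed.
Lines 84 to 116: remove the process queues (processQueueTable) whose message queue is not in the allocated set (mqSet).
Line 93: remove the unneeded message queue. For details, see: RebalancePushImpl#removeUnnecessaryMessageQueue(...)
Lines 98 to 114: the queue's pull has expired, i.e. current time - last pull time > 120s (the 120s is configurable). This is treated as a BUG (no pull for too long), so the message queue is removed; afterwards, the add-new-queues logic below can add this message queue back again.
Lines 117 to 146: add the message queues that are new in the allocated set (mqSet).
Lines 121 to 124: ordered consumption, skipped here. For details, see: RocketMQ Source Code Analysis: Ordered Message Sending and Consumption
Line 125: remove the (dirty) consume progress of the message queue.
Line 127: get the queue's consume progress. For details, see: RebalancePushImpl#computePullFromWhere(...)
Lines 128 to 144: add the new process queue and create its pull request.
Line 148: dispatch the pull requests for the newly added message queues. For details, see: RebalancePushImpl#dispatchPullRequest(...)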
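Stripped of ordered consumption, offset handling and pull dispatch, the table update is a set difference. A minimal sketch under these assumptions: push mode (CONSUME_PASSIVELY), non-ordered consumption so removeUnnecessaryMessageQueue(...) always succeeds, plain strings instead of MessageQueue, a hypothetical `PQ` class instead of ProcessQueue, and a caller-supplied clock `now`. `RebalanceDiffSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical simplification of RebalanceImpl#updateProcessQueueTableInRebalance(...). */
public class RebalanceDiffSketch {
    static final long PULL_MAX_IDLE_MILLIS = 120_000; // the 120s idle threshold described above

    /** Stand-in for ProcessQueue: only the fields the diff needs. */
    static class PQ {
        volatile boolean dropped;
        final long lastPullTimestamp;

        PQ(long now) {
            lastPullTimestamp = now;
        }

        boolean isPullExpired(long now) {
            return now - lastPullTimestamp > PULL_MAX_IDLE_MILLIS;
        }
    }

    final Map<String, PQ> processQueueTable = new HashMap<>();
    final List<String> newPullRequests = new ArrayList<>(); // queues that would get a PullRequest

    /** @return whether processQueueTable changed */
    boolean update(Set<String> mqSet, long now) {
        boolean changed = false;
        // 1. Drop queues that are no longer allocated to us, or whose pulling has stalled.
        Iterator<Map.Entry<String, PQ>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, PQ> entry = it.next();
            if (!mqSet.contains(entry.getKey()) || entry.getValue().isPullExpired(now)) {
                entry.getValue().dropped = true; // pulls still in flight check this flag and stop
                it.remove();
                changed = true;
            }
        }
        // 2. Add allocated queues we do not have yet (a stalled queue removed above is re-added here).
        for (String mq : mqSet) {
            if (!processQueueTable.containsKey(mq)) {
                processQueueTable.put(mq, new PQ(now));
                newPullRequests.add(mq);
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        RebalanceDiffSketch s = new RebalanceDiffSketch();
        s.update(new HashSet<>(Arrays.asList("q0", "q1")), 0);
        boolean changed = s.update(new HashSet<>(Arrays.asList("q1", "q2")), 1_000);
        System.out.println(changed + " " + new TreeSet<>(s.processQueueTable.keySet())); // true [q1, q2]
    }
}
```

Setting the dropped flag before removal matters because a pull for that queue may already be queued in PullMessageService; DefaultMQPushConsumerImpl#pullMessage(...) returns early when it sees a dropped ProcessQueue.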
- RebalanceImpl#removeUnnecessaryMessageQueue(...)
- RebalancePushImpl#removeUnnecessaryMessageQueue(...)
- public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
- // Persist the queue's consume progress, then remove it
- this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
- this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
- // TODO ordered consumption
- if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
- && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
- try {
- if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
- try {
- return this.unlockDelay(mq, pq);
- } finally {
- pq.getLockConsume().unlock();
- }
- } else {
- log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
- mq, //
- pq.getTryUnlockTimes());
- pq.incTryUnlockTimes();
- }
- } catch (Exception e) {
- log.error("removeUnnecessaryMessageQueue Exception", e);
- }
- return false;
- }
- return true;
- }
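Explanation: remove the information related to an unneeded message queue, and return whether the removal succeeded.
Lines 2 to 4: persist the queue's consume progress, then remove it.
Lines 5 to 25: ordered consumption, skipped here. For details, see: RocketMQ Source Code Analysis: Ordered Message Sending and Consumption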
- [PullConsumer] RebalancePullImpl#removeUnnecessaryMessageQueue(...)
- public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
- this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
- this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
- return true;
- }
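Explanation: remove the information related to an unneeded message queue and return success. Apart from the ordered-consumption handling, it is the same as RebalancePushImpl#removeUnnecessaryMessageQueue(...).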
- RebalancePushImpl#dispatchPullRequest(...)
- public void dispatchPullRequest(List<PullRequest> pullRequestList) {
- for (PullRequest pullRequest : pullRequestList) {
- this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
- log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
- }
- }
Explanation: dispatch the pull requests. This call is the starting point of the PushConsumer's continuous message pulling.
- DefaultMQPushConsumerImpl#executePullRequestImmediately(...)
- public void executePullRequestImmediately(final PullRequest pullRequest) {
- this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
- }
Explanation: submit a pull request. Once submitted, PullMessageService executes it asynchronously, so the call does not block. For details, see: PullMessageService
AllocateMessageQueueStrategy
- AllocateMessageQueueAveragely
- public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
- private final Logger log = ClientLogger.getLog();
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- // Validate the arguments
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
- List<MessageQueue> result = new ArrayList<>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
- return result;
- }
- // Average allocation
- int index = cidAll.indexOf(currentCID); // position of this consumer
- int mod = mqAll.size() % cidAll.size(); // remainder: how many queues cannot be split evenly
- int averageSize =
- mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
- + 1 : mqAll.size() / cidAll.size());
- int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // with a remainder, consumers in [0, mod) each take one extra queue; later consumers start after those mod extra queues
- int range = Math.min(averageSize, mqAll.size() - startIndex); // Math.min() because when mqAll.size() < cidAll.size(), some consumers get no queue
- for (int i = 0; i < range; i++) {
- result.add(mqAll.get((startIndex + i) % mqAll.size()));
- }
- return result;
- }
- @Override
- public String getName() {
- return "AVG";
- }
- }
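Explanation: the average allocation strategy.
Lines 7 to 23: argument validation.
Lines 24 to 35: allocate the message queues evenly.
Line 25: index: the position of the current Consumer among the consumers. This is why the cidAll argument must be sorted: without sorting, the index each Consumer computes locally would not be consistent, which would corrupt the result.
Line 26: mod: the remainder, i.e. how many message queues cannot be divided evenly.
Lines 27 to 29: averageSize: the expression has the same effect as
- (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size())
- index in [ 0, mod ): mqAll.size() / cidAll.size() + 1, i.e. the first mod Consumers share the remainder and each gets 1 extra message queue
- index in [ mod, cidAll.size() ): mqAll.size() / cidAll.size()
Line 30: startIndex: the position at which this Consumer's allocated message queues start.
Line 31: range: the number of queues to allocate. Math#min(...) is needed because when mqAll.size() < cidAll.size(), the last few Consumers get no message queue (range is then zero or negative, so nothing is added).
Lines 32 to 34: build the allocation result.
An example, with the number of message queues fixed at 4:
Message queue | 2 Consumers (divides evenly) | 3 Consumers (does not divide evenly) | 5 Consumers (not every Consumer gets a queue) |
---|---|---|---|
Message queue [0] | Consumer[0] | Consumer[0] | Consumer[0] |
Message queue [1] | Consumer[0] | Consumer[0] | Consumer[1] |
Message queue [2] | Consumer[1] | Consumer[1] | Consumer[2] |
Message queue [3] | Consumer[1] | Consumer[2] | Consumer[3] |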
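The table can be reproduced by running the same arithmetic. This sketch copies the formulas of AllocateMessageQueueAveragely#allocate(...) but, as simplifying assumptions, uses integer queue indexes instead of MessageQueue objects and omits argument validation; `AvgAllocationSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical re-run of the AllocateMessageQueueAveragely arithmetic on queue indexes. */
public class AvgAllocationSketch {
    /** Queue indexes assigned to consumer `index` out of `consumerCount`, given `queueCount` queues. */
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        int mod = queueCount % consumerCount;
        int averageSize = queueCount <= consumerCount ? 1
                : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, queueCount - startIndex); // <= 0 means "no queue"
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int consumers : new int[] {2, 3, 5}) {
            StringBuilder sb = new StringBuilder(consumers + " consumers:");
            for (int c = 0; c < consumers; c++) {
                sb.append(" C").append(c).append('=').append(allocate(4, consumers, c));
            }
            System.out.println(sb);
        }
    }
}
```

Running main prints one line per column of the table: with 2 consumers C0=[0, 1] C1=[2, 3]; with 3 consumers C0=[0, 1] C1=[2] C2=[3]; with 5 consumers the first four get one queue each and C4=[].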
- AllocateMessageQueueByMachineRoom
- public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
- /**
- * Set of brokerName prefixes (machine rooms, IDCs) the consumer consumes from
- */
- private Set<String> consumeridcs;
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- // Argument validation
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- int currentIndex = cidAll.indexOf(currentCID);
- if (currentIndex < 0) {
- return result;
- }
- // Collect the message queues whose brokerName prefix is one of the configured machine rooms ('consumeridcs')
- List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
- for (MessageQueue mq : mqAll) {
- String[] temp = mq.getBrokerName().split("@");
- if (temp.length == 2 && consumeridcs.contains(temp[0])) {
- premqAll.add(mq);
- }
- }
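- // Average allocation
- int mod = premqAll.size() / cidAll.size();
- int rem = premqAll.size() % cidAll.size();
- int startIndex = mod * currentIndex;
- int endIndex = startIndex + mod;
- for (int i = startIndex; i < endIndex; i++) {
- result.add(premqAll.get(i)); // the indexes refer to premqAll; the 4.0.x source reads mqAll.get(i) here, which picks from the unfiltered list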
- }
- if (rem > currentIndex) {
- result.add(premqAll.get(currentIndex + mod * cidAll.size()));
- }
- return result;
- }
- @Override
- public String getName() {
- return "MACHINE_ROOM";
- }
- public Set<String> getConsumeridcs() {
- return consumeridcs;
- }
- public void setConsumeridcs(Set<String> consumeridcs) {
- this.consumeridcs = consumeridcs;
- }
- }
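Explanation: evenly allocate the message queues of the Brokers this consumer may consume.
Lines 9 to 14: argument validation.
Lines 15 to 22: collect the message queues of the consumable Brokers.
Lines 24 to 33: allocate the message queues evenly. This differs slightly from AllocateMessageQueueAveragely: each Consumer first gets a contiguous block of mod queues, and the leftover queues at the end are then handed out one each to the first rem Consumers.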
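A minimal sketch of this strategy, with two assumptions labeled in the code: each queue is modeled as a string "<idc>@<brokerName>#<queueId>" (the `split("@")` check implies brokerNames carry an `idc@` prefix), and indexes are always taken from the filtered list premqAll. `MachineRoomSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of AllocateMessageQueueByMachineRoom on string queue ids. */
public class MachineRoomSketch {
    /** Assumption: each queue is "<idc>@<brokerName>#<queueId>", e.g. "room1@broker-a#0". */
    static List<String> allocate(Set<String> consumeridcs, List<String> mqAll, List<String> cidAll, String currentCID) {
        List<String> result = new ArrayList<>();
        int currentIndex = cidAll.indexOf(currentCID);
        if (currentIndex < 0) {
            return result;
        }
        // Keep only queues whose brokerName prefix (the machine room) is configured.
        List<String> premqAll = new ArrayList<>();
        for (String mq : mqAll) {
            String[] temp = mq.split("@");
            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                premqAll.add(mq);
            }
        }
        // A contiguous block of `mod` queues per consumer...
        int mod = premqAll.size() / cidAll.size();
        int rem = premqAll.size() % cidAll.size();
        int startIndex = mod * currentIndex;
        for (int i = startIndex; i < startIndex + mod; i++) {
            result.add(premqAll.get(i));
        }
        // ...then the leftover tail goes one by one to the first `rem` consumers.
        if (rem > currentIndex) {
            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> idcs = new HashSet<>(Arrays.asList("room1"));
        List<String> mqs = Arrays.asList("room1@broker-a#0", "room1@broker-a#1", "room1@broker-a#2", "room2@broker-b#0");
        List<String> cids = Arrays.asList("c0", "c1");
        System.out.println(allocate(idcs, mqs, cids, "c0")); // [room1@broker-a#0, room1@broker-a#2]
        System.out.println(allocate(idcs, mqs, cids, "c1")); // [room1@broker-a#1]
    }
}
```

With three room1 queues and two consumers, c0 gets queues 0 and 2 while c1 gets queue 1; the room2 queue is ignored. AllocateMessageQueueAveragely would instead give c0 queues 0 and 1.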
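Question: when using this strategy, how should the Consumers and Brokers be configured? I will look into this carefully when studying the master/slave source code.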
- AllocateMessageQueueAveragelyByCircle
- public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
- private final Logger log = ClientLogger.getLog();
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- // Validate the arguments
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
- return result;
- }
- // Circular allocation
- int index = cidAll.indexOf(currentCID);
- for (int i = index; i < mqAll.size(); i++) {
- if (i % cidAll.size() == index) {
- result.add(mqAll.get(i));
- }
- }
- return result;
- }
- @Override
- public String getName() {
- return "AVG_BY_CIRCLE";
- }
- }
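Explanation: allocate the message queues in a circle: the Consumer at position index gets the queues at positions index, index + cidAll.size(), index + 2 * cidAll.size(), and so on.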
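For comparison with the average strategy, here is the circular rule applied to integer queue indexes. It is a hypothetical simplification of AllocateMessageQueueAveragelyByCircle#allocate(...) without argument validation; `CircleAllocationSketch` is not a RocketMQ class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of circular (round-robin) queue allocation on queue indexes. */
public class CircleAllocationSketch {
    /** Queue indexes for consumer `index`: index, index + n, index + 2n, ... */
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        for (int i = index; i < queueCount; i++) {
            if (i % consumerCount == index) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (int c = 0; c < 3; c++) {
            System.out.println("C" + c + "=" + allocate(4, 3, c));
        }
    }
}
```

With 4 queues and 3 consumers this prints C0=[0, 3], C1=[1], C2=[2], whereas the average strategy gives [0, 1], [2], [3]: both give Consumer[0] the extra queue, but circular allocation interleaves the queues instead of handing out contiguous blocks.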
- AllocateMessageQueueByConfig
- public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
- private List<MessageQueue> messageQueueList;
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- return this.messageQueueList;
- }
- @Override
- public String getName() {
- return "CONFIG";
- }
- public List<MessageQueue> getMessageQueueList() {
- return messageQueueList;
- }
- public void setMessageQueueList(List<MessageQueue> messageQueueList) {
- this.messageQueueList = messageQueueList;
- }
- }
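Explanation: return the configured message queues (all arguments are ignored).
Question: what is the use case for this allocation strategy?
5 PushConsumer Reading Consume Progress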
- RebalancePushImpl#computePullFromWhere(...)
- public long computePullFromWhere(MessageQueue mq) {
- long result = -1;
- final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
- final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
- switch (consumeFromWhere) {
- case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
- // deprecated
- case CONSUME_FROM_MIN_OFFSET:
- // deprecated
- case CONSUME_FROM_MAX_OFFSET:
- // deprecated
- case CONSUME_FROM_LAST_OFFSET:
- {
- long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
- if (lastOffset >= 0) {
- result = lastOffset;
- }
- // First start, no offset
- else if (-1 == lastOffset) {
- if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- result = 0L;
- } else {
- try {
- result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
- } catch(MQClientException e) {
- result = -1;
- }
- }
- } else {
- result = -1;
- }
- break;
- }
- case CONSUME_FROM_FIRST_OFFSET:
- {
- long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
- if (lastOffset >= 0) {
- result = lastOffset;
- } else if (-1 == lastOffset) {
- result = 0L;
- } else {
- result = -1;
- }
- break;
- }
- case CONSUME_FROM_TIMESTAMP:
- {
- long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
- if (lastOffset >= 0) {
- result = lastOffset;
- } else if (-1 == lastOffset) {
- if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- try {
- result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
- } catch(MQClientException e) {
- result = -1;
- }
- } else {
- try {
- long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS).getTime();
- result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
- } catch(MQClientException e) {
- result = -1;
- }
- }
- } else {
- result = -1;
- }
- break;
- }
- default:
- break;
- }
- return result;
- }
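Explanation: compute the position from which to start consuming a message queue.
A PushConsumer has three options for where to start reading consume progress:
CONSUME_FROM_LAST_OFFSET: lines 6 to 33: when a new consumer group starts for the first time, it consumes from the end of the queue; on later restarts, it continues from the previously recorded consume progress.
CONSUME_FROM_FIRST_OFFSET: lines 34 to 45: when a new consumer group starts for the first time, it consumes from the beginning of the queue; on later restarts, it continues from the previously recorded consume progress.
CONSUME_FROM_TIMESTAMP: lines 46 to 70: when a new consumer group starts for the first time, it consumes from the specified point in time; on later restarts, it continues from the previously recorded consume progress.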
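The three options differ only in what happens when the offset store has no record for the queue (readOffset returns -1 on first start). A minimal sketch of that decision, with hypothetical parameters `maxOffset` and `offsetAtTimestamp` standing in for the MQAdminImpl#maxOffset(...) and #searchOffset(...) lookups, and the retry-topic special cases omitted; `PullFromWhereSketch` is not a RocketMQ class.

```java
/** Hypothetical sketch of the decision in RebalancePushImpl#computePullFromWhere(...). */
public class PullFromWhereSketch {
    enum ConsumeFromWhere { CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP }

    /**
     * @param storedOffset      offset read from the offset store; -1 means "no progress recorded yet"
     * @param maxOffset         stand-in for the queue's current max offset
     * @param offsetAtTimestamp stand-in for the offset found by searching with the consume timestamp
     * @return the offset to start pulling from, or -1 if it cannot be determined
     */
    static long compute(ConsumeFromWhere where, long storedOffset, long maxOffset, long offsetAtTimestamp) {
        if (storedOffset >= 0) {
            return storedOffset; // restart: resume from recorded progress, whatever the option
        }
        if (storedOffset != -1) {
            return -1; // other negative values signal that reading the progress failed
        }
        switch (where) { // first start of this consumer group on this queue
            case CONSUME_FROM_LAST_OFFSET:
                return maxOffset;
            case CONSUME_FROM_FIRST_OFFSET:
                return 0L;
            case CONSUME_FROM_TIMESTAMP:
                return offsetAtTimestamp;
            default:
                return -1;
        }
    }

    public static void main(String[] args) {
        for (ConsumeFromWhere w : ConsumeFromWhere.values()) {
            System.out.println(w + " on first start -> " + compute(w, -1, 1000, 600));
        }
    }
}
```

A negative result means no pull request is created for the queue in this round (see the nextOffset >= 0 check in updateProcessQueueTableInRebalance).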
[PullConsumer] RebalancePullImpl#computePullFromWhere(...)
Skipped for now.
6 PushConsumer Pulling Messages
- PullMessageService
- public class PullMessageService extends ServiceThread {
- private final Logger log = ClientLogger.getLog();
- /**
- * Pull request queue
- */
- private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
- /**
- * MQClient instance
- */
- private final MQClientInstance mQClientFactory;
- /**
- * Scheduler used to submit pull requests after a delay
- */
- private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "PullMessageServiceScheduledThread");
- }
- });
- public PullMessageService(MQClientInstance mQClientFactory) {
- this.mQClientFactory = mQClientFactory;
- }
- /**
- * Submit a pull request after a delay
- *
- * @param pullRequest the pull request
- * @param timeDelay delay, in milliseconds
- */
- public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
- this.scheduledExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- PullMessageService.this.executePullRequestImmediately(pullRequest);
- }
- }, timeDelay, TimeUnit.MILLISECONDS);
- }
- /**
- * Submit a pull request immediately
- *
- * @param pullRequest the pull request
- */
- public void executePullRequestImmediately(final PullRequest pullRequest) {
- try {
- this.pullRequestQueue.put(pullRequest);
- } catch (InterruptedException e) {
- log.error("executePullRequestImmediately pullRequestQueue.put", e);
- }
- }
- /**
- * Run a task after a delay
- *
- * @param r the task
- * @param timeDelay delay, in milliseconds
- */
- public void executeTaskLater(final Runnable r, final long timeDelay) {
- this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
- }
- public ScheduledExecutorService getScheduledExecutorService() {
- return scheduledExecutorService;
- }
- /**
- * Pull messages
- *
- * @param pullRequest the pull request
- */
- private void pullMessage(final PullRequest pullRequest) {
- final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
- if (consumer != null) {
- DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
- impl.pullMessage(pullRequest);
- } else {
- log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
- }
- }
- @Override
- public void run() {
- log.info(this.getServiceName() + " service started");
- while (!this.isStopped()) {
- try {
- PullRequest pullRequest = this.pullRequestQueue.take();
- if (pullRequest != null) {
- this.pullMessage(pullRequest);
- }
- } catch (InterruptedException e) {
- } catch (Exception e) {
- log.error("Pull Message Service Run Method exception", e);
- }
- }
- log.info(this.getServiceName() + " service end");
- }
- @Override
- public String getServiceName() {
- return PullMessageService.class.getSimpleName();
- }
- }
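Explanation: the message pulling service; it continuously pulls messages from the Broker and submits consume tasks to ConsumeMessageService.
- #executePullRequestLater(...): lines 24 to 37: submit a delayed pull request.
- #executePullRequestImmediately(...): lines 38 to 49: submit an immediate pull request.
- #executeTaskLater(...): lines 50 to 58: submit a delayed task.
- #pullMessage(...): lines 62 to 75: execute the pull logic. For details, see: DefaultMQPushConsumerImpl#pullMessage(...)
- #run(...): lines 76 to 91: loop over the pull request queue (pullRequestQueue) and pull messages.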
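The core of PullMessageService is a single thread draining a blocking queue, plus a scheduler that re-enqueues requests after a delay. A minimal sketch, where `PullServiceSketch` is hypothetical and a plain String stands in for PullRequest. One deliberate difference: it uses poll with a timeout instead of take() so the loop can notice shutdown without being interrupted.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the PullMessageService pattern; not RocketMQ code. */
public class PullServiceSketch implements Runnable {
    private final LinkedBlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean stopped = false;
    final List<String> handled = new CopyOnWriteArrayList<>(); // records what was "pulled"

    /** Enqueue now; the service thread picks it up asynchronously, so the caller never blocks. */
    public void executePullRequestImmediately(String pullRequest) {
        pullRequestQueue.offer(pullRequest); // unbounded queue: offer always succeeds
    }

    /** Enqueue after a delay, e.g. for flow control or while the consumer is paused. */
    public void executePullRequestLater(String pullRequest, long delayMillis) {
        scheduler.schedule(() -> executePullRequestImmediately(pullRequest), delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                String req = pullRequestQueue.poll(100, TimeUnit.MILLISECONDS); // timed poll so stop is noticed
                if (req != null) {
                    handled.add(req); // stand-in for DefaultMQPushConsumerImpl#pullMessage(...)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void shutdown() {
        stopped = true;
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PullServiceSketch service = new PullServiceSketch();
        Thread t = new Thread(service, "PullMessageService");
        t.start();
        service.executePullRequestLater("mq-1 (delayed 50 ms)", 50);
        service.executePullRequestImmediately("mq-0");
        Thread.sleep(300);
        service.shutdown();
        t.join();
        System.out.println(service.handled);
    }
}
```

Because every pull result ends by submitting the next request for the same queue (immediately or later), one request per queue circulates through this single thread, which is what makes the "push" loop continuous.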
- DefaultMQPushConsumerImpl#pullMessage(...)
- public void pullMessage(final PullRequest pullRequest) {
- final ProcessQueue processQueue = pullRequest.getProcessQueue();
- if (processQueue.isDropped()) {
- log.info("the pull request[{}] is dropped.", pullRequest.toString());
- return;
- }
- // Record the time of the queue's last pull
- pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
- // Check that the consumer is running; if not, pull again later
- try {
- this.makeSureStateOK();
- } catch (MQClientException e) {
- log.warn("pullMessage exception, consumer state not ok", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- return;
- }
- // Check whether the consumer is paused
- if (this.isPause()) {
- log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
- return;
- }
- // Check whether the number of buffered messages exceeds the limit (1000 by default)
- long size = processQueue.getMsgCount().get();
- if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // submit a delayed pull request (50 ms)
- if ((flowControlTimes1++ % 1000) == 0) {
- log.warn(
- "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
- }
- return;
- }
- if (!this.consumeOrderly) { // check whether the offset span of the buffered messages is too large
- if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); // submit a delayed pull request (50 ms)
- if ((flowControlTimes2++ % 1000) == 0) {
- log.warn(
- "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
- pullRequest, flowControlTimes2);
- }
- return;
- }
- } else { // TODO ordered consumption
- if (processQueue.isLocked()) {
- if (!pullRequest.isLockedFirst()) {
- final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
- boolean brokerBusy = offset < pullRequest.getNextOffset();
- log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
- pullRequest, offset, brokerBusy);
- if (brokerBusy) {
- log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
- pullRequest, offset);
- }
- pullRequest.setLockedFirst(true);
- pullRequest.setNextOffset(offset);
- }
- } else {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- log.info("pull message later because not locked in broker, {}", pullRequest);
- return;
- }
- }
- // Get the Topic's subscription; if it does not exist, pull again later
- final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (null == subscriptionData) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- log.warn("find the consumer's subscription failed, {}", pullRequest);
- return;
- }
- final long beginTimestamp = System.currentTimeMillis();
- PullCallback pullCallback = new PullCallback() {
- @Override
- public void onSuccess(PullResult pullResult) {
- if (pullResult != null) {
- pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
- subscriptionData);
- switch (pullResult.getPullStatus()) {
- case FOUND:
- // 设置下次拉取消息队列位置
- long prevRequestOffset = pullRequest.getNextOffset();
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- // 统计
- long pullRT = System.currentTimeMillis() - beginTimestamp;
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullRT);
- long firstMsgOffset = Long.MAX_VALUE;
- if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- } else {
- firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
- // 统计
- DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
- // 提交拉取到的消息到消息处理队列
- boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- // 提交消费请求
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
- pullResult.getMsgFoundList(), //
- processQueue, //
- pullRequest.getMessageQueue(), //
- dispathToConsume);
- // 提交下次拉取消息请求
- if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
- } else {
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- }
- }
- // 下次拉取消息队列位置小于上次拉取消息队列位置 或者 第一条消息的消息队列位置小于上次拉取消息队列位置, 则判定为 BUG, 输出 log
- if (pullResult.getNextBeginOffset() < prevRequestOffset//
- || firstMsgOffset < prevRequestOffset) {
- log.warn(
- "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
- pullResult.getNextBeginOffset(), //
- firstMsgOffset, //
- prevRequestOffset);
- }
- break;
- case NO_NEW_MSG:
- // 设置下次拉取消息队列位置
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- // 持久化消费进度
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
- // 立即提交拉取消息请求
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case NO_MATCHED_MSG:
- // 设置下次拉取消息队列位置
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- // 持久化消费进度
- DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
- // 提交立即拉取消息请求
- DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- break;
- case OFFSET_ILLEGAL:
- log.warn("the pull request offset illegal, {} {}", //
- pullRequest.toString(), pullResult.toString());
- // 设置下次拉取消息队列位置
- pullRequest.setNextOffset(pullResult.getNextBeginOffset());
- // 设置消息处理队列为 dropped
- pullRequest.getProcessQueue().setDropped(true);
- // 提交延迟任务, 进行消费处理队列移除不立即移除的原因: 可能有地方正在使用, 避免受到影响
- DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
- @Override
- public void run() {
- try {
- // 更新消费进度, 同步消费进度到 Broker
- DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
- pullRequest.getNextOffset(), false);
- DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
- // 移除消费处理队列
- DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
- log.warn("fix the pull request offset, {}", pullRequest);
- } catch (Throwable e) {
- log.error("executeTaskLater Exception", e);
- }
- }
- }, 10000);
- break;
- default:
- break;
- }
- }
- }
- @Override
- public void onException(Throwable e) {
- if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- log.warn("execute the pull request exception", e);
- }
- // 提交延迟拉取消息请求
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- }
- };
- // 集群消息模型下, 计算提交的消费进度
- boolean commitOffsetEnable = false;
- long commitOffsetValue = 0L;
- if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
- commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
- if (commitOffsetValue > 0) {
- commitOffsetEnable = true;
- }
- }
- // 计算请求的 订阅表达式 和 是否进行 filtersrv 过滤消息
- String subExpression = null;
- boolean classFilter = false;
- SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
- if (sd != null) {
- if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
- subExpression = sd.getSubString();
- }
- classFilter = sd.isClassFilterMode();
- }
- // 计算拉取消息系统标识
- int sysFlag = PullSysFlag.buildSysFlag(//
- commitOffsetEnable, // commitOffset
- true, // suspend
- subExpression != null, // subscription
- classFilter // class filter
- );
- // 执行拉取如果拉取请求发生异常时, 提交延迟拉取消息请求
- try {
- this.pullAPIWrapper.pullKernelImpl(//
- pullRequest.getMessageQueue(), // 1
- subExpression, // 2
- subscriptionData.getSubVersion(), // 3
- pullRequest.getNextOffset(), // 4
- this.defaultMQPushConsumer.getPullBatchSize(), // 5
- sysFlag, // 6
- commitOffsetValue, // 7
- BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
- CommunicationMode.ASYNC, // 10
- pullCallback// 11
- );
- } catch (Exception e) {
- log.error("pullKernelImpl exception", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
- }
- }
- private void correctTagsOffset(final PullRequest pullRequest) {
- if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
- this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
- }
- }
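#pullMessage(...) description: pulls messages
Lines 3 to 6: the process queue has been dropped, so no pull is performed
Line 9: record the process queue's last pull timestamp
Lines 11 to 18: the Consumer is not running; no pull is performed and a delayed pull request is submitted
Lines 20 to 25: the Consumer is paused; no pull is performed and a delayed pull request is submitted
Lines 27 to 37: the process queue holds more messages than the maximum allowed (default: 1000); no pull is performed and a delayed pull request is submitted
Lines 39 to 49: the Consumer consumes concurrently and the span of messages held by the process queue is too large (span = queue offset of the last held message minus that of the first; default maximum: 2000); no pull is performed and a delayed pull request is submitted
Lines 50 to 70: orderly consumption, skipped here; for details see: RocketMQ Source Code Analysis: Orderly Message Sending and Consumption
Lines 72 to 78: there is no subscription data for the Topic; no pull is performed and a delayed pull request is submitted
Lines 222 to 224: decide whether the request carries the Consumer's local subscription data (SubscriptionData) instead of relying on the subscription data stored in the Broker. For details see: PullMessageProcessor#processRequest(...), lines 64 to 110
Line 226: whether class-filter (filtersrv) mode is enabled. For details see: RocketMQ Source Code Analysis: Filtersrv
Lines 229 to 235: compute the system flag of the pull request. For details see: PullMessageRequestHeader.sysFlag
Lines 237 to 255:
Execute the asynchronous pull request. For details see: PullAPIWrapper#pullKernelImpl(...)
If issuing the request throws, submit a delayed pull request. For how the Broker handles pull requests, see: PullMessageProcessor#processRequest(...)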
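PullCallback: the pull callback:
Line 86: process the pull result. For details see: PullAPIWrapper#processPullResult(...)
Lines 89 to 192: handle each pull status:
Lines 90 to 139: messages found (FOUND):
Lines 91 to 93: set the queue offset for the next pull
Lines 95 to 97: statistics
Lines 101 to 102: the list of found messages is empty, so a pull request is submitted immediately. Why can the status be FOUND while the list is empty? Because PullAPIWrapper#processPullResult(...) re-filters the messages by tag on the client side and may discard all of them; see that method for details
Lines 106 to 108: statistics
Line 111: put the pulled messages into the process queue. For details see: ProcessQueue#putMessage(...)
Lines 113 to 118: submit a consume request to ConsumeMessageService. For details see: ConsumeMessageConcurrentlyService
Lines 120 to 126: depending on the pull interval (pullInterval), submit the next pull request immediately or after a delay. The default pull interval is 0 ms, so the request is submitted immediately
Lines 129 to 137: if the next queue offset, or the queue offset of the first message, is smaller than the previous request's queue offset, this is treated as a BUG and a warning is logged
Lines 140 to 149: no new messages (NO_NEW_MSG):
Line 142: set the queue offset for the next pull
Line 145: correct the consume offset. For details see: #correctTagsOffset(...)
Line 148: submit a pull request immediately
Lines 150 to 159: there are new messages but none match (NO_MATCHED_MSG); same logic as NO_NEW_MSG
Lines 160 to 189: the queue offset of the pull request is illegal (OFFSET_ILLEGAL):
Line 164: set the queue offset for the next pull
Line 167: mark the process queue as dropped
Lines 169 to 188: submit a delayed task that removes the process queue
Lines 175 to 178: update the consume offset and persist it to the Broker
Line 181: remove the process queue
Question: why is it not removed immediately? The code comment suggests the process queue may still be in use elsewhere, and deferring the removal avoids affecting that code
Lines 196 to 204: on exception, submit a delayed pull request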
#correctTagsOffset(...): corrects the consume offset
Lines 258 to 261: when the process queue holds 0 messages, update the consume offset to the pull request's next queue offset
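The two flow-control checks described above (too many cached messages, and a span that is too large) can be pulled out into a small pure function. This is a minimal sketch that assumes the defaults quoted above (1000 messages, span 2000). In RocketMQ these limits are configurable on DefaultMQPushConsumer; as we understand it, the properties are pullThresholdForQueue and consumeConcurrentlyMaxSpan. The class `PullFlowControlSketch`, the `Decision` enum and `decide` are hypothetical names. As in ProcessQueue, the sketch keys held messages by queue offset in a `TreeMap`.

```java
import java.util.TreeMap;

/** Hypothetical sketch of the pull flow-control checks (not RocketMQ API). */
class PullFlowControlSketch {

    enum Decision { PULL_NOW, DELAY_TOO_MANY_MESSAGES, DELAY_SPAN_TOO_LARGE }

    // Defaults quoted in the description above (assumed unchanged).
    static final int MAX_CACHED_MESSAGES = 1000;
    static final long MAX_SPAN = 2000;

    /**
     * @param cached         messages held by the process queue, keyed by queue offset
     * @param consumeOrderly orderly consumers skip the span check
     */
    static Decision decide(TreeMap<Long, String> cached, boolean consumeOrderly) {
        // Too many messages held locally: let consumption catch up first.
        if (cached.size() > MAX_CACHED_MESSAGES) {
            return Decision.DELAY_TOO_MANY_MESSAGES;
        }
        // Concurrent consumption only: distance between the last and first held offsets.
        if (!consumeOrderly && !cached.isEmpty()) {
            long span = cached.lastKey() - cached.firstKey();
            if (span > MAX_SPAN) {
                return Decision.DELAY_SPAN_TOO_LARGE;
            }
        }
        return Decision.PULL_NOW;
    }
}
```

One slow message at a low offset is enough to trigger the span check, even when only a few messages are cached. This is why the span check exists for concurrent consumption: the committed offset cannot move past that message (see ProcessQueue#removeMessage(...)).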
- PullAPIWrapper#pullKernelImpl(...)
/**
 * Core method for pulling messages
 *
 * @param mq                         message queue
 * @param subExpression              subscription expression
 * @param subVersion                 subscription version
 * @param offset                     queue offset to start pulling from
 * @param maxNums                    maximum number of messages to pull
 * @param sysFlag                    system flag of the pull request
 * @param commitOffset               consume offset to commit
 * @param brokerSuspendMaxTimeMillis maximum time the broker may suspend (hold) the request
 * @param timeoutMillis              timeout of the request to the broker
 * @param communicationMode          communication mode
 * @param pullCallback               pull callback
 * @return the pull result; only returned in synchronous mode, otherwise null
 * @throws MQClientException    when the broker cannot be found, or on other client errors
 * @throws RemotingException    when the remote call fails
 * @throws MQBrokerException    when the broker reports an error; only in synchronous mode
 * @throws InterruptedException when the thread is interrupted
 */
protected PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Get the Broker info
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }
    // Issue the pull request
    if (findBrokerResult != null) {
        int sysFlagInner = sysFlag;
        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }
        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) { // TODO filtersrv
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }
        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);
        return pullResult;
    }
    // No Broker info: throw an exception
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
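Description: the core pull method. It takes many parameters; see the Javadoc above for what each one means
Lines 34 to 43: get the Broker info (the Broker address and whether it is a slave)
- #recalculatePullFromWhichNode(...)
- MQClientInstance#findBrokerAddressInSubscribe(...)
Lines 45 to 78: issue the pull request
Line 81: if the Broker info does not exist, throw an exception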
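The sysFlag passed to pullKernelImpl(...) is a bit set, and the slave branch above only clears one bit of it. The sketch below assumes these bit values: 0x1 for commit offset, 0x2 for suspend, 0x4 for subscription and 0x8 for class filter. That is our understanding of PullSysFlag in this version, not something this article shows. `SysFlagSketch` and its methods are hypothetical. The commit-offset bit is presumably dropped for slaves because consume progress is committed on the master.

```java
/** Hypothetical restatement of the pull system-flag bits; the bit values are assumptions. */
final class SysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    private SysFlagSketch() {}

    /** Same argument order as the PullSysFlag.buildSysFlag(...) call in #pullMessage(...). */
    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        return (commitOffset ? FLAG_COMMIT_OFFSET : 0)
            | (suspend ? FLAG_SUSPEND : 0)
            | (subscription ? FLAG_SUBSCRIPTION : 0)
            | (classFilter ? FLAG_CLASS_FILTER : 0);
    }

    /** What pullKernelImpl(...) does when the chosen Broker is a slave: drop only the commit bit. */
    static int clearCommitOffset(int sysFlag) {
        return sysFlag & ~FLAG_COMMIT_OFFSET;
    }

    static boolean has(int sysFlag, int bit) {
        return (sysFlag & bit) != 0;
    }
}
```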
- PullAPIWrapper#recalculatePullFromWhichNode(...)
/**
 * Mapping from message queue to the Broker to pull from
 * When pulling messages, this map gives the Broker for the pull request
 */
private ConcurrentHashMap<MessageQueue, AtomicLong> pullFromWhichNodeTable =
    new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
/**
 * Whether to use the default Broker
 */
private volatile boolean connectBrokerByUser = false;
/**
 * Default Broker ID
 */
private volatile long defaultBrokerId = MixAll.MASTER_ID;
/**
 * Compute the ID of the Broker to pull the message queue from
 *
 * @param mq message queue
 * @return Broker ID
 */
public long recalculatePullFromWhichNode(final MessageQueue mq) {
    // If the default-Broker switch is on, return the default Broker ID
    if (this.isConnectBrokerByUser()) {
        return this.defaultBrokerId;
    }
    // If the queue has a mapped Broker, return the mapped Broker ID
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }
    // Otherwise return the master Broker ID
    return MixAll.MASTER_ID;
}
Description: computes the ID of the Broker from which the message queue is pulled
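The ID chosen by recalculatePullFromWhichNode(...) comes from a per-queue suggestion that processPullResult(...) refreshes after every pull through updatePullFromWhichNode. The sketch below shows both sides of that table. It assumes a master ID of 0 and uses plain strings for message queues. `PullNodeTableSketch` and its methods are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the queue -> suggested broker ID table (queues are plain strings). */
class PullNodeTableSketch {
    static final long MASTER_ID = 0L; // assumed master broker ID

    private final ConcurrentMap<String, AtomicLong> pullFromWhichNodeTable = new ConcurrentHashMap<>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MASTER_ID;

    /** Called after each pull with the Broker's suggestion for the next one. */
    void update(String queue, long suggestedBrokerId) {
        AtomicLong previous = pullFromWhichNodeTable.putIfAbsent(queue, new AtomicLong(suggestedBrokerId));
        if (previous != null) {
            previous.set(suggestedBrokerId); // existing entry: overwrite the suggestion
        }
    }

    /** Same precedence as recalculatePullFromWhichNode(...): user default, then suggestion, then master. */
    long recalculate(String queue) {
        if (connectBrokerByUser) {
            return defaultBrokerId;
        }
        AtomicLong suggest = pullFromWhichNodeTable.get(queue);
        return suggest != null ? suggest.get() : MASTER_ID;
    }

    /** Pin all pulls to one Broker ID (the "default Broker" switch). */
    void connectTo(long brokerId) {
        this.defaultBrokerId = brokerId;
        this.connectBrokerByUser = true;
    }
}
```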
- MQClientInstance#findBrokerAddressInSubscribe(...)
/**
 * Map from Broker name to (Broker ID -> Broker address)
 */
private final ConcurrentHashMap<String, HashMap<Long, String>> brokerAddrTable =
    new ConcurrentHashMap<>();
/**
 * Get Broker info
 *
 * @param brokerName     broker name
 * @param brokerId       broker ID
 * @param onlyThisBroker whether it must be exactly this broker
 * @return Broker info
 */
public FindBrokerResult findBrokerAddressInSubscribe(//
    final String brokerName, //
    final long brokerId, //
    final boolean onlyThisBroker//
) {
    String brokerAddr = null; // broker address
    boolean slave = false; // whether it is a slave
    boolean found = false; // whether it was found
    // Get the Broker info
    HashMap<Long, String> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        brokerAddr = map.get(brokerId);
        slave = brokerId != MixAll.MASTER_ID;
        found = brokerAddr != null;
        // If not restricted to this broker, pick any Broker
        if (!found && !onlyThisBroker) {
            Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = entry.getKey() != MixAll.MASTER_ID;
            found = true;
        }
    }
    // Found: return the info
    if (found) {
        return new FindBrokerResult(brokerAddr, slave);
    }
    // Not found: return null
    return null;
}
Description: gets the Broker info (the Broker address and whether it is a slave)
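findBrokerAddressInSubscribe(...) falls back to any node of the same broker group when the requested ID is missing, unless onlyThisBroker is set. The sketch below restates that lookup with hypothetical names (`BrokerLookupSketch`, `Found`). It assumes a master ID of 0, which matches MixAll.MASTER_ID as we understand it. The fallback takes the first entry of a HashMap, so the node it picks is not guaranteed to be the master.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the broker-address lookup with fallback (not RocketMQ API). */
class BrokerLookupSketch {
    static final long MASTER_ID = 0L; // assumed master broker ID

    /** Result: broker address plus whether it is a slave. */
    static final class Found {
        final String addr;
        final boolean slave;

        Found(String addr, boolean slave) {
            this.addr = addr;
            this.slave = slave;
        }
    }

    // brokerName -> (brokerId -> address)
    private final Map<String, HashMap<Long, String>> brokerAddrTable = new HashMap<>();

    void register(String brokerName, long brokerId, String addr) {
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    Found find(String brokerName, long brokerId, boolean onlyThisBroker) {
        HashMap<Long, String> map = brokerAddrTable.get(brokerName);
        if (map == null || map.isEmpty()) {
            return null; // unknown broker group
        }
        String addr = map.get(brokerId);
        if (addr != null) {
            return new Found(addr, brokerId != MASTER_ID);
        }
        if (onlyThisBroker) {
            return null;
        }
        // Fall back to whichever node the HashMap iterates first.
        Map.Entry<Long, String> any = map.entrySet().iterator().next();
        return new Found(any.getValue(), any.getKey() != MASTER_ID);
    }
}
```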
- PullAPIWrapper#processPullResult(...)
/**
 * Process the pull result
 * 1. Update the mapping from message queue to the Broker ID to pull from
 * 2. Decode the messages and keep those that match the subscription's tags
 *
 * @param mq               message queue
 * @param pullResult       pull result
 * @param subscriptionData subscription data
 * @return pull result
 */
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;
    // Update the mapping from message queue to the Broker ID to pull from
    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    // Decode the messages and keep those that match the subscription's tags
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        // Decode the messages
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);
        // Keep the messages whose tag is in the subscription's tag set
        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }
        // Hook
        if (this.hasHook()) {
            FilterMessageContext filterMessageContext = new FilterMessageContext();
            filterMessageContext.setUnitMode(unitMode);
            filterMessageContext.setMsgList(msgListFilterAgain);
            this.executeHook(filterMessageContext);
        }
        // Record the queue's current min / max offsets in each message's properties
        for (MessageExt msg : msgListFilterAgain) {
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                Long.toString(pullResult.getMaxOffset()));
        }
        // Set the message list
        pullResultExt.setMsgFoundList(msgListFilterAgain);
    }
    // Clear the raw message bytes
    pullResultExt.setMessageBinary(null);
    return pullResult;
}
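Description: processes the pull result
Updates the mapping from message queue to the Broker ID to pull from
Decodes the messages and keeps those that match the subscription's tags
Line 16: update the mapping from message queue to Broker ID. On the next pull, unless a default Broker ID has been set, the updated Broker ID is used
Lines 18 to 55: decode the messages and keep those that match the subscription's tags
Lines 20 to 22: decode the messages. For details see: RocketMQ Source Code Analysis: Message Basics
Lines 24 to 35: match the messages against the subscription's tags
Lines 37 to 43: Hook
Lines 45 to 51: record the queue's current min / max offsets in each message's properties
Line 54: set the message list
Line 58: clear the raw message bytes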
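The client-side re-filter in processPullResult(...) exists because the Broker matches on tag hash codes. A message whose tag only shares a hash code with a subscribed tag can therefore still reach the client, and this check removes it. The same check explains how a FOUND result can end up with an empty list. The sketch below restates the filter with a hypothetical minimal message type in place of MessageExt. `TagFilterSketch` and `Msg` are not RocketMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of client-side tag re-filtering (not RocketMQ API). */
final class TagFilterSketch {
    private TagFilterSketch() {}

    /** Minimal stand-in for MessageExt: only what the filter needs. */
    static final class Msg {
        final long queueOffset;
        final String tags;

        Msg(long queueOffset, String tags) {
            this.queueOffset = queueOffset;
            this.tags = tags;
        }
    }

    /**
     * Keeps messages whose tag is in the subscribed set. An empty set (no tag
     * restriction) or class-filter mode returns the input unchanged; messages
     * without a tag are dropped when a restriction exists.
     */
    static List<Msg> filter(List<Msg> msgs, Set<String> tagsSet, boolean classFilterMode) {
        if (tagsSet.isEmpty() || classFilterMode) {
            return msgs;
        }
        List<Msg> kept = new ArrayList<>(msgs.size());
        for (Msg m : msgs) {
            if (m.tags != null && tagsSet.contains(m.tags)) {
                kept.add(m);
            }
        }
        return kept;
    }
}
```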
- ProcessQueue#putMessage(...)
/**
 * Read-write lock guarding msgTreeMap
 */
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
/**
 * Message map
 * key: queue offset
 */
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
/**
 * Number of messages
 */
private final AtomicLong msgCount = new AtomicLong();
/**
 * Largest queue offset among the added messages
 */
private volatile long queueOffsetMax = 0L;
/**
 * Whether consumption is in progress
 */
private volatile boolean consuming = false;
/**
 * Number of messages accumulated on the Broker (not yet pulled)
 * Formula = queueMaxOffset - queueOffset of the last newly added message (msgs[n - 1])
 * Acc = Accumulation
 * Cnt = Count
 */
private volatile long msgAccCnt = 0;
/**
 * Add messages and return whether they should be dispatched to the consumer
 * Returns true when the map is non-empty and consumption is not already in progress
 *
 * @param msgs messages
 * @return whether to dispatch to the consumer
 */
public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            // Add the messages
            int validMsgCnt = 0;
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                }
            }
            msgCount.addAndGet(validMsgCnt);
            // Dispatch only if consumption is not already in progress
            if (!msgTreeMap.isEmpty() && !this.consuming) {
                dispatchToConsume = true;
                this.consuming = true;
            }
            // Messages accumulated on the Broker
            if (!msgs.isEmpty()) {
                MessageExt messageExt = msgs.get(msgs.size() - 1);
                String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                if (property != null) {
                    long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                    if (accTotal > 0) {
                        this.msgAccCnt = accTotal;
                    }
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("putMessage exception", e);
    }
    return dispatchToConsume;
}
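The core of ProcessQueue#putMessage(...) is a TreeMap keyed by queue offset. Re-adding an offset is not counted twice, and the caller is told to dispatch only when no consumption is already in progress. The concurrent service shown later ignores dispatchToConsume: its submitConsumeRequest(...) never reads the parameter. The flag therefore presumably matters mainly for orderly consumption. The simplified sketch below replaces message bodies with strings. `ProcessQueuePutSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical, simplified ProcessQueue (message bodies are strings keyed by queue offset). */
class ProcessQueuePutSketch {
    private final ReentrantReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = 0L;
    private boolean consuming = false;

    /** Adds messages at the given offsets; returns true if the caller should dispatch them. */
    boolean putMessage(List<Long> offsets) {
        lockTreeMap.writeLock().lock();
        try {
            for (long offset : offsets) {
                // put() returns null only for an offset that was not held yet.
                if (msgTreeMap.put(offset, "msg-" + offset) == null) {
                    queueOffsetMax = offset;
                }
            }
            boolean dispatch = !msgTreeMap.isEmpty() && !consuming;
            if (dispatch) {
                consuming = true; // later puts report false while this stays set
            }
            return dispatch;
        } finally {
            lockTreeMap.writeLock().unlock();
        }
    }

    int size() {
        lockTreeMap.readLock().lock();
        try {
            return msgTreeMap.size();
        } finally {
            lockTreeMap.readLock().unlock();
        }
    }

    long queueOffsetMax() {
        lockTreeMap.readLock().lock();
        try {
            return queueOffsetMax;
        } finally {
            lockTreeMap.readLock().unlock();
        }
    }
}
```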
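Summary
Described in the simplest, bluntest terms, the way PushConsumer pulls messages boils down to the following code:
while (true) {
    if (the conditions for pulling are not met) {
        Thread.sleep(interval);
        continue;
    }
    pullMessages();
}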
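In practice the loop above is not a literal sleep. As the code in this section shows, a request that should not be pulled yet is handed back with a delay (executePullRequestLater); otherwise it is handed back immediately (executePullRequestImmediately). The pull service thread then keeps taking the next request that is due. The sketch below models this with java.util.concurrent.DelayQueue. That is a swapped-in simplification, not how PullMessageService is implemented. `PullLoopSketch`, `PullTask` and the string queue names are hypothetical.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the pull loop; DelayQueue replaces RocketMQ's own queue + scheduler. */
class PullLoopSketch {

    /** A pull request for one message queue that becomes due at readyAtNanos. */
    static final class PullTask implements Delayed {
        final String messageQueue;
        final long readyAtNanos;

        PullTask(String messageQueue, long delayMillis) {
            this.messageQueue = messageQueue;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Order by remaining delay: earliest deadline first.
            return Long.signum(getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<PullTask> delayQueue = new DelayQueue<>();

    /** Counterpart of executePullRequestImmediately(...). */
    void executeImmediately(String messageQueue) {
        delayQueue.put(new PullTask(messageQueue, 0));
    }

    /** Counterpart of executePullRequestLater(...), e.g. 50 ms under flow control. */
    void executeLater(String messageQueue, long delayMillis) {
        delayQueue.put(new PullTask(messageQueue, delayMillis));
    }

    /** One loop iteration: block until a request is due and return its queue (null if interrupted). */
    String takeNext() {
        try {
            return delayQueue.take().messageQueue;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
            return null;
        }
    }
}
```

A request deferred by flow control does not block the thread from serving other queues in the meantime. In the sketch, a queue submitted later but due immediately is taken first.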
6 PushConsumer: Consuming Messages
ConsumeMessageConcurrentlyService: Submitting Consume Requests
- ConsumeMessageConcurrentlyService#submitConsumeRequest(...)
/**
 * Work queue of the consume thread pool
 */
private final BlockingQueue<Runnable> consumeRequestQueue;
/**
 * Consume thread pool
 */
private final ThreadPoolExecutor consumeExecutor;
public void submitConsumeRequest(//
    final List<MessageExt> msgs, //
    final ProcessQueue processQueue, //
    final MessageQueue messageQueue, //
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) { // no more messages than the batch size: submit one consume request
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else { // more messages than the batch size: split them into multiple consume requests
        for (int total = 0; total < msgs.size(); ) {
            // Collect the messages for the current chunk
            List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            // Submit the chunk's consume request
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // If rejected, append all remaining messages to this chunk and submit it as a delayed consume request
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
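Description: submits consume requests for immediate execution
Lines 16 to 22: when the number of messages does not exceed the batch size, submit a consume request directly
Lines 23 to 47: when the number of messages exceeds the batch size, split them into multiple requests
Lines 25 to 33: collect the messages for the current chunk
Lines 35 to 38: submit the chunk's consume request
Lines 39 to 44: if the submission is rejected, append all remaining messages to the current chunk and submit it as a delayed consume request; total then equals msgs.size(), so the splitting loop ends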
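Without the rejection path, the splitting rule in submitConsumeRequest(...) simply cuts the list into chunks of at most consumeBatchSize messages. The sketch below shows this. `ConsumeBatchSplitSketch` and `split` are hypothetical names, and unlike the original the sketch returns no chunk for an empty list. As far as we know, consumeMessageBatchMaxSize defaults to 1, in which case every pulled message becomes its own ConsumeRequest.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring how submitConsumeRequest(...) splits a pulled batch. */
final class ConsumeBatchSplitSketch {
    private ConsumeBatchSplitSketch() {}

    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> requests = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += consumeBatchSize) {
            int to = Math.min(from + consumeBatchSize, msgs.size());
            // Copy so each request owns its own list, like msgThis in the original.
            requests.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return requests;
    }
}
```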
- ConsumeMessageConcurrentlyService#submitConsumeRequestLater
/**
 * Submit a delayed consume request
 *
 * @param msgs         message list
 * @param processQueue process queue
 * @param messageQueue message queue
 */
private void submitConsumeRequestLater(//
    final List<MessageExt> msgs, //
    final ProcessQueue processQueue, //
    final MessageQueue messageQueue//
) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
/**
 * Submit a delayed consume request
 * @param consumeRequest consume request
 */
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest//
) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest); // TODO BUG ?
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
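Description: submits a delayed consume request
Line 34: the delayed task calls
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
directly. After a rejection in submitConsumeRequest(...), the request can hold more messages than the batch-consumption limit, because the remaining messages are appended to it, and it is resubmitted without being split again. Could this be a bug?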
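To see why this question arises, the splitting loop can be replayed with a simulated rejection. In the sketch below, the hypothetical class `RejectedChunkSketch` uses a `Predicate` to stand in for the thread pool accepting a chunk. The chunk handed to submitConsumeRequestLater can hold more than consumeBatchSize messages, and the delayed task submits it unchanged. Whether this counts as a bug depends on whether a listener relies on the batch-size limit; the listener would simply receive a larger list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical replay of submitConsumeRequest(...) when the pool rejects a chunk. */
final class RejectedChunkSketch {
    private RejectedChunkSketch() {}

    /**
     * accept.test(chunk) stands in for consumeExecutor.submit(...) succeeding.
     * Accepted chunks are added to submitted; the rejected chunk (plus all
     * remaining messages) is returned, or an empty list if nothing was rejected.
     */
    static <T> List<T> replay(List<T> msgs, int batchSize, Predicate<List<T>> accept, List<List<T>> submitted) {
        for (int total = 0; total < msgs.size(); ) {
            List<T> msgThis = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize && total < msgs.size(); i++, total++) {
                msgThis.add(msgs.get(total));
            }
            if (accept.test(msgThis)) {
                submitted.add(msgThis);
            } else {
                // Same as the original: append everything left, then hand it to the delayed path.
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                return msgThis;
            }
        }
        return new ArrayList<>();
    }
}
```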
- ConsumeRequest
class ConsumeRequest implements Runnable {
    /**
     * Messages to consume
     */
    private final List<MessageExt> msgs;
    /**
     * Process queue
     */
    private final ProcessQueue processQueue;
    /**
     * Message queue
     */
    private final MessageQueue messageQueue;
    public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        this.msgs = msgs;
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }
    @Override
    public void run() {
        // Do not consume from a dropped queue
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; // listener
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); // consume context
        ConsumeConcurrentlyStatus status = null; // consume status
        // Hook
        ConsumeMessageContext consumeMessageContext = null;
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }
        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; // return type of the consumption
        try {
            // For retry messages, set the Topic back to the original Topic
            ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
            // Record the consume start timestamp
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // Consume
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e), //
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        // Determine the return type of the consumption
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }
        // Hook
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }
        // If the status is null, treat it as RECONSUME_LATER
        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        // Hook
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }
        // Statistics
        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
        // Process the consume result
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }
}
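Description: the consume request; once submitted, it performs the consumption
Lines 24 to 28: a dropped process queue is not consumed
Lines 34 to 44: Hook
Line 51: for retry messages, set the Topic back to the original Topic. For example, if the original Topic is TopicTest, the Topic during retry is %RETRY%please_rename_unique_group_name_4; this method sets it back to TopicTest
Lines 53 to 58: record the consume start timestamp
Line 61: consume
Lines 71 to 85: determine the return type of the consumption
Lines 87 to 90: Hook
Lines 92 to 99: when the consume status is null, set it to RECONSUME_LATER (consume again later)
Lines 101 to 106: Hook
Lines 108 to 110: statistics
Lines 112 to 117: process the consume result. If the process queue is dropped just as its messages finish being consumed, the result is not processed and the messages may be consumed again; consumers should therefore be made as idempotent as possible. For details see: ConsumeMessageConcurrentlyService#processConsumeResult(...)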
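The classification in ConsumeRequest#run() can be written as a pure function. The order of checks matters: a null status first, then the timeout, then the listener's status. TIME_OUT only changes the return type reported to hooks. The status itself is unchanged, so a slow but successful consumption is still processed as CONSUME_SUCCESS. The enums below are local copies, and `ConsumeReturnTypeSketch` is a hypothetical name.

```java
/** Hypothetical sketch of how ConsumeRequest#run() classifies a consumption attempt. */
final class ConsumeReturnTypeSketch {
    private ConsumeReturnTypeSketch() {}

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    /**
     * @param status            listener result, or null if it returned null or threw
     * @param threw             whether the listener threw
     * @param consumeRtMillis   elapsed time of the listener call
     * @param consumeTimeoutMin consumeTimeout, in minutes
     */
    static ReturnType classify(Status status, boolean threw, long consumeRtMillis, long consumeTimeoutMin) {
        if (status == null) {
            return threw ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= consumeTimeoutMin * 60 * 1000) {
            return ReturnType.TIME_OUT; // reporting only; the status is not changed
        }
        return status == Status.RECONSUME_LATER ? ReturnType.FAILED : ReturnType.SUCCESS;
    }

    /** A null status is always retried later. */
    static Status effectiveStatus(Status status) {
        return status == null ? Status.RECONSUME_LATER : status;
    }
}
```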
- ConsumeMessageConcurrentlyService#processConsumeResult(...)
public void processConsumeResult(//
    final ConsumeConcurrentlyStatus status, //
    final ConsumeConcurrentlyContext context, //
    final ConsumeRequest consumeRequest//
) {
    int ackIndex = context.getAckIndex();
    // No messages: return directly
    if (consumeRequest.getMsgs().isEmpty())
        return;
    // Compute ackIndex: consumeRequest.msgs[0] to consumeRequest.msgs[ackIndex] count as consumed successfully
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            // Count successes / failures
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            // Count successes / failures
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }
    // Handle the messages that failed
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING: // broadcasting: failed messages are never sent back to the Broker, only logged
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            // Send failed messages back to the Broker
            List<MessageExt> msgBackFailed = new ArrayList<>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            // Messages that could not be sent back are resubmitted for delayed local re-consumption
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    // Remove the messages from the process queue and update the consume offset
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
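Description: processes the consume result
Lines 8 to 10: when the consume request has no messages, return directly
Lines 12 to 32: compute ackIndex. consumeRequest.msgs[0..ackIndex] were consumed successfully and need to be acknowledged
Lines 14 to 23: CONSUME_SUCCESS: ackIndex = context.getAckIndex(), capped at msgs.size() - 1
Lines 24 to 29: RECONSUME_LATER: ackIndex = -1
Lines 34 to 63: handle the messages that failed
Lines 36 to 41: BROADCASTING: in broadcasting mode, failed messages are never sent back to the Broker; they are only logged
Lines 42 to 60: CLUSTERING: in clustering mode, failed messages are sent back to the Broker
Lines 43 to 52: send failed messages back to the Broker. For details see: DefaultMQPushConsumerImpl#sendMessageBack(...)
Lines 54 to 59: messages that could not be sent back to the Broker are resubmitted for delayed local re-consumption
If a send-back actually succeeds but the Consumer believes it failed (for example because of a network error), the message is consumed again; consumers should therefore be made as idempotent as possible
Lines 65 to 69: remove from the process queue the messages that were consumed successfully, or that failed but were sent back to the Broker successfully, and update the consume offset
Why does this include messages that failed but were sent back successfully? See line 56: only the messages whose send-back failed are removed from consumeRequest.msgs, so failed messages that were sent back successfully stay in the list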
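The ackIndex arithmetic decides which messages count as failed. The small sketch below uses the hypothetical class `AckIndexSketch`. It assumes the default ackIndex is Integer.MAX_VALUE, which is our understanding of ConsumeConcurrentlyContext. With CONSUME_SUCCESS and the default ackIndex nothing fails. With RECONSUME_LATER the whole batch fails.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the ackIndex arithmetic in processConsumeResult(...). */
final class AckIndexSketch {
    private AckIndexSketch() {}

    /**
     * @param success  true for CONSUME_SUCCESS, false for RECONSUME_LATER
     * @param ackIndex value from the consume context
     * @param size     number of messages in the request
     * @return indices treated as failed (sent back in CLUSTERING, only logged in BROADCASTING)
     */
    static List<Integer> failedIndices(boolean success, int ackIndex, int size) {
        // CONSUME_SUCCESS caps ackIndex at size - 1; RECONSUME_LATER forces -1.
        int effective = success ? Math.min(ackIndex, size - 1) : -1;
        List<Integer> failed = new ArrayList<>();
        for (int i = effective + 1; i < size; i++) {
            failed.add(i);
        }
        return failed;
    }
}
```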
- ProcessQueue#removeMessage(...)
/**
 * Remove messages and return the queue offset to record as consume progress
 *
 * @param msgs messages
 * @return smallest offset still held, queueOffsetMax + 1 if none remain, or -1 if the map was already empty
 */
public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1; // + 1 because, once msgTreeMap is empty, the next message to be obtained is at queueOffsetMax + 1
                // Remove the messages
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                    }
                }
                msgCount.addAndGet(removedCnt);
                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }
    return result;
}
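removeMessage(...) returns the smallest queue offset still held, not the offset of the message that was just consumed. Under concurrent consumption a later message can finish first, so committed progress cannot move past an earlier message that is still in flight. As a consequence, later messages above that offset may be delivered again after a restart. The sketch below covers just the offset rule, using the hypothetical class `RemoveOffsetSketch`.

```java
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch of the offset returned by ProcessQueue#removeMessage(...). */
class RemoveOffsetSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = 0L;

    void put(long offset) {
        if (msgTreeMap.put(offset, "msg-" + offset) == null) {
            queueOffsetMax = offset;
        }
    }

    /**
     * Removes the given offsets and returns the offset that is safe to commit:
     * the smallest offset still held, or queueOffsetMax + 1 when nothing is left.
     * Returns -1 if the map was already empty, so the caller skips the update.
     */
    long remove(List<Long> offsets) {
        if (msgTreeMap.isEmpty()) {
            return -1;
        }
        for (long offset : offsets) {
            msgTreeMap.remove(offset);
        }
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }
}
```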
- ConsumeMessageConcurrentlyService#cleanExpireMsg(...)
public void start() {
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            cleanExpireMsg();
        }
    }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
/**
 * Clean up expired messages
 */
private void cleanExpireMsg() {
    Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
        this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<MessageQueue, ProcessQueue> next = it.next();
        ProcessQueue pq = next.getValue();
        pq.cleanExpiredMsg(this.defaultMQPushConsumer);
    }
}
Description: cleans up expired messages periodically; the default period is 15 min (the consumeTimeout value, in minutes)
- ProcessQueue#cleanExpiredMsg(...)
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    // Orderly consumption: return directly
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }
    // Remove messages in a loop
    int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; // at most 16 messages per run
    for (int i = 0; i < loop; i++) {
        // Check whether the first message has timed out; if not, end the loop
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                    msg = msgTreeMap.firstEntry().getValue();
                } else {
                    break;
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
        }
        try {
            // Send the expired message back
            pushConsumer.sendMessageBack(msg, 3);
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            // If the message is still the first one, remove it
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            msgTreeMap.remove(msgTreeMap.firstKey());
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}
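Description: removes expired messages
Lines 2 to 5: return directly for orderly consumption
Lines 7 to 9: remove messages in a loop; at most 16 iterations per run
Lines 10 to 25: check whether the first message has timed out; if not, end the loop
Line 29: send the expired message back to the Broker
Lines 32 to 48: if the message is still the first one, remove it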
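One cleanExpiredMsg(...) pass looks only at the head of the TreeMap. It stops at the first message that has not timed out, even if later messages have. The sketch below shows that rule. `ExpiredMsgSketch` is hypothetical, and consume start times are passed in explicitly instead of being read from message properties. Polling the entry stands in for sendMessageBack(msg, 3) followed by the removal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of one cleanExpiredMsg(...) pass over a single process queue. */
final class ExpiredMsgSketch {
    private ExpiredMsgSketch() {}

    /**
     * @param consumeStartByOffset queue offset -> consume start timestamp (ms); modified in place
     * @param nowMillis            current time
     * @param consumeTimeoutMin    consumeTimeout in minutes
     * @return offsets that would be sent back and removed, oldest first
     */
    static List<Long> expiredHead(TreeMap<Long, Long> consumeStartByOffset, long nowMillis, long consumeTimeoutMin) {
        List<Long> expired = new ArrayList<>();
        int loop = Math.min(consumeStartByOffset.size(), 16); // at most 16 per pass
        for (int i = 0; i < loop; i++) {
            Map.Entry<Long, Long> first = consumeStartByOffset.firstEntry();
            if (first == null || nowMillis - first.getValue() <= consumeTimeoutMin * 60 * 1000) {
                break; // the head has not timed out, so the pass ends here
            }
            expired.add(first.getKey());
            consumeStartByOffset.pollFirstEntry(); // stands in for send-back + removal
        }
        return expired;
    }
}
```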
7 PushConsumer: Sending Back Messages That Failed to Be Consumed
- DefaultMQPushConsumerImpl#sendMessageBack(...)
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        // The Consumer sends the message back
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) { // TODO question: in which cases does this throw?
        // On exception, send the message back through the client's built-in Producer
        log.error("sendMessageBack Exception," + this.defaultMQPushConsumer.getConsumerGroup(), e);
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
}
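Explanation: sends a message back.
Lines 4 to 8: the Consumer sends the message back to the Broker directly. The address comes from brokerName if one is given, otherwise from the message's store host. For details see MQClientAPIImpl#consumerSendMessageBack(...).
Lines 10 to 21: if that throws, the Consumer's built-in default Producer sends the message to the retry topic instead.
Open question: under what circumstances does the exception occur? Judging from the signature of MQClientAPIImpl#consumerSendMessageBack(...), candidates include a RemotingException from the remote call, an MQBrokerException when the Broker responds with a non-SUCCESS code, and an InterruptedException.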
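The fallback branch mainly rewrites the message for the retry topic. Below is a minimal sketch of that rewrite. It assumes the `%RETRY%` + consumer group naming used by MixAll.getRetryTopic(...). The class `RetryMessageSketch` and its property keys are hypothetical; they only mirror the fields set in the code above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the retry message built in the fallback branch.
final class RetryMessageSketch {
    // Prefix of retry topics, as used by MixAll.getRetryTopic(group)
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

    final String topic;
    final byte[] body;
    // Illustrative property keys; the real code goes through MessageAccessor / MessageConst
    final Map<String, String> properties = new HashMap<>();
    int delayTimeLevel;

    private RetryMessageSketch(String topic, byte[] body) {
        this.topic = topic;
        this.body = body;
    }

    static RetryMessageSketch buildRetry(String consumerGroup, String originTopic, String msgId,
                                         String originMsgId, byte[] body,
                                         int reconsumeTimes, int maxReconsumeTimes) {
        RetryMessageSketch m = new RetryMessageSketch(RETRY_GROUP_TOPIC_PREFIX + consumerGroup, body);
        // keep the very first message id across repeated retries
        boolean blank = originMsgId == null || originMsgId.trim().isEmpty();
        m.properties.put("originMsgId", blank ? msgId : originMsgId);
        // remember which topic the message originally came from
        m.properties.put("retryTopic", originTopic);
        m.properties.put("reconsumeTime", String.valueOf(reconsumeTimes + 1));
        m.properties.put("maxReconsumeTimes", String.valueOf(maxReconsumeTimes));
        // each retry uses a delay level one higher, starting at level 3
        m.delayTimeLevel = 3 + reconsumeTimes;
        return m;
    }
}
```

For example, a message retried for the first time by group `g1` goes to `%RETRY%g1` with delay level 3, and the third retry uses delay level 5.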
- MQClientAPIImpl#consumerSendMessageBack(...)
- /**
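- * Consumer sends a message back
- * @param addr Broker address
- * @param msg message
- * @param consumerGroup consumer group
- * @param delayLevel delay level
- * @param timeoutMillis timeout in milliseconds
- * @param maxConsumeRetryTimes maximum number of consume retries
- * @throws RemotingException when the remote call fails
- * @throws MQBrokerException when the Broker responds with a non-SUCCESS code
- * @throws InterruptedException when the thread is interrupted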
- */
- public void consumerSendMessageBack(
- final String addr,
- final MessageExt msg,
- final String consumerGroup,
- final int delayLevel,
- final long timeoutMillis,
- final int maxConsumeRetryTimes
- ) throws RemotingException, MQBrokerException, InterruptedException {
- ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
- requestHeader.setGroup(consumerGroup);
- requestHeader.setOriginTopic(msg.getTopic());
- requestHeader.setOffset(msg.getCommitLogOffset());
- requestHeader.setDelayLevel(delayLevel);
- requestHeader.setOriginMsgId(msg.getMsgId());
- requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
- RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return;
- }
- default:
- break;
- }
- throw new MQBrokerException(response.getCode(), response.getRemark());
- }
8 Consumer consume offsets
OffsetStore
RemoteBrokerOffsetStore : in Consumer clustering mode, consume offsets are kept on the remote Broker
LocalFileOffsetStore : in Consumer broadcasting mode, consume offsets are kept in a local file
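The choice between the two implementations depends only on the consumer's message model. The following sketch illustrates that choice with hypothetical stand-in types (`MessageModelSketch`, `OffsetStoreSketch` and so on). It is not the RocketMQ startup code.

```java
// Hypothetical stand-ins; the real types are MessageModel, OffsetStore,
// LocalFileOffsetStore and RemoteBrokerOffsetStore.
enum MessageModelSketch { BROADCASTING, CLUSTERING }

interface OffsetStoreSketch {
    // where this store keeps consume offsets
    String location();
}

final class LocalFileStoreSketch implements OffsetStoreSketch {
    @Override
    public String location() { return "local file"; }
}

final class RemoteBrokerStoreSketch implements OffsetStoreSketch {
    @Override
    public String location() { return "broker"; }
}

final class OffsetStoreChooser {
    static OffsetStoreSketch choose(MessageModelSketch model) {
        switch (model) {
            case BROADCASTING:
                // every consumer reads all messages, so progress is per consumer
                return new LocalFileStoreSketch();
            case CLUSTERING:
                // queues are shared by the group, so progress must be visible to all members
                return new RemoteBrokerStoreSketch();
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }
}
```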
- OffsetStore#load(...)
- LocalFileOffsetStore#load(...)
- @Override
- public void load() throws MQClientException {
- // Read consume offsets from the local disk
- OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
- if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
- offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
- // Log the consume offset of each message queue
- for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
- AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
- log.info("load consumer's offset, {} {} {}",
- this.groupName,
- mq,
- offset.get());
- }
- }
- }
Explanation: loads consume offsets from the local file into memory.
- OffsetSerializeWrapper
- public class OffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<>();
- public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
- return offsetTable;
- }
- public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
- this.offsetTable = offsetTable;
- }
- }
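Explanation: serialization wrapper for locally stored offsets. A sample file written by LocalFileOffsetStore follows. The keys of offsetTable are MessageQueue objects rather than strings, so the content is not strictly standard JSON: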
- Yunai-MacdeMacBook-Pro-2:config yunai$ cat /Users/yunai/.rocketmq_offsets/192.168.17.0@DEFAULT/please_rename_unique_group_name_1/offsets.json
- {
- "offsetTable":{{
- "brokerName":"broker-a",
- "queueId":3,
- "topic":"TopicTest"
- }:1470,{
- "brokerName":"broker-a",
- "queueId":2,
- "topic":"TopicTest"
- }:1471,{
- "brokerName":"broker-a",
- "queueId":1,
- "topic":"TopicTest"
- }:1470,{
- "brokerName":"broker-a",
- "queueId":0,
- "topic":"TopicTest"
- }:1470
- }
- }
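The file maps each MessageQueue (topic, brokerName, queueId) to an offset, and load() copies that table into the in-memory offsetTable. Below is a minimal sketch of such a table. It uses a hypothetical value class `QueueKey` in place of MessageQueue. A key like this only works in a ConcurrentHashMap if equals and hashCode cover all three fields. The numbers in the usage check come from the sample file above.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for MessageQueue: a value object used as a map key.
final class QueueKey {
    final String topic;
    final String brokerName;
    final int queueId;

    QueueKey(String topic, String brokerName, int queueId) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof QueueKey)) return false;
        QueueKey k = (QueueKey) o;
        return queueId == k.queueId && topic.equals(k.topic) && brokerName.equals(k.brokerName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, brokerName, queueId);
    }
}

// Hypothetical loader mirroring LocalFileOffsetStore#load(...)
final class OffsetTableLoader {
    final ConcurrentHashMap<QueueKey, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Copy a deserialized table into memory if the file had one
    void load(Map<QueueKey, AtomicLong> fromFile) {
        if (fromFile != null) {
            offsetTable.putAll(fromFile);
        }
    }
}
```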
- RemoteBrokerOffsetStore#load(...)
- @Override
- public void load() {
- }
Explanation: nothing is loaded up front. Consume offsets are fetched from the Broker when they are actually read.
OffsetStore#readOffset(...)
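Types of consume offset reads:
READ_FROM_MEMORY : read from memory
READ_FROM_STORE : read from the store (the Broker or the file)
MEMORY_FIRST_THEN_STORE : read from memory first; if nothing is found there, read from the store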
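The three read types are combined through a deliberate switch fall-through, and both readOffset(...) implementations below rely on it. Here is a self-contained sketch of that pattern. The `store` function is a hypothetical stand-in for the file or the Broker that returns null when nothing is stored, and queues are plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Local mirror of the three read types described above.
enum ReadOffsetType { READ_FROM_MEMORY, READ_FROM_STORE, MEMORY_FIRST_THEN_STORE }

final class ReadOffsetSketch {
    final Map<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();
    // Hypothetical backing store (file or Broker); returns null if nothing is stored.
    private final Function<String, Long> store;

    ReadOffsetSketch(Function<String, Long> store) {
        this.store = store;
    }

    long readOffset(String mq, ReadOffsetType type) {
        switch (type) {
            case MEMORY_FIRST_THEN_STORE:
            case READ_FROM_MEMORY: {
                AtomicLong offset = offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (type == ReadOffsetType.READ_FROM_MEMORY) {
                    return -1;
                }
                // MEMORY_FIRST_THEN_STORE with nothing in memory: fall through
            }
            case READ_FROM_STORE: {
                Long stored = store.apply(mq);
                if (stored != null) {
                    offsetTable.put(mq, new AtomicLong(stored)); // cache for later memory reads
                    return stored;
                }
            }
            default:
                break;
        }
        return -1; // no offset known
    }
}
```

A READ_FROM_MEMORY miss returns -1 without touching the store. MEMORY_FIRST_THEN_STORE falls through to the store and caches the result, so a later READ_FROM_MEMORY finds it.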
- LocalFileOffsetStore#readOffset(...)
- @Override
- public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
- if (mq != null) {
- switch (type) {
- case MEMORY_FIRST_THEN_STORE:
- case READ_FROM_MEMORY: {
- AtomicLong offset = this.offsetTable.get(mq);
- if (offset != null) {
- return offset.get();
- } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
- return -1;
- }
- // MEMORY_FIRST_THEN_STORE with no in-memory offset: fall through
- }
- case READ_FROM_STORE: {
- OffsetSerializeWrapper offsetSerializeWrapper;
- try {
- offsetSerializeWrapper = this.readLocalOffset();
- } catch (MQClientException e) {
- return -1;
- }
- if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
- AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
- if (offset != null) {
- this.updateOffset(mq, offset.get(), false);
- return offset.get();
- }
- }
- }
- default:
- break;
- }
- }
- return -1;
- }
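In the READ_FROM_STORE branch, readLocalOffset() reads the consume offset from the file, and updateOffset(...) caches it in memory.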
- RemoteBrokerOffsetStore#readOffset(...)
- @Override
- public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
- if (mq != null) {
- switch (type) {
- case MEMORY_FIRST_THEN_STORE:
- case READ_FROM_MEMORY: {
- AtomicLong offset = this.offsetTable.get(mq);
- if (offset != null) {
- return offset.get();
- } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
- return -1;
- }
- // MEMORY_FIRST_THEN_STORE with no in-memory offset: fall through
- }
- case READ_FROM_STORE: {
- try {
- long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
- AtomicLong offset = new AtomicLong(brokerOffset);
- this.updateOffset(mq, offset.get(), false);
- return brokerOffset;
- } catch (MQBrokerException e) {
- // No offset in broker
- return -1;
- } catch (Exception e) {
- // Other exceptions
- log.warn("fetchConsumeOffsetFromBroker exception," + mq, e);
- return -2;
- }
- }
- default:
- break;
- }
- }
- return -1;
- }
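In the READ_FROM_STORE branch, fetchConsumeOffsetFromBroker(mq) reads the consume offset from the Broker and caches it in memory. It returns -1 when the Broker has no offset for the queue (MQBrokerException) and -2 for any other exception.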
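The two negative return values let the caller tell "no offset yet" apart from "the read failed". Below is a minimal sketch of that distinction. The exception class `NoOffsetInBrokerException` is a hypothetical stand-in for MQBrokerException, and the fetch is passed in as a `Callable`.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for MQBrokerException, i.e. "the Broker has no offset".
class NoOffsetInBrokerException extends Exception {
    NoOffsetInBrokerException(String message) {
        super(message);
    }
}

final class RemoteReadSketch {
    // Mirrors the READ_FROM_STORE branch: distinguish "no offset" from "read failed".
    static long readFromBroker(Callable<Long> fetchFromBroker) {
        try {
            return fetchFromBroker.call();
        } catch (NoOffsetInBrokerException e) {
            return -1; // nothing stored for this queue yet
        } catch (Exception e) {
            return -2; // any other failure, e.g. a network error
        }
    }
}
```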
OffsetStore#updateOffset(...)
RemoteBrokerOffsetStore and LocalFileOffsetStore implement this method identically:
- @Override
- public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
- if (mq != null) {
- AtomicLong offsetOld = this.offsetTable.get(mq);
- if (null == offsetOld) {
- offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
- }
- if (null != offsetOld) {
- if (increaseOnly) {
- MixAll.compareAndIncreaseOnly(offsetOld, offset);
- } else {
- offsetOld.set(offset);
- }
- }
- }
- }
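With increaseOnly set to true, an offset can only move forward. Below is a minimal sketch of such an increase-only update using a compare-and-set loop. This assumes MixAll.compareAndIncreaseOnly(...) behaves this way. The body is written for illustration, not copied from the source, and the class name `IncreaseOnlySketch` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

final class IncreaseOnlySketch {
    // Raise target to value only if value is larger; retry when another thread wins the CAS.
    static boolean compareAndIncreaseOnly(AtomicLong target, long value) {
        long prev = target.get();
        while (value > prev) {
            if (target.compareAndSet(prev, value)) {
                return true;
            }
            prev = target.get(); // re-read and check again
        }
        return false; // value was not larger: leave target unchanged
    }
}
```

updateOffset(...) also relies on putIfAbsent. When two threads insert the same queue at once, only one AtomicLong ends up in the table, and the losing thread updates that instance instead of overwriting it.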
- OffsetStore#persistAll(...)
- LocalFileOffsetStore#persistAll(...)
- @Override
- public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
- return;
- OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- if (mqs.contains(entry.getKey())) {
- AtomicLong offset = entry.getValue();
- offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
- }
- }
- String jsonString = offsetSerializeWrapper.toJson(true);
- if (jsonString != null) {
- try {
- MixAll.string2File(jsonString, this.storePath);
- } catch (IOException e) {
- log.error("persistAll consumer offset Exception," + this.storePath, e);
- }
- }
- }
Explanation: persists consume offsets by writing the offsets of the given message queues to the local file (storePath).
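Below is a self-contained sketch of the same idea: filter the in-memory table down to the given queues, serialize, and write the file. Several parts are assumptions for the sketch. The `key=value` line format is hypothetical (the original writes JSON via toJson). The write-to-temporary-file-then-rename approach was chosen here so readers never see a half-written file; the strategy of MixAll.string2File(...) is not shown in this article. `LocalPersistSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

final class LocalPersistSketch {
    static void persistAll(Map<String, AtomicLong> offsetTable, Set<String> mqs, Path storePath)
            throws IOException {
        if (mqs == null || mqs.isEmpty()) {
            return;
        }
        // keep only the requested queues; TreeMap gives a stable file layout
        Map<String, Long> snapshot = new TreeMap<>();
        for (Map.Entry<String, AtomicLong> e : offsetTable.entrySet()) {
            if (mqs.contains(e.getKey())) {
                snapshot.put(e.getKey(), e.getValue().get());
            }
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : snapshot.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        // write a temporary file next to the target, then replace the target with it
        Path tmp = storePath.resolveSibling(storePath.getFileName() + ".tmp");
        Files.write(tmp, sb.toString().getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, storePath, StandardCopyOption.REPLACE_EXISTING);
    }
}
```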
- RemoteBrokerOffsetStore#persistAll(...)
- @Override
- public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
- return;
- // Persist the offsets of the given message queues
- final HashSet<MessageQueue> unusedMQ = new HashSet<>();
- if (!mqs.isEmpty()) {
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- MessageQueue mq = entry.getKey();
- AtomicLong offset = entry.getValue();
- if (offset != null) {
- if (mqs.contains(mq)) {
- try {
- this.updateConsumeOffsetToBroker(mq, offset.get());
- log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
- this.groupName,
- this.mQClientFactory.getClientId(),
- mq,
- offset.get());
- } catch (Exception e) {
- log.error("updateConsumeOffsetToBroker exception," + mq.toString(), e);
- }
- } else {
- unusedMQ.add(mq);
- }
- }
- }
- }
- // Remove message queues that are no longer in use
- if (!unusedMQ.isEmpty()) {
- for (MessageQueue mq : unusedMQ) {
- this.offsetTable.remove(mq);
- log.info("remove unused mq, {}, {}", mq, this.groupName);
- }
- }
- }
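Explanation: persists the consume offsets of the specified set of message queues to the Broker, and removes every queue that is not in the set from offsetTable.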
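Below is a minimal sketch of that partitioning. Queues are plain strings, and the `uploadToBroker` callback is a hypothetical stand-in for updateConsumeOffsetToBroker(...). As in the original, a failed upload for one queue does not stop the others.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

final class RemotePersistSketch {
    final Map<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    void persistAll(Set<String> mqs, BiConsumer<String, Long> uploadToBroker) {
        if (mqs == null || mqs.isEmpty()) {
            return;
        }
        Set<String> unused = new HashSet<>();
        for (Map.Entry<String, AtomicLong> e : offsetTable.entrySet()) {
            if (mqs.contains(e.getKey())) {
                try {
                    uploadToBroker.accept(e.getKey(), e.getValue().get());
                } catch (RuntimeException ex) {
                    // the original logs the error; this sketch just moves on to the next queue
                }
            } else {
                unused.add(e.getKey());
            }
        }
        // drop queues that were not requested (e.g. reassigned after a rebalance)
        for (String mq : unused) {
            offsetTable.remove(mq);
        }
    }
}
```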
- MQClientInstance#persistAllConsumerOffset(...)
- private void startScheduledTask() {
- // Periodically persist consume offsets (other scheduled tasks omitted)
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- MQClientInstance.this.persistAllConsumerOffset();
- } catch (Exception e) {
- log.error("ScheduledTask persistAllConsumerOffset exception", e);
- }
- }
- }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- }
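Explanation: consume offsets are persisted periodically. The default period is 5000 ms (ClientConfig#persistConsumerOffsetInterval).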
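The scheduled persistence uses ScheduledExecutorService#scheduleAtFixedRate with the task body wrapped in try/catch. The wrapping matters: according to the JDK documentation, if one execution of a fixed-rate task throws, subsequent executions are suppressed. Below is a minimal sketch with a hypothetical `OffsetPersistScheduler` class. The initial delay and period are parameters here rather than RocketMQ's values.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class OffsetPersistScheduler {
    private final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();

    // Run persistAll periodically; catching the exception keeps the periodic task alive.
    void start(Runnable persistAll, long initialDelayMs, long periodMs) {
        ses.scheduleAtFixedRate(() -> {
            try {
                persistAll.run();
            } catch (RuntimeException e) {
                System.err.println("persistAllConsumerOffset failed: " + e);
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        ses.shutdownNow();
    }
}
```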
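Important:
Consume offsets are not persisted only by the scheduled task; operations such as pulling messages and allocating message queues also persist them.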
9 Conclusion
This is probably the longest article in the series. Please forgive any errors or unclear explanations.
Source: http://www.suo.im/iwZiY