本文主要基于 RocketMQ 4.0.x 正式版
1. 概述
2. Namesrv 高可用
2.1 Broker 注册到 Namesrv
2.2 ProducerConsumer 访问 Namesrv
3. Broker 高可用
3.2 Broker 主从
3.1.1 配置
3.1.2 组件
3.1.3 通信协议
- 3.1.4 Slave
- 3.1.5 Master
- 3.1.6 Master_SYNC
3.2 Producer 发送消息
3.3 Consumer 消费消息
4. 总结
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复甚至不知道如何读源码也可以请教噢
新的源码解析文章实时收到通知每周更新一篇左右
认真的源码交流微信群
1. 概述
本文主要解析 NamesrvBroker 如何实现高可用, ProducerConsumer 怎么与它们通信保证高可用
2. Namesrv 高可用
启动多个 Namesrv 实现高可用
相较于 ZookeeperConsulEtcd 等, Namesrv 是一个超轻量级的注册中心, 提供命名服务
2.1 Broker 注册到 Namesrv
多个 Namesrv 之间, 没有任何关系(不存在类似 Zookeeper 的 Leader/Follower 等角色), 不进行通信与数据同步通过 Broker 循环注册多个 Namesrv
- // BrokerOuterAPI.java
- public RegisterBrokerResult registerBrokerAll(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List filterServerList,
- final boolean oneway,
- final int timeoutMills) {
- RegisterBrokerResult registerBrokerResult = null;
- List nameServerAddressList = this.remotingClient.getNameServerAddressList();
- if (nameServerAddressList != null) {
- for (String namesrvAddr : nameServerAddressList) { // 循环多个 Namesrv
- try {
- RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
- haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
- if (result != null) {
- registerBrokerResult = result;
- }
- log.info("register broker to name server {} OK", namesrvAddr);
- } catch (Exception e) {
- log.warn("registerBroker Exception, {}", namesrvAddr, e);
- }
- }
- }
- return registerBrokerResult;
- }
2.2 ProducerConsumer 访问 Namesrv
ProducerConsumer 从 Namesrv 列表选择一个可连接的进行通信
- // NettyRemotingClient.java
- private Channel getAndCreateNameserverChannel() throws InterruptedException {
- // 返回已选择可连接 Namesrv
- String addr = this.namesrvAddrChoosed.get();
- if (addr != null) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.getChannel();
- }
- }
- //
- final List addrList = this.namesrvAddrList.get();
- if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- // 返回已选择可连接的 Namesrv
- addr = this.namesrvAddrChoosed.get();
- if (addr != null) {
- ChannelWrapper cw = this.channelTables.get(addr);
- if (cw != null && cw.isOK()) {
- return cw.getChannel();
- }
- }
- // 从 Namesrv 列表中选择一个连接的返回
- if (addrList != null && !addrList.isEmpty()) {
- for (int i = 0; i < addrList.size(); i++) {
- int index = this.namesrvIndex.incrementAndGet();
- index = Math.abs(index);
- index = index % addrList.size();
- String newAddr = addrList.get(index);
- this.namesrvAddrChoosed.set(newAddr);
- Channel channelNew = this.createChannel(newAddr);
- if (channelNew != null)
- return channelNew;
- }
- }
- } catch (Exception e) {
- log.error("getAndCreateNameserverChannel: create name server channel exception", e);
- } finally {
- this.lockNamesrvChannel.unlock();
- }
- } else {
- log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
- }
- return null;
- }
3. Broker 高可用
启动多个 Broker 分组 形成 集群 实现高可用
Broker 分组 = Master 节点 x1 + Slave 节点 xN
类似 MySQL,Master 节点 提供读写服务, Slave 节点 只提供读服务
3.2 Broker 主从
每个分组, Master 节点 不断发送新的 CommitLog 给 Slave 节点 Slave 节点 不断上报本地的 CommitLog 已经同步到的位置给 Master 节点
Broker 分组 与 Broker 分组 之间没有任何关系, 不进行通信与数据同步
消费进度 目前不支持 Master/Slave 同步
集群内, Master 节点 有两种类型: Master_SYNCMaster_ASYNC: 前者在 Producer 发送消息时, 等待 Slave 节点 存储完毕后再返回发送结果, 而后者不需要等待
3.1.1 配置
目前官方提供三套配置:
2m-2s-async
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
2m-2s-sync
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | SYNC_MASTER | 0 |
DefaultCluster | broker-a | SLAVE | 1 |
DefaultCluster | broker-b | SYNC_MASTER | 0 |
DefaultCluster | broker-b | SLAVE | 1 |
2m-noslave
brokerClusterName | brokerName | brokerRole | brokerId |
---|---|---|---|
DefaultCluster | broker-a | ASYNC_MASTER | 0 |
DefaultCluster | broker-b | ASYNC_MASTER | 0 |
3.1.2 组件
再看具体实现代码之前, 我们来看看 Master/Slave 节点 包含的组件:
Master 节点
AcceptSocketService : 接收 Slave 节点 连接
HAConnection
ReadSocketService : 读来自 Slave 节点 的数据
WriteSocketService : 写到往 Slave 节点 的数据
Slave 节点
HAClient : 对 Master 节点 连接读写数据
3.1.3 通信协议
Master 节点 与 Slave 节点 通信协议很简单, 只有如下两条
对象 | 用途 | 第几位 | 字段 | 数据类型 | 字节数 | 说明 |
---|---|---|---|---|---|---|
Slave=>Master | 上报 CommitLog已经 同步到的 < strong ow="233" oh="50"> 物理 位置 | |||||
0 | maxPhyOffset | Long | 8 | CommitLog 最大物理位置 | ||
Master=>Slave | 传输新的 CommitLog 数据 | |||||
0 | fromPhyOffset | Long | 8 | CommitLog 开始物理位置 | ||
1 | size | Int | 4 | 传输 CommitLog 数据长度 | ||
2 | body | Bytes | size | 传输 CommitLog 数据 |
3.1.4 Slave
Slave 主循环, 实现了不断不断不断从 Master 传输 CommitLog 数据, 上传 Master 自己本地的 CommitLog 已经同步物理位置
- // HAClient.java
- public void run() {
- log.info(this.getServiceName() + "service started");
- while (!this.isStopped()) {
- try {
- if (this.connectMaster()) {
- // 若到满足上报间隔, 上报到 Master 进度
- if (this.isTimeToReportOffset()) {
- boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
- if (!result) {
- this.closeMaster();
- }
- }
- this.selector.select(1000);
- // 处理读取事件
- boolean ok = this.processReadEvent();
- if (!ok) {
- this.closeMaster();
- }
- // 若进度有变化, 上报到 Master 进度
- if (!reportSlaveMaxOffsetPlus()) {
- continue;
- }
- // Master 过久未返回数据, 关闭连接
- long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
- if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
- .getHaHousekeepingInterval()) {
- log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
- + "] expired," + interval);
- this.closeMaster();
- log.warn("HAClient, master not response some time, so close connection");
- }
- } else {
- this.waitForRunning(1000 * 5);
- }
- } catch (Exception e) {
- log.warn(this.getServiceName() + "service has exception.", e);
- this.waitForRunning(1000 * 5);
- }
- }
- log.info(this.getServiceName() + "service end");
- }
第 8 至 14 行 : 固定间隔 (默认 5s) 向 Master 上报 Slave 本地 CommitLog 已经同步到的物理位置该操作还有心跳的作用
第 16 至 22 行 : 处理 Master 传输 Slave 的 CommitLog 数据
我们来看看
#dispatchReadRequest(...)
与
#reportSlaveMaxOffset(...)
是怎么实现的
- // HAClient.java
- /**
- * 读取 Master 传输的 CommitLog 数据, 并返回是异常
- * 如果读取到数据, 写入 CommitLog
- * 异常原因:
- * 1. Master 传输来的数据 offset 不等于 Slave 的 CommitLog 数据最大 offset
- * 2. 上报到 Master 进度失败
- *
- * @return 是否异常
- */
- private boolean dispatchReadRequest() {
- final int msgHeaderSize = 8 + 4; // phyoffset + size
- int readSocketPos = this.byteBufferRead.position();
- while (true) {
- // 读取到请求
- int diff = this.byteBufferRead.position() - this.dispatchPostion;
- if (diff >= msgHeaderSize) {
- // 读取 masterPhyOffsetbodySize 使用 dispatchPostion 的原因是: 处理数据粘包导致数据读取不完整
- long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
- int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
- // 校验 Master 传输来的数据 offset 是否和 Slave 的 CommitLog 数据最大 offset 是否相同
- long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
- if (slavePhyOffset != 0) {
- if (slavePhyOffset != masterPhyOffset) {
- log.error("master pushed offset not equal the max phy offset in slave, SLAVE:"
- + slavePhyOffset + "MASTER:" + masterPhyOffset);
- return false;
- }
- }
- // 读取到消息
- if (diff >= (msgHeaderSize + bodySize)) {
- // 写入 CommitLog
- byte[] bodyData = new byte[bodySize];
- this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
- this.byteBufferRead.get(bodyData);
- HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
- // 设置处理到的位置
- this.byteBufferRead.position(readSocketPos);
- this.dispatchPostion += msgHeaderSize + bodySize;
- // 上报到 Master 进度
- if (!reportSlaveMaxOffsetPlus()) {
- return false;
- }
- // 继续循环
- continue;
- }
- }
- // 空间写满, 重新分配空间
- if (!this.byteBufferRead.hasRemaining()) {
- this.reallocateByteBuffer();
- }
- break;
- }
- return true;
- }
- /**
- * 上报进度
- *
- * @param maxOffset 进度
- * @return 是否上报成功
- */
- private boolean reportSlaveMaxOffset(final long maxOffset) {
- this.reportOffset.position(0);
- this.reportOffset.limit(8);
- this.reportOffset.putLong(maxOffset);
- this.reportOffset.position(0);
- this.reportOffset.limit(8);
- for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
- try {
- this.socketChannel.write(this.reportOffset);
- } catch (IOException e) {
- log.error(this.getServiceName()
- + "reportSlaveMaxOffset this.socketChannel.write exception", e);
- return false;
- }
- }
- return !this.reportOffset.hasRemaining();
- }
- 3.1.5 Master
ReadSocketService 逻辑同
HAClient#processReadEvent(...)
基本相同, 我们直接看代码
- // ReadSocketService.java
- private boolean processReadEvent() {
- int readSizeZeroTimes = 0;
- // 清空 byteBufferRead
- if (!this.byteBufferRead.hasRemaining()) {
- this.byteBufferRead.flip();
- this.processPostion = 0;
- }
- while (this.byteBufferRead.hasRemaining()) {
- try {
- int readSize = this.socketChannel.read(this.byteBufferRead);
- if (readSize > 0) {
- readSizeZeroTimes = 0;
- // 设置最后读取时间
- this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
- if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
- // 读取 Slave 请求来的 CommitLog 的最大位置
- int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
- long readOffset = this.byteBufferRead.getLong(pos - 8);
- this.processPostion = pos;
- // 设置 Slave CommitLog 的最大位置
- HAConnection.this.slaveAckOffset = readOffset;
- // 设置 Slave 第一次请求的位置
- if (HAConnection.this.slaveRequestOffset < 0) {
- HAConnection.this.slaveRequestOffset = readOffset;
- log.info("slave[" + HAConnection.this.clientAddr + "] request offset" + readOffset);
- }
- // 通知目前 Slave 进度主要用于 Master 节点为同步类型的
- HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
- }
- } else if (readSize == 0) {
- if (++readSizeZeroTimes >= 3) {
- break;
- }
- } else {
- log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
- return false;
- }
- } catch (IOException e) {
- log.error("processReadEvent exception", e);
- return false;
- }
- }
- return true;
- }
WriteSocketService 计算 Slave 开始同步的位置后, 不断向 Slave 传输新的 CommitLog 数据
- // WriteSocketService.java
- @Override
- public void run() {
- HAConnection.log.info(this.getServiceName() + "service started");
- while (!this.isStopped()) {
- try {
- this.selector.select(1000);
- // 未获得 Slave 读取进度请求, sleep 等待
- if (-1 == HAConnection.this.slaveRequestOffset) {
- Thread.sleep(10);
- continue;
- }
- // 计算初始化 nextTransferFromWhere
- if (-1 == this.nextTransferFromWhere) {
- if (0 == HAConnection.this.slaveRequestOffset) {
- long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
- masterOffset = masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMapedFileSizeCommitLog());
- if (masterOffset < 0) {
- masterOffset = 0;
- }
- this.nextTransferFromWhere = masterOffset;
- } else {
- this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
- }
- log.info("master transfer data from" + this.nextTransferFromWhere + "to slave[" + HAConnection.this.clientAddr
- + "], and slave request" + HAConnection.this.slaveRequestOffset);
- }
- if (this.lastWriteOver) {
- long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
- if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) { // 心跳
- // Build Header
- this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(headerSize);
- this.byteBufferHeader.putLong(this.nextTransferFromWhere);
- this.byteBufferHeader.putInt(0);
- this.byteBufferHeader.flip();
- this.lastWriteOver = this.transferData();
- if (!this.lastWriteOver)
- continue;
- }
- } else { // 未传输完成, 继续传输
- this.lastWriteOver = this.transferData();
- if (!this.lastWriteOver)
- continue;
- }
- // 选择新的 CommitLog 数据进行传输
- SelectMappedBufferResult selectResult =
- HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
- if (selectResult != null) {
- int size = selectResult.getSize();
- if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
- size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
- }
- long thisOffset = this.nextTransferFromWhere;
- this.nextTransferFromWhere += size;
- selectResult.getByteBuffer().limit(size);
- this.selectMappedBufferResult = selectResult;
- // Build Header
- this.byteBufferHeader.position(0);
- this.byteBufferHeader.limit(headerSize);
- this.byteBufferHeader.putLong(thisOffset);
- this.byteBufferHeader.putInt(size);
- this.byteBufferHeader.flip();
- this.lastWriteOver = this.transferData();
- } else { // 没新的消息, 挂起等待
- HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
- }
- } catch (Exception e) {
- HAConnection.log.error(this.getServiceName() + "service has exception.", e);
- break;
- }
- }
- // 断开连接 & 暂停写线程 & 暂停读线程 & 释放 CommitLog
- if (this.selectMappedBufferResult != null) {
- this.selectMappedBufferResult.release();
- }
- this.makeStop();
- readSocketService.makeStop();
- haService.removeConnection(HAConnection.this);
- SelectionKey sk = this.socketChannel.keyFor(this.selector);
- if (sk != null) {
- sk.cancel();
- }
- try {
- this.selector.close();
- this.socketChannel.close();
- } catch (IOException e) {
- HAConnection.log.error("", e);
- }
- HAConnection.log.info(this.getServiceName() + "service end");
- }
- /**
- * 传输数据
- */
- private boolean transferData() throws Exception {
- int writeSizeZeroTimes = 0;
- // Write Header
- while (this.byteBufferHeader.hasRemaining()) {
- int writeSize = this.socketChannel.write(this.byteBufferHeader);
- if (writeSize > 0) {
- writeSizeZeroTimes = 0;
- this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
- } else if (writeSize == 0) {
- if (++writeSizeZeroTimes >= 3) {
- break;
- }
- } else {
- throw new Exception("ha master write header error < 0");
- }
- }
- if (null == this.selectMappedBufferResult) {
- return !this.byteBufferHeader.hasRemaining();
- }
- writeSizeZeroTimes = 0;
- // Write Body
- if (!this.byteBufferHeader.hasRemaining()) {
- while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
- int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
- if (writeSize > 0) {
- writeSizeZeroTimes = 0;
- this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
- } else if (writeSize == 0) {
- if (++writeSizeZeroTimes >= 3) {
- break;
- }
- } else {
- throw new Exception("ha master write body error < 0");
- }
- }
- }
- boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
- if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
- this.selectMappedBufferResult.release();
- this.selectMappedBufferResult = null;
- }
- return result;
- }
- 3.1.6 Master_SYNC
Producer 发送消息时, Master_SYNC 节点 会等待 Slave 节点 存储完毕后再返回发送结果
核心代码如下:
- // CommitLog.java
- public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
- // .... 省略处理发送代码
- // Synchronous write double 如果是同步 Master, 同步到从节点
- if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
- HAService service = this.defaultMessageStore.getHaService();
- if (msg.isWaitStoreMsgOK()) {
- // Determine whether to wait
- if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
- if (null == request) {
- request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
- }
- service.putRequest(request);
- // 唤醒 WriteSocketService
- service.getWaitNotifyObject().wakeupAll();
- boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
- if (!flushOK) {
- log.error("do sync transfer other node, wait return, but failed, topic:" + msg.getTopic() + "tags:"
- + msg.getTags() + "client address:" + msg.getBornHostString());
- putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
- }
- }
- // Slave problem
- else {
- // Tell the producer, slave not available
- putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
- }
- }
- }
- return putMessageResult;
- }
第 16 行 : 唤醒 WriteSocketService
唤醒后, WriteSocketService 挂起等待新消息结束, Master 传输 Slave 新的 CommitLog 数据
Slave 收到数据后, 立即上报最新的 CommitLog 同步进度到 MasterReadSocketService 唤醒第 18 行:
request#waitForFlush(...)
我们来看下
GroupTransferService
的核心逻辑代码:
- // GroupTransferService.java
- private void doWaitTransfer() {
- synchronized (this.requestsRead) {
- if (!this.requestsRead.isEmpty()) {
- for (CommitLog.GroupCommitRequest req : this.requestsRead) {
- // 等待 Slave 上传进度
- boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- for (int i = 0; !transferOK && i < 5; i++) {
- this.notifyTransferObject.waitForRunning(1000); // 唤醒
- transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- }
- if (!transferOK) {
- log.warn("transfer messsage to slave timeout," + req.getNextOffset());
- }
- // 唤醒请求, 并设置是否 Slave 同步成功
- req.wakeupCustomer(transferOK);
- }
- this.requestsRead.clear();
- }
- }
- }
3.2 Producer 发送消息
Producer 发送消息时, 会对 Broker 集群 的所有队列进行选择
核心代码如下:
- // DefaultMQProducerImpl.java
- private SendResult sendDefaultImpl( //
- Message msg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, //
- final long timeout //
- ) throws MQClientException,
- RemotingException,
- MQBrokerException,
- InterruptedException {
- // .... 省略: 处理校验逻辑
- // 获取 Topic 路由信息
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- MessageQueue mq = null; // 最后选择消息要发送到的队列
- Exception exception = null;
- SendResult sendResult = null; // 最后一次发送结果
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用
- int times = 0; // 第几次发送
- String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的 broker 名
- // 循环调用发送消息, 直到成功
- for (; times < timesTotal; times++) {
- String lastBrokerName = null == mq ? null: mq.getBrokerName();
- MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
- if (tmpmq != null) {
- mq = tmpmq;
- brokersSent[times] = mq.getBrokerName();
- try {
- beginTimestampPrev = System.currentTimeMillis();
- // 调用发送消息核心方法
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
- endTimestamp = System.currentTimeMillis();
- // 更新 Broker 可用性信息
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- // .... 省略: 处理发送返回结果
- }
- } catch(e) { // .... 省略: 处理异常
- }
- } else {
- break;
- }
- }
- // .... 省略: 处理发送返回结果
- }
- // .... 省略: 处理找不到消息路由
- }
如下是调试
#sendDefaultImpl(...)
时 TopicPublishInfo 的结果, Producer 获得到了 broker-a,broker-b 两个 Broker 分组 的消息队列:
3.3 Consumer 消费消息
Consumer 消费消息时, 会对 Broker 集群 的所有队列进行选择
4. 总结
来源: http://www.suo.im/3IxO1D