目录
RocketMQ 的负载均衡
producer 对 MessageQueue 的负载均衡
producer 负载均衡
系统计算路由 MessageQueue
自定义路由 MessageQueue
Consumer 的负载均衡
消费端设置负责均衡策略
负载均衡的时机
RocketMQ 的负载均衡
producer 对 MessageQueue 的负载均衡
通过调试代码可以知道, 所谓的 MessageQueue 就是 broker 上的队列信息, 每个 topic 在创建的时候可以指定相应的 queue 的数量. 也就是说, 一个 topic 的消息存储在多个主 broker 中
producer 负载均衡
producer 端的负载均衡主要是在选择对应的 broker. 在 producer 发送消息的时候会对消息进行路由, 看到底是路由到哪个 broker. 下面主要说下以下两种发送消息的方法: 系统计算路由 MessageQueue, 自定义路由 MessageQueue.
系统计算路由 MessageQueue
SendResult send = producer.send(message, 60 * 1000);
系统计算路由 MessageQueue 的其他路由算法
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
- if (this.sendLatencyFaultEnable) {
- try {
- int index = tpInfo.getSendWhichQueue().getAndIncrement();
- for (int i = 0; i <tpInfo.getMessageQueueList().size(); i++) {
- int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
- if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
- if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
- return mq;
- }
- }
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
- if (writeQueueNums> 0) {
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
- if (notBestBroker != null) {
- mq.setBrokerName(notBestBroker);
- mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
- }
- return mq;
- } else {
- latencyFaultTolerance.remove(notBestBroker);
- }
- } catch (Exception e) {
- log.error("Error occurred when selecting message queue", e);
- }
- return tpInfo.selectOneMessageQueue();
- }
- // 默认策略 (路由到当前的 broker 主节点列表取模后的 broker 中)
- return tpInfo.selectOneMessageQueue(lastBrokerName);
- }
自定义路由 MessageQueue
- SendResult send = producer.send(message, new MessageQueueSelector() {
- /**
- *
- * @param mqs 通过 name server 返回的 broker 主节点列表
- * @param msg 当前消息
- * @param arg
- * @return
- */
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- int size = mqs.size();
- long timeMillis = System.currentTimeMillis();
- return mqs.get((int)timeMillis % size);
- }
- }, 60 * 1000);
Consumer 的负载均衡
消费端设置负责均衡策略
在 consumer.statrt() 中, consumer 会对所订阅的 topic 上的 messagequeue 做负载均衡
DefaultConsumerPushImpl.start()
下的
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
, 默认返回的是
AllocateMessageQueueAveragely
负责均衡策略
AllocateMessageQueueAveragely
负载均衡的时机
- // RebalanceService
- @Override
- public void run() {
- log.info(this.getServiceName() + "service started");
- while (!this.isStopped()) {
- this.waitForRunning(waitInterval);
- // 开始进行分配
- this.mqClientFactory.doRebalance();
- }
- log.info(this.getServiceName() + "service end");
- }
具体实现
- /**
- consumerGroup : 消费组名称
- currentCID: 当前消费者实例 Id(随机数)
- mqAll: 该 topic 对应的 queue 的信息列表
- cidAll: 消费组中所有的消费者列表
- */
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- if (currentCID == null || currentCID.length() <1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
- return result;
- }
- int index = cidAll.indexOf(currentCID);
- int mod = mqAll.size() % cidAll.size();
- int averageSize =
- mqAll.size() <= cidAll.size() ? 1 : (mod> 0 && index <mod ? mqAll.size() / cidAll.size()
- + 1 : mqAll.size() / cidAll.size());
- int startIndex = (mod> 0 && index < mod) ? index * averageSize : index * averageSize + mod;
- int range = Math.min(averageSize, mqAll.size() - startIndex);
- for (int i = 0; i < range; i++) {
- result.add(mqAll.get((startIndex + i) % mqAll.size()));
- }
- return result;
- }
来源: https://www.cnblogs.com/KevinStark/p/12497214.html