消息中间件作为分布式系统的重要成员, 各大公司及开源均有许多解决方案. 目前主流的开源解决方案包括 RabbitMQ,RocketMQ,Kafka,ActiveMQ 等. 消息这个东西说简单也简单, 说难也难. 简单之处在于好用方便, 接入简单使用简单, 异步操作能够解耦系统间的依赖, 同时失败后也能够追溯重试. 难的地方在于, 设计一套可以支撑业务的消息机制, 并提供高可用架构, 解决消息存储, 消息重试, 消息队列的负载均衡等一系列问题. 然而难也不代表没有方法或者 "套路", 熟悉一下原理与实现, 多看几个框架的源码后多总结势必能找出一些共性.
消息框架大同小异, 熟练掌握其原理, 工作机制是必要的. 就拿用的比较多的 RocketMQ 为引, 来说说消息引擎的设计与实现. 阿里的消息引擎经过了从 Notify 到 Napoli, 再到 MetaQ 三代的发展, 现在已经非常成熟, 在不同部门的代码中现在没准都还可以从代码里看到这一系列演进过程. 当前的 Apache RocketMQ 就是阿里将 MetaQ 项目捐赠给了 Apache 基金会, 而内部还是沿用 MetaQ 的名称.
首先诠释几个消息相关的基本概念.
每个消息队列都必须建立一个 Topic.
消息可以分组, 每个消息队列都至少需要一个生产者 Producer 和一个消费者 Consumer. 生产者生产发送消息, 消费者接收消费消息.
每个消费者和生产者都会分批提个 ID.
RocketMQ 系统架构
接下来再来看看 RocketMQ 的架构, 如图所示, 简要描述一下几种角色及作用.
NameServer
NameServer 是消息 Topic 的注册中心, 用于发现和管理消息生产者, 消费者, 及路由关系.
Broker
消息存储与转发的中转站, 使用队列机制管理数据存储. Broker 中会存储多份消息数据进行容错, 以 Master/Slave 的架构保证系统的高可用, Broker 中可以部署单个或多个 Master. 单个 Master 的场景, Master 挂掉后, Producer 新产生的消息无法被消费, 但已经发送到 Broker 的消息, 由于 Slave 节点的存在, 还能继续被 Consumer 所消费; 如果部署多个 Master 则系统能能正常运转.
另外, Broker 中的 Master 和 Slave 不是像 Zookeeper 集群中用选举机制进行确定, 而是固定的配置, 这也是在高可用场景需要部署多个 Master 的原因.
生产者将消息发送到 Broker 中后, Broker 会将消息写到本地的 CommitLog 文件中, 保存消息.
Producer
生产者会和 NameServer 集群中某一节点建立长链接, 定时从 NamerServeri 获取 Topic 路由信息, 并且和 Broker 建立心跳.
Consumer
消费者需要给生产者一个明确的消费成功的回应, MetaQ 才会认为消费成功, 否则失败. 失败后, RocketMQ 会将消息重新发回 Broker, 在指定的延迟时间内进行重试, 当重试达到一定的次数后 (默认 16 次),MetaQ 则认为此消息不能被消费, 消息会被投递到死信队列.
这个架构看其实是否很熟悉? 好像接触过的一些分布式系统的架构和这个长的都比较像是吧, 甚至只要里面框图的角色稍微换换就能变成另外一个框架的介绍, 比如 Dubbo/Redis....
并且在 RocketMQ 架构设计中, 要解决的问题与其他分布式框架也可以触类旁通. Master/Slave 机制, 天然的读写分离方式都是分布式高可用系统的典型解决方案.
负载均衡
负载均衡是消息框架需要解决的又一个重要问题. 当系统中生产者生产了大量消息, 而消费者有多个或多台机器时, 就需要平衡负载, 让消息均分地被消费者进行消费. 目前 RocketMQ 中使用了多种负载均衡算法. 主要有以下几种, 静态配置由于过于简单, 直接为消费者配置需要消费的队列, 因此直接忽略.
求平均数法
环形队列法
一致 Hash 算法
Machine Room 算法
静态配置
来看一下源码, RocketMQ 内部对以上负载均衡算法均有实现, 并定义了一个接口 AllocateMessageQueueStrategy, 采用策略模式, 每种负载均衡算法都依靠实现这个接口实现, 在运行中, 会获取这个接口的实例, 从而动态判断到底采用的是哪种负载均衡算法.
- public interface AllocateMessageQueueStrategy {
- /**
- * Allocating by consumer id
- *
- * @param consumerGroup current consumer group
- * @param currentCID current consumer id
- * @param mqAll message queue set in current topic
- * @param cidAll consumer set in current consumer group
- * @return The allocate result of given strategy
- */
- List<MessageQueue> allocate(
- final String consumerGroup,
- final String currentCID,
- final List<MessageQueue> mqAll,
- final List<String> cidAll
- );
- /**
- * Algorithm name
- *
- * @return The strategy name
- */
- String getName();
- }
1. 求平均数法
顾名思义, 就是根据消息队列的数量和消费者的数量, 求出单个消费者上应该负担的平均消费队列数, 然后根据消费者的 ID, 按照取模的方式将消息队列分配到指定的 consumer 上. 具体代码可以去 GitHub 上找, 截取核心算法代码如下, mqAll 就是消息队列的结构, 是一个 MessageQueue 的 List,cidAll 是消费者 ID 的列表, 也是一个 List. 考虑 mqAll 和 cidAll 固定时以及变化时, 当前消费者节点会从队列中获取到哪个队列中的消息, 比如当 averageSize 大于 1 时, 这时每个消费者上的消息队列就不止一个, 而分配在每个消费者的上的队列的 ID 是连续的.
- int index = cidAll.indexOf(currentCID);
- int mod = mqAll.size() % cidAll.size();
- int averageSize =
- mqAll.size() <= cidAll.size() ? 1 : (mod> 0 && index <mod ? mqAll.size() / cidAll.size()
- + 1 : mqAll.size() / cidAll.size());
- int startIndex = (mod> 0 && index <mod) ? index * averageSize : index * averageSize + mod;
- int range = Math.min(averageSize, mqAll.size() - startIndex);
- for (int i = 0; i < range; i++) {
- result.add(mqAll.get((startIndex + i) % mqAll.size()));
- }
- return result;
2. 环形平均法
这种算法更为简单. 首先获取当前消费者在整个列表中的下标 index, 直接用求余方法得到当前消费者应该处理的消息队列. 注意 mqAll 的 size 和 cidAll 的 size 可以是任意的.
当 ciAll.size() == mqAll.size() 时, 该算法就是类似 hashtable 的求余分桶.
当 ciAll.size()> mqAll.size() 时, 那么多出的消费者上并不能获取到消费的队列, 只有部分消费者能够获取到消息队列并执行, 相当于在消费者资源充足的情况下, 由于队列数少, 所以使用其中一部分消费者就能满足需求, 不用额外的开销.
当 ciAll.size() <mqAll.size() 时, 这样每个消费者上需要负载的队列数就超过了 1 个, 并且区别于直接求平均的方式, 分配在每个消费者上的消费队列不是连续的, 而是有一定步长的间隔.
- int index = cidAll.indexOf(currentCID);
- for (int i = index; i < mqAll.size(); i++) {
- if (i % cidAll.size() == index) {
- result.add(mqAll.get(i));
- }
- }
- return result;
3. 一致 Hash 算法
循环所有需要消费的队列, 根据队列 toString 后的 hash 值计算出处理当前队列的最近节点并分配给该节点. routeNode 中方法稍微复杂一些, 有时间建议细看, 这里就只说功能.
- Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
- for (String cid : cidAll) {
- cidNodes.add(new ClientNode(cid));
- }
- final ConsistentHashRouter<ClientNode> router; //for building hash ring
- if (customHashFunction != null) {
- router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
- } else {
- router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
- }
- List<MessageQueue> results = new ArrayList<MessageQueue>();
- for (MessageQueue mq : mqAll) {
- ClientNode clientNode = router.routeNode(mq.toString());
- if (clientNode != null && currentCID.equals(clientNode.getKey())) {
- results.add(mq);
- }
- }
- return results;
4. Machine Room 算法
基于机房的 Hash 算法. 这个命名看起来很诈唬, 其实和上面的普通求余算法是一样的, 只不过多了个配置和过滤, 为了把这个说清楚就把源码贴全一点. 可以看到在这个算法的实现类中多了一个成员 consumeridcs, 这个就是 consumer id 的一个集合, 按照一定的约定, 预先给 broker 命名, 例如 us@metaq4, 然后给不同集群配置不同的 consumeridcs, 从而实现不同机房处理不同消息队列的能力.
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.rebalance;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Set;
- import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
- import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
- /**
- * Computer room Hashing queue algorithm, such as Alipay logic room
- */
- public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
- private Set<String> consumeridcs;
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- List<MessageQueue> result = new ArrayList<MessageQueue>();
- int currentIndex = cidAll.indexOf(currentCID);
- if (currentIndex <0) {
- return result;
- }
- List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
- for (MessageQueue mq : mqAll) {
- String[] temp = mq.getBrokerName().split("@");
- if (temp.length == 2 && consumeridcs.contains(temp[0])) {
- premqAll.add(mq);
- }
- }
- int mod = premqAll.size() / cidAll.size();
- int rem = premqAll.size() % cidAll.size();
- int startIndex = mod * currentIndex;
- int endIndex = startIndex + mod;
- for (int i = startIndex; i <endIndex; i++) {
- result.add(mqAll.get(i));
- }
- if (rem> currentIndex) {
- result.add(premqAll.get(currentIndex + mod * cidAll.size()));
- }
- return result;
- }
- @Override
- public String getName() {
- return "MACHINE_ROOM";
- }
- public Set<String> getConsumeridcs() {
- return consumeridcs;
- }
- public void setConsumeridcs(Set<String> consumeridcs) {
- this.consumeridcs = consumeridcs;
- }
- }
由于近些年阿里海外业务的扩展和投入, RocketMQ 等中间件对常见的海外业务场景的支持也更加健全. 典型的场景包括跨单元消费以及消息路由. 跨单元消费是比较好实现的, 就是在 consumer 中增加一个配置, 指定接收消息的来源单元, RocketMQ 内部会完成客户端从指定单元拉取消息的工作. 而全球消息路由则是需要一些公共资源, 消息的发送方只能将消息发送到一个指定单元 / 机房, 然后将消息路由到另外指定的单元, consumer 部署在指定单元. 区别在于一个配置在客户端, 一个配置在服务端.
总结
从 RocketMQ 的设计, 原理以及用过的个人用过的其他分布式框架上看, 典型的分布式系统在设计中无外乎要解决的就是以下几点, RocketMQ 全都用上了.
服务的注册和发现. 一般会有一个统一的注册中心进行管理维护.
服务的提供方和使用方间的通信, 可以是异步也可以是同步, 例如 dubbo 服务同步服务, 而消息类型就是异步通信.
HA-- 高可用架构. 八字决 ---- "主从同步, 读写分离". 要再加一句的话可以是 "异地多活".
负载均衡. 典型的负载均衡算法在文章内容里面已经列出好几种了, 常用的基本也就这些.
当然消息框架设计中用到的套路远不止这些, 包括如何保证消息消费的顺序性, 消费者和服务端通信, 以及消息持久化等问题也是难点和重点, 同样, 分布式缓存系统也需要解决这些问题, 先写到这里, 要完全理解并自己设计一个这样的框架难度还是相当大的.
来源: https://www.cnblogs.com/XiaoHDeBlog/p/12920754.html