What is RocketMQ
Apache RocketMQ 是一个分布式消息传递和流平台, 具有低延迟, 高性能和可靠性, 万亿级容量和灵活的可扩展性. 它由四部分组成: NamerServer,Broker,Produer 和 Customer. 它们中的每一个都可以水平扩展而没有单一的故障点. 如下面的截图所示.
NameServer Cluster
NameServers 提供轻量级服务发现和路由. 每个 NameServer 记录完整的路由信息, 提供相应的读写服务, 并支持快速存储扩展.
NameServer 是一个几乎无状态的节点, 可集群部署, 节点之间无任何信息同步.
将 NameServer 地址列表提供给客户端有四种方法:
编程方式, 例如 producer.setNamesrvAddr("ip:port")
Java 选项, 使用 rocketmq.namesrv.addr
环境变量, 使用 NAMESRV_ADDR
HTTP 端点.
Broker Cluster
Brokers 通过提供轻量级的 TOPIC 和 QUEUE 机制来处理消息存储. 它们支持 Push 和 Pull 模型, 包含容错机制(2 个副本或 3 个副本), 并提供强大的峰值填充和按原始时间顺序累积数千亿条消息的能力. 此外, Brokers 还提供灾难恢复, 丰富的指标统计和警报机制, 这些都是传统的消息传递系统所缺少的.
Broker 部署相对复杂, Broker 分为 Master 与 Slave, 一个 Master 可以对应多个 Slaver, 但是一个 Slaver 只能对应一个 Master,Master 与 Slaver 的对应关系通过指定相同的 BrokerName, 不同的 BrokerId 来定义, BrokerId 为 0 表示 Master, 非 0 表示 Slaver.Master 可以部署多个. 每个 Broker 与 NameServer 集群中的所有节点建立长连接, 定时注册 Topic 信息到所有的 NameServer.
如下图所示, Broker 有几个重要的子模块组成:
远程处理模块, 即 Broker 的入口, 处理来自客户端的请求
客户端管理模块, 管理客户端 (生产者 / 消费者) 并维护消费者的主题订阅
存储服务, 提供简单的 API 来存储或查询物理磁盘中的消息
HA 服务, 提供主从 broker 之间的数据同步功能
索引服务, 按指定的 key 来为消息创建索引并提供快速消息查询
Producer Cluster
Produers 支持分布式部署. Distributed Producers 通过多种负载均衡模式向 Broker 集群发送消息. 发送过程支持快速故障并具有低延迟.
Producer 与 NameServer 集群中的其中一个节点 (随机选择) 建立长连接, 定期从 NameServer 取 Topic 路由信息, 并向提供 Topic 服务的 Master 建立长连接, 且定时向 Master 发送心跳. Produce 完全无状态.
Customers Cluster
Customers 也支持 Push 和 Pull 两种模式的分布式部署. 它还支持群集消费和消息广播. 它提供实时消息订阅机制, 可以满足大多数消费者的需求.
Consumer 与 NameServer 集群中的其中一个节点 (随机选择) 建立长连接, 定期从 NameServer 取 Topic 路由信息, 并向提供 Topic 服务的 Master,Slaver 建立长连接, 且定时向 Master,Slaver 发送心跳. Consumer 即可从 Master 订阅消息, 也可以从 Slave 订阅消息, 订阅规则由 Broker 配置决定.
Best Practices
Produer 最佳实践
发送消息注意事项
1. 一个应用尽可能用一个 Topic, 消息子类型用 tags 来标识, tags 可以由应用自由设置. 只有发送消息设置了 tags, 消费方在订阅消息时, 才可以利用 tags 在 broker 做消息过滤.
2. 每个消息在业务层面的唯一标识码, 要设置到 keys 字段, 方便将来定位消息丢失问题. 服务器会为每个消息创建索引(哈希索引), 应用可以通过 topic,key 来查询这条消息内容, 以及消息被谁消费. 由于是哈希索引, 请务必保证 key 尽可能唯一, 这样可以避免潜在的哈希冲突.
3. 消息发送成功或者失败, 要打印消息日志, 务必要打印 sendresult 和 key 字段.
4.send 消息方法, 只要不抛异常, 就代表发送成功. 但是发送成功会有多个状态:
SEND_OK: 消息发送成功
FLUSH_DISK_TIMEOUT: 消息发送成功, 但是服务器刷盘超时, 消息已经进入服务器队列, 只有此时服务器宕机, 消息才会丢失
FLUSH_SLAVE_TIMEOUT: 消息发送成功, 但是服务器同步到 Slave 时超时, 消息已经进入服务器队列, 只有此时服务器宕机, 消息才会丢失
SLAVE_NOT_AVAILABLE: 消息发送成功, 但是此时 slave 不可用, 消息已经进入服务器队列, 只有此时服务器宕机, 消息才会丢失. 对于精确发送顺序消息的应用, 由于顺序消息的局限性, 可能会涉及到主备自动切换问题, 所以如果 sendresult 中的 status 字段不等于 SEND_OK, 就应该尝试重试. 对于其他应用, 则没有必要这样
5. 对于消息不可丢失应用, 务必要有消息重发机制
消息发送失败处理
Producer 的 send 方法本身支持内部重试, 重试逻辑如下:
至多重试 3 次
如果发送失败, 则轮转到下一个 Broker
这个方法的总耗时时间不超过 sendMsgTimeout 设置的值, 默认 10s 所以, 如果本身向 broker 发送消息产生超时异常, 就不会再做重试
如果调用 send 同步方法发送失败, 则尝试将消息存储到 db, 由后台线程定时重试, 保证消息一定到达 Broker.
选择 oneway 形式发送
一个 RPC 调用, 通常是这样一个过程
客户端发送请求到服务器
服务器处理该请求
服务器向客户端返回应答
所以一个 RPC 的耗时时间是上述三个步骤的总和, 而某些场景要求耗时非常短, 但是对可靠性要求并不高, 例如日志收集类应用, 此类应用可以采用 oneway 形式调用, oneway 形式只发送请求不等待应答, 而发送请求在客户端实现层面仅仅是一个 os 系统调用的开销, 即将数据写入客户端的 socket 缓冲区, 此过程耗时通常在微秒级. RocketMQ 不止可以直接推送消息, 在消费端注册监听器进行监听, 还可以由消费端决定自己去拉取数据.
Consumer 最佳实践
消费过程要做到幂等
RocketMQ 无法做到消息重复, 所以如果业务对消息重复非常敏感, 务必要在业务层面去重. 将消息的唯一键, 可以是 MsgId, 也可以是消息内容中的唯一标识字段, 例如订单 ID, 消费之前判断是否在 DB 或 Tair(全局 KV 存储)中存在, 如果不存在则插入, 并消费, 否则跳过.(实践过程要考虑原子性问题, 判断是否存在可以尝试插入, 如果报主键冲突, 则插入失败, 直接跳过) msgid 一定是全局唯一的标识符, 但是可能会存在同样的消息有两个不同的 msgid 的情况(有多种原因), 这种情况可能会使业务上重复, 建议最好使用消息体中的唯一标识字段去重.
批量方式消费
如果业务流程支持批量方式消费, 则可以很大程度上的提高吞吐量, 可以通过设置 Consumer 的 consumerMessageBatchMaxSize 参数, 默认是 1, 即一次消费一条.
跳过非重要的消息
发生消息堆积时, 如果消费速度一直跟不上发送速度, 可以选择丢弃不重要的消息. 例如当前 offset 和 maxOffset 差值过大时(可能时因为消息系统堆积), 直接把当前消息消费成功, 可以快速使消息的消费和发出达到平衡.
优化消息消费过程
根据实际业务需要, 尽可能的优化代码, 减少 DB 访问数量, 进而减少 RT, 提高消息的消费速度.
顺序消息
RocketMQ 通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略). 比如下面的示例中, 订单号相同的消息会被先后发送到同一个队列中:
- // RocketMQ 通过 MessageQueueSelector 中实现的算法来确定消息发送到哪一个队列上
- // RocketMQ 默认提供了两种 MessageQueueSelector 实现: 随机 / Hash
- // 当然你可以根据业务实现自己的 MessageQueueSelector 来决定消息按照何种策略发送到消息队列中
- SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
- @Override
- public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
- Integer id = (Integer) arg;
- int index = id % mqs.size();
- return mqs.get(index);
- }
- }, orderId);
事务消息
RocketMQ 除了支持普通消息, 顺序消息, 另外还支持事务消息.
MQ 与 DB 一致性
A(存在 DB 操作),B(存在 DB 操作)两方需要保证分布式事务一致性, 通过引入中间层 MQ,A 和 MQ 保持事务一致性(异常情况下通过 MQ 反查 A 接口实现 check),B 和 MQ 保证事务一致(通过重试), 从而达到最终事务一致性.
上面以 DB 为例, 其实此处可以是任何业务或者数据源.
TransactionCheckListener 是在消息的 commit 或者 rollback 消息丢失的情况下才会回调(上图中灰色部分). 这种消息丢失只存在于断网或者 rocketmq 集群挂了的情况下. 当 rocketmq 集群挂了, 如果采用异步刷盘, 存在 1s 内数据丢失风险, 异步刷盘场景下保障事务没有意义. 所以如果要核心业务用 Rocketmq 解决分布式事务问题, 建议选择同步刷盘模式.
多系统之间数据一致性
当需要保证多方 (超过 2 方) 的分布式一致性, 上面的两方事务一致性 (通过 Rocketmq 的事务性消息解决) 已经无法支持. 这个时候需要引入 TCC 模式思想.
以上图交易系统为例:
1)交易系统创建订单(往 DB 插入一条记录), 同时发送订单创建消息. 通过 RocketMq 事务性消息保证一致性
2)接着执行完成订单所需的同步核心 RPC 服务(非核心的系统通过监听 MQ 消息自行处理, 处理结果不会影响交易状态). 执行成功更改订单状态, 同时发送 MQ 消息.
3)交易系统接受自己发送的订单创建消息, 通过定时调度系统创建延时回滚任务(或者使用 RocketMq 的重试功能, 设置第二次发送时间为定时任务的延迟创建时间. 在非消息堵塞的情况下, 消息第一次到达延迟为 1ms 左右, 这时可能 RPC 还未执行完, 订单状态还未设置为完成, 第二次消费时间可以指定). 延迟任务先通过查询订单状态判断订单是否完成, 完成则不创建回滚任务, 否则创建. PS: 多个 RPC 可以创建一个回滚任务, 通过一个消费组接受一次消息就可以; 也可以通过创建多个消费组, 一个消息消费多次, 每次消费创建一个 RPC 的回滚任务. 回滚任务失败, 通过 MQ 的重发来重试.
以上是交易系统和其他系统之间保持最终一致性的解决方案.
参考:
- http://rocketmq.apache.org/docs/rmq-arc/
- https://yq.aliyun.com/articles/624207?utm_content=m_1000012577
- https://www.jianshu.com/p/2838890f3284
- https://www.jianshu.com/p/453c6e7ff81c
来源: http://www.bubuko.com/infodetail-2991128.html