最近一段项目实践中大量使用了基于 RabbitMQ 的消息中间件, 也积累的一些经验和思考, 特此成文, 望大家不吝赐教.
本文包括 RabbitMQ 基本概念, 进阶概念, 实践与思考等三部分, 着重强调相关概念和基于 RabbitMQ 进行扩展开发的思路, 并简要展示 RabbitMQ 客户端的编码, 接下来通过一个思维导图来展示整体思路, 红星表示重点部分.
1. 基本概念
官方文档: http://www.rabbitmq.com/#getstarted
1.1. 核心实体
进入详细介绍之前, 先来看一张简化版的消息流转的模型图.
a. 生产者 Producer 发送消息, 消息分为消息标识和消息体 (payLoad 有效载荷) 两部分, 消息标识中包含 Exchange 交换器的名字和 RoutingKey 路由键信息.
b. 由于交换器之前已经通过 2 个不同的 BindingKey 绑定键分别绑定了两个队列, 因此交换器可以对比路由键和绑定键, 之后将消息路由到匹配的队列中.
c. 队列将消息推送到指定的一个或多个消费者中, 多个消费者会选择最简单的 RoundRobin 轮训方式进行选择.
Exchange 交换器
核心概念, 可以简化理解为路由器, 其不存储数据, 其通常会和一个队列绑定, 但也可以绑定到另一个交换器上. 其包括 4 种类型的交换器类型, 生产实践中主要使用可以精细管理的 direct 和 topic 两种. direct, 路由规则为完全匹配; topic, 支持完全匹配, 也支持模糊匹配; fanout, 会将消息转发到该交换器绑定的所有队列中; header, 实际中无应用.
Queue 队列
用于存储消息, 和 Kafka 的消息模型完全不同, 其会将消息存储在 Topic 中. 因此在实现类似 ConsumerGroup 概念时差异很大, Kafka 是可以回溯消息的, 但 Rabbit 新绑定的队列的数据是空的, 不能回溯.
Binding 绑定
其通过绑定键将交换器和队列关联起来
RouteKey & BindingKey 路由键和绑定键
通常会将路由键和绑定键都称为路由键, 其差异是路由键是包含在消息标识中的, 而绑定键是用于在交换器和队列间建立绑定关系的, 消息会通过它们的匹配情况进行路由.
1.2. 通信
通信实体
包括 Connection 连接和 Channel 通道, 连接通常对应一个基于 TCP 的 Socket, 建立 Connection 的关键参数包括用户名, 密码, 虚拟主机, 主机地址和端口. 一个连接可以建立多个 Channel 实例, 推荐控制数量(比如 10 个), 但 Channel 实例不能在线程间共享, 应用程序需要为每一个线程开辟一个 Channel.
AMQP 协议
Java 技术栈汇中, 关于消息通信听到比较多的是 JMS, 而 AMQP 协议相对更加严格一些, 其包括 Module Layer,Session Layer, Transport Layer 三个层次, 业务开发主要接触到的是 Module Layer, 客户端可以通过 Queue.Declare,Basic.Consume 等命令进行操作.
1.3. 虚拟主机与用户
vhost
虚拟主机, 可以在逻辑上看做一台 RabbitMQ 服务器, 其拥有自己的交换器, 队列和绑定关系等. RabbitMQ 对权限的管理就是基于 vhost 进行的, 默认会创建一个全局的 / 虚拟主机, 通常不推荐直接使用该 vhost, 而是需要自定义一个 vhost 便于管理.
User
对于某一个用户, 通常包括 3 种类型的权限: read, 允许读取队列数据; write, 允许向队列发送数据; config, 允许创建队列, 如果客户端需要支持添加队列, 需要添加该权限, 否则会报无权限错误[踩过坑] .
2. 进阶概念
2.1. 交换器与队列增强
TTL 过期时间
目前在两个不同的粒度设置消息的 TTL, 分别是队列粒度和消息粒度. 由于 RabbitMQ 实际机制的原因, 通常都选择的是队列粒度, 对于队列粒度来说, 队列头的消息一定是最先失效的, 因此可以高效的判断和丢弃. 而对于消息粒度, 其需要在消息真正投递到消费者时进行判断, 如果该消息之前的消息并没有失效, 那么它将一直存活.
死信交换器 DLX
全称为 Dead-Letter-Exchange, 也是 RabbitMQ 扩展开发的核心概念, 当一条消息在一个队列中变成死信之后, 它能自动的被转发到一个交换器中, 这个交换器就是 DLX, 很多地方称和这个交换器绑定的队列是死信队列, 我并不是完全认同.
消息变为死信的原因
消息被拒绝 Nack,Reject, 并且 requeue 参数为 false(重点强调一下, 生产实践中通常不能打开 requeue, 因为打开后队列中的消息就会出现乱序的情况, 且性能很差); 消息过期; 队列达到最大长度.
DLX 也是一个正常的交换器, 和一般的交换器没有区别, 它能在任何的队列上被指定 , 实际上就是设置某个队列的属性.
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-dead-letter-exchange", "dlx_exchange");
- args.put("x-dead-letter-routing-key" , "dlx-routing-key");
- channel.exchangeDeclare("dlx_exchange" , "direct"); // 创建 DLX:
- channel.queueDeclare("normal_queue", false, false, false, args); // 为队列 normal_queue 添加 DLX
命名规范: 队列类型,[生产者. 消费者. 队列名后缀];Topic 类型,[生产者. exchange. 队列名后缀]
延迟队列
延迟队列存储的对象是对应的延迟消息, 所谓 "延迟消息" 是指当消息被发送以后, 并不想让消费者立刻拿到消息, 而是等待特定时间后, 消费者才能拿到这个消息进行消费. 使用延迟消息场景如在订单系统中, 希望用户下单后 30 分钟内支付, 否则取消订单. 那么业务系统可以在下单后, 发送延迟消息, 到达指定时间后消费该消息来判断是否支持. 该方式在数据量比较大的场景中比通过 Job 扫描数据表合适.
在 AMQP 协议中, 或者 RabbitMQ 本身没有直接支持延迟队列的功能, 但是可以通过前面 所介绍的 DLX 和 TTL 模拟出延迟队列的功能, 这部分在实践与思考部分进行介绍.
持久化
交换器和队列元数据持久化和消息的持久化, 消息的持久化可以直接使用 MessageProperties.PERSISTENT_TEXT_PLAIN.
2.2. 生产者
生产者客户端的代码比较简洁, 如下所示.
- byte[] messageBodyBytes = "Hello , Xionger!".getBytes();
- channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN,
- messageBodyBytes);
从高可用 HA 的角度不经要问, 消息的生产者将消息发送出去之后, Broker 是否收到消息. RabbitMQ 针对这个问题提供了两种解决方案, 分别是事务机制和发送方确认 PublisherConfirm. 发送者确认的实现继续细分为 3 种形式, 包括单条同步, 批量同步和异步方式. 事务机制和单条同步确认方式的性能都比较差, 通常只能达到 2000QPS 左右, 因此通常推荐使用发送方确认的批量方式和异步方式, 其 QPS 可以达到 8000QPS 以上. 其中批量方式也存在一个隐患, 即发送一批消息到服务端时, 如果有一条消息失败, 那么该批次所有消息都需要重试. 因此目前生产实践中 , 使用的是异步方式, 简化的代码实践如下所示.
- SortedSet confirmSet = Sets.newTreeSet();
- channel.confirmSelect();
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- if (multiple) {
- confirmSet.headSet(deliveryTag - 1);
- } else {
- confirmSet.remove(deliveryTag);
- }
- }
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- //omit
- // 消息重新投递处理
- }
- });
tip: 这部分在服务端 ack 时有一个优化, 只会回传当前最大的标识, 可以有效减少比对次数.
2.3. 消费者
消费模式: 拉模式, 推模式, RabbitMQ 推荐推模式, 保持消息消费的有序性.
- boolean autoAck = false;
- channel.basicQos(64);//prefetchCount
- channel.basicConsume(queueName, autoAck, "myConsumerTag",
- new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String routingKey = envelope.getRoutingKey();
- String contentType = properties.getContentType();
- long deliveryTag = envelope.getDeliveryTag();
- channel.basicAck(deliveryTag, false);
- }
- });
- tip:
对于消息生产者, 过去还有一个消息投递不可达被返回的概念, 涉及 mandatory 和 immediate 两个参数, 但其在生产实践中并不常用.
3. 实践与思考
3.1. 环境搭建
安装: Mac 环境 brew install rabbitmq, 非常简便
管理界面
unacked: 消费端没有 Ack 的数量
Publish: 推送消息的 QPS
Deliver(manual ack): 手动 Ack
durable: 持久化
Policy: 队列的规则
Mirrors: 镜像 Broker
3.2.Client 组件开发
在介绍了 RabbitMQ 主要知识后, 扩展的分享一个简易的基于 RabbitMQ 消息中间件的思路. 由于 RabbitMQ 是基于 Erlang 开发, 虽然很棒但毕竟比较小众, Java 技术栈的工程师一般很难去修改 RabbitMQ 的源码, 因此通常只是通过构建一个合理的客户端 SDK 来支持业务开发.
生产者
生产者目标比较简单, 需要实现健壮性强的的发送者确认机制[异步] 和支持队列分片, 队列分片可以给队列加上后缀标识, 然后轮训处理即可.
消费者
消费者部分希望支持消费失败的重试机制, 死信队列及其报警机制, 以支持 3 次重试消费为例, 整体思路如下图所示.[借助之前介绍的 TTL 和 DLX]
3.3. 场景思考
RabbitMQ 最大的特点是成熟度高, 管理功能全面, 近似开箱即用, 二次开发实现一个简单靠谱的客户端就足以满足大部分的场景, 尤其对于初创企业, 中小企业来说是一个非常棒的选择.
高可用 HA: 源生支持镜像服务器, 同步模型等机制
高吞吐: 可以通过队列分片的方式支持大量的 QPS, 比如单个队列推荐 QPS4000 以下, 健康的水位在 2000 左右, 那么就需要通过二次开发客户端来实现队列分片.
参考资料
[1]朱忠华. RabbitMQ 实战指南[M]. 电子工业出版社: 北京, 2017.11:.
下期预告: 深入理解 MySQL 索引机制
善良比聪明更重要 -- 张小龙
来源: https://www.cnblogs.com/xiong2ge/p/rabbitmq_fast.html