前言
前面我们介绍了 RabbitMQ 的安装, 各大消息中间件的对比, AMQP 核心概念, 管控台的使用, 快速入门 RabbitMQ. 本章将介绍 RabbitMQ 的高级特性. 分两篇 (上 / 下) 进行介绍.
消息如何保障 100% 的投递成功?
幂等性概念详解
在海量订单产生的业务高峰期, 如何避免消息的重复消费的问题?
Confirm 确认消息, Return 返回消息
1 消息如何保障 100% 的投递成功?
1.1 什么是生产端的可靠性投递?
保障消息的成功发出
保障 MQ 节点的成功接收
发送端收到 MQ 节点 (Broker) 确认应答
完善的消息进行补偿机制
前三步不一定能保障消息能够 100% 投递成功. 因此要加上第四步
BAT/TMD 互联网大厂的解决方案:
- 消息落库, 对消息状态进行打标
在发送消息的时候, 需要将消息持久化到数据库中, 并给这个消息设置一个状态(未发送, 发送中, 到达). 当消息状态发生了变化, 需要对消息做一个变更. 针对没有到达的消息做一个轮训操作, 重新发送. 对轮训次数也需要做一个限制 3-5 次. 确保消息能够成功的发送.
消息的延迟投递, 做二次确认, 回调检查
具体采用哪种方案, 还需要根据业务与消息的并发量而定.
1.2 第一种方案:
生产端 - 可靠性投递
图解:
蓝色部分表示: 生产者负责发送消息发送至 Broker 端
Biz DB: 订单数据库 MSG DB: 消息数据
面对小规模的应用可以采用加事务的方式, 保证事务的一致性. 但在大厂中面对高并发, 并没有加事务, 事务的性能拼接非常严重, 而是做补偿.
比如: 如下发一条订单消息.
step1: 存储订单消息(创建订单), 业务数据入库, 消息也入库. 缺点: 需要持久化两次.(status:0)
step2: 在 step1 成功的前提下, 发送消息
step3:Broker 收到消息后, confirm 给我们的生产端. Confirm Listener 异步监听 Broker 回送的消息.
step4: 抓取出指定的消息, 更新(status=1), 表示消息已经投递成功.
step5: 分布式定时任务获取消息状态, 如果等于 0 则抓取数据出来.
step6: 重新发送消息
step7: 重试限制设置 3 次. 如果消息重试了 3 次还是失败, 那么(status=2), 认为这个消息就是失败的.
查询这些消息为什么失败, 可能需要人工去查询.
假设 step2 执行成功, step3 由于网络闪断. 那么 confirm 将永远收不到消息, 那么我们需要设定一个规则:
例如: 在消息入库的时候, 设置一个临界值 timeout=5min, 当超过 5min 之后, 就将这条数据抓取出来.
或者写一个定时任务每隔 5 分钟就将 status=0 的消息抓取出来. 可能存在小问题: 消息发送出去, 定时任务又正好刚执行, Confirm 还未收到, 定时任务就会执行, 会导致消息执行两次.
更精细化操作: 消息超时容忍限制. confirm 在 2-3 分钟内未收到消息, 则重新发送.
保障 MQ 我们思考如果第一种可靠性投递, 在高并发的场景下是否合适?
第一种方案对数据有两次入库, 一次业务数据入库, 一次消息入库. 这样对数据的入库是一个瓶颈.
其实我们只需要对业务进行入库.
消息的延迟投递, 做二次确认, 回调检查
这种方式并不一定能保证 100% 成功, 但是也能保证 99.99% 的消息成功. 如果遇到特别极端的情况, 那么就只能需要人工去补偿, 或者定时任务去做.
第二种方式主要是为了减少对数据库的操作.
看下第二种方式:
图解:
Upstream service: 生产端
DownStream service: 消费端
Callback service: 回调服务
step1: 业务消息入库成功后, 第一次消息发送.
step2: 同样在消息入库成功后, 发送第二次消息, 这两条消息是同时发送的. 第二条消息是延迟检查, 可以设置 2min,5min 延迟发送.
step3: 消费端监听指定队列.
step4: 消费端处理完消息后, 内部生成新的消息 send confirm. 投递到 MQ Broker.
step5: Callback Service 回调服务监听 MQ Broker, 如果收到 Downstream service 发送的消息, 则可以确定消息发送成功, 执行消息存储到 MSG DB.
step6:Check Detail 检查监听 step2 延迟投递的消息. 此时两个监听的队列不是同一个, 5 分钟后, Callback service 收到消息, 检查 MSG DB. 如果发现之前的消息已经投递成功, 则不需要做其他事情. 如果检查发现失败, 则 Callback 进行补偿, 主动发送 RPC 通信. 通知上游生产端重新发送消息.
这样做的目的: 少做了一次 DB 存储. 关注点并不是百分百的投递成功, 而是性能.
2. 幂等性概念
2.1 幂等性是什么?
幂等 (idempotent,idempotence) 是一个数学与计算机学概念, 常见于抽象代数中, 即 f(f(x)) = f(x). 简单的来说就是一个操作多次执行产生的结果与一次执行产生的结果一致.
我们可以借鉴数据库的乐观锁机制:
比如我们执行一条更新库存的 SQL 语句:
UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1
利用加版本号 Version 的方式来保证幂等性.
- /**
- *
- * @ClassName: Producer
- * @Description: 生产者
- * @author Coder 编程
- * @date 2019 年 7 月 30 日 上午 21:27:02
- *
- */
- public class Producer {
- public static void main(String[] args) throws Exception {
- //1 创建 ConnectionFactory
- Connection connection = ConnectionUtils.getConnection();
- //2 通过 Connection 创建一个新的 Channel
- Channel channel = connection.createChannel();
- //3 指定我们的消息投递模式: 消息的确认模式
- channel.confirmSelect();
- String exchangeName = "test_confirm_exchange";
- String routingKey = "confirm.save";
- //4 发送一条消息
- String msg = "Hello RabbitMQ Send confirm message!";
- channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
- //5 添加一个确认监听 用于发送消息到 Broker 端之后, 回送消息的监听
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleNack(long deliveryTag, boolean multiple) throws IOException {
- System.err.println("-------no ack!-----------");
- }
- @Override
- public void handleAck(long deliveryTag, boolean multiple) throws IOException {
- System.err.println("-------ack!-----------");
- }
- });
- }
- }
- /**
- *
- * @ClassName: Consumer
- * @Description: 消费者
- * @author Coder 编程
- * @date 2019 年 7 月 30 日 上午 21:32:02
- *
- */
- public class Consumer {
- public static void main(String[] args) throws Exception {
- //1 获取一个连接
- Connection connection = ConnectionUtils.getConnection();
- //2 通过 Connection 创建一个新的 Channel
- Channel channel = connection.createChannel();
- String exchangeName = "test_confirm_exchange";
- String routingKey = "confirm.#";
- String queueName = "test_confirm_queue";
- //3 声明交换机和队列 然后进行绑定设置, 最后制定路由 Key
- channel.exchangeDeclare(exchangeName, "topic", true);
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
- //4 创建消费者
- QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, queueingConsumer);
- while(true){
- Delivery delivery = queueingConsumer.nextDelivery();
- String msg = new String(delivery.getBody());
- System.err.println("消费端:" + msg);
- }
- }
- }
- /**
- *
- * @ClassName: ConnectionUtils
- * @Description: 连接工具类
- * @author Coder 编程
- * @date 2019 年 6 月 21 日 上午 22:28:22
- *
- */
- public class ConnectionUtils {
- public static Connection getConnection() throws IOException, TimeoutException {
- // 定义连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置服务地址
- factory.setHost("127.0.0.1");
- // 端口
- factory.setPort(5672);//amqp 协议 端口 类似与 MySQL 的 3306
- // 设置账号信息, 用户名, 密码, vhost
- factory.setVirtualHost("/vhost_cp");
- factory.setUsername("user_cp");
- factory.setPassword("123456");
- // 通过工程获取连接
- Connection connection = factory.newConnection();
- return connection;
- }
- }
- /**
- *
- * @ClassName: Producer
- * @Description: 生产者
- * @author Coder 编程
- * @date 2019 年 7 月 30 日 上午 22:03:22
- *
- */
- public class Producer {
- public static void main(String[] args) throws Exception {
- //1 创建 ConnectionFactory
- Connection connection = ConnectionUtils.getConnection();
- Channel channel = connection.createChannel();
- String exchange = "test_return_exchange";
- String routingKey = "return.save";
- String routingKeyError = "abc.save";
- String msg = "Hello RabbitMQ Return Message";
- channel.addReturnListener(new ReturnListener() {
- @Override
- public void handleReturn(int replyCode, String replyText, String exchange,
- String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.err.println("---------handle return----------");
- // 响应码
- System.err.println("replyCode:" + replyCode);
- // 响应文本
- System.err.println("replyText:" + replyText);
- System.err.println("exchange:" + exchange);
- System.err.println("routingKey:" + routingKey);
- System.err.println("properties:" + properties);
- System.err.println("body:" + new String(body));
- }
- });
- // 第三个参数 mandatory=true, 意味着路由不到的话 mq 也不会删除消息, false 则会自动删除
- channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
- // 修改 routingkey, 测试是否能够收到消息
- //channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
- }
- }
- /**
- *
- * @ClassName: Consumer
- * @Description: 消费者
- * @author Coder 编程
- * @date 2019 年 7 月 30 日 上午 22:33:34
- *
- */
- public class Consumer {
- public static void main(String[] args) throws Exception {
- //1 创建 ConnectionFactory
- Connection connection = ConnectionUtils.getConnection();
- Channel channel = connection.createChannel();
- String exchangeName = "test_return_exchange";
- String routingKey = "return.#";
- String queueName = "test_return_queue";
- channel.exchangeDeclare(exchangeName, "topic", true, false, null);
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, exchangeName, routingKey);
- QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, queueingConsumer);
- while(true){
- Delivery delivery = queueingConsumer.nextDelivery();
- String msg = new String(delivery.getBody());
- System.err.println("消费者:" + msg);
- }
- }
- }
- GitHub: https://github.com/CoderMerlin/coder-programming
- Gitee: https://gitee.com/573059382/coder-programming
来源: https://www.cnblogs.com/coder-programming/p/11412048.html