继续上篇文章解决 RabbitMQ 消息丢失问题和保证消息可靠性 (一) https://juejin.im/post/5d468591f265da03b810427e 未完成部分, 我们聊聊 MQ Server 端的高可用和消费端如何保证消息不丢的问题?
回归上篇的内容, 我们知道消息从生产端到服务端, 为了保证消息不丢, 我们必须做哪些事情?
发送端采用 Confirm 模式, 注意 Server 端没成功通知发送端, 需要重发操作需要额外处理
消息的持久化处理
上面两个操作保证消息到服务端不丢, 但是非高可用状态, 如果节点挂掉, 服务暂时不可用, 需要重启后, 消息恢复, 消息不会丢失, 因为有磁盘存储.
本文先从消费端讲起:
RabbitMQ Server 到消费者消息如何不丢?
上面一篇文章也提到了, 消费者获取到消息之后, 没有来得及处理完毕, 自己直接宕机了, 因为消息者默认采用自动 ack, 此时 RabbitMQ 的自动 ack 机制会通知 MQ Server 这条消息已经处理好了, 此时消息就丢了, 并不是预期的.
那么我们采用手动 ack 机制来解决这个问题, 消费端处理完逻辑之后再通知 MQ Server, 这样消费者没处理完消息不会发送 ack, 如果在消费者拿到消息, 没来得及处理的情况下自己挂了, 此时 MQ 集群会自动感知到, 它就会自觉的重发消息给其他的消费者服务实例.
根据上面的思路你需要完成下面的两步操作:
第一: 消费者监听设置手动 ack
- this.channel = channelManager.getListenerChannel(namespace);
- this.queue = queue;
- this.channel.basicConsume(queue, false, consumerTag, this);
- this.disconnectedCallback.setChannel(channel);
核心代码: this.channel.basicConsume(queue, false, consumerTag, this); 第二个参数设置 false 代表不自动 ack
第二: 业务执行完成后手动 ack
- public static void ack(MessageContext context) {
- long deliveryTag = context.getEnvelope().getDeliveryTag();
- try {
- context.getChannel().basicAck(deliveryTag, false);
- } catch (IOException e) {
- throw new MqAckException("消息 ack 出错: 连接异常或远端关闭", context, e);
- }
- }
核心代码: context.getChannel().basicAck(deliveryTag, false);
这里封装来, 需要业务在执行完自己的业务代码后, 调用对象 channel 的 ack 方法通知 MQServer, 说我这边执行完了, 你可以删除了.
注意这里有个问题: 如果忘记调用这个 context.getChannel().basicAck(deliveryTag, false);
或者因为代码异常, 这个代码没被执行, 会怎么样? 后面找时间再写一篇文章讲这个问题.
RabbitMQ Server 中存储的消息高可用
当我们解决了, 生产端和消费端的问题后, 基本保证消息的不丢问题, 但是还有一个是消息的高可用问题, 单节点问题, 普通节点的问题都会影响消息的临时不可用, 这个时候要用上我们的 HA 镜像集群模式来保证.
上一篇文章 解决 RabbitMQ 消息丢失问题和保证消息可靠性 (一) https://juejin.im/post/5d468591f265da03b810427e 已经提到过, 服务端消息部署的三种模式的区别, 今天就专门讲镜像模式的介绍.
镜像模式至少采用 3 节点, 2 个磁盘节点和 1 个内存节点来保证, 架构图:
设置镜像也有一些策略:
同步至所有的, 一般不这么做, 性能会受到极大影响
同步最多 N 个机器
只同步至符合指定名称的 nodes
命令处理 HA 策略模版: rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
为每个以 "rock.wechat" 开头的队列设置所有节点的镜像, 并且设置为自动同步模式
- rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
- rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
为每个以 "rock.wechat." 开头的队列设置两个节点的镜像, 并且设置为自动同步模式
- rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
- '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
为每个以 "node." 开头的队列分配指定的节点做镜像
- rabbitmqctl set_policy ha-nodes "^nodes\." \
- '{"ha-mode":"nodes","ha-params":["rabbit@nodeA","rabbit@nodeB"]}'
但是: HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降
所以采用镜像模式, 要根据具体的业务规则定制话处理, 没那么重要的业务, 消息丢了也没关系的场景, 又要求必须高的性能的时候, 镜像也可以不用设置.
总结
两篇文章的讲解, 分析了消息中间件高可用问题的大概的思路, 没有具体的代码详细, 如有疑问可以下方留言评论, 我会及时回复解答, 后面我会逐步完善相关细节, 欢迎多多关注.
后面计划更新文章如下:
什么情况会导致重复消费并怎么解决?
什么样的真实业务场景需要保障顺序性和如何保证消息的顺序性?
如何通过消息队列优雅的解决微服务间接口失败的重试?
来源: https://www.cnblogs.com/flyrock/p/11437512.html