接上一篇: RocketMQ 入门到入土 (一) 新手也能看懂的原理和实战!
一, 事务消息的由来
1, 案例
引用官方的购物案例:
小明购买一个 100 元的东西, 账户扣款 100 元的同时需要保证在下游的积分系统给小明这个账号增加 100 积分. 账号系统和积分系统是两个独立是系统, 一个要减少 100 元, 一个要增加 100 积分. 如下图:
2, 问题
账号服务扣款成功了, 通知积分系统也成功了, 但是积分增加的时候失败了, 数据不一致了.
账号服务扣款成功了, 但是通知积分系统失败了, 所以积分不会增加, 数据不一致了.
3, 方案
RocketMQ 针对第一个问题解决方案是: 如果消费失败了, 是会自动重试的, 如果重试几次后还是消费失败, 那么这种情况就需要人工解决了, 比如放到死信队列里然后手动查原因进行处理等.
RocketMQ 针对第二个问题解决方案是: 如果你扣款成功了, 但是往 mq 写消息的时候失败了, 那么 RocketMQ 会进行回滚消息的操作, 这时候我们也能回滚我们扣款的操作.
二, 事务消息的原理
1, 原理图解
2, 详细过程
1.Producer 发送半消息 (Half Message) 到 broker.
我真想吐槽一句为啥叫半消息, 难以理解, 其实这就是 prepare message, 预发送消息.
Half Message 发送成功后开始执行本地事务.
如果本地事务执行成功的话则返回 commit, 如果执行失败则返回 rollback.(这个是在事务消息的回调方法里由开发者自己决定 commit or rollback)
Producer 发送上一步的 commit 还是 rollback 到 broker, 这里有两种情况:
1. 如果 broker 收到了 commit/rollback 消息 :
如果收到了 commit, 则 broker 认为整个事务是没问题的, 执行成功的. 那么会下发消息给 Consumer 端消费.
如果收到了 rollback, 则 broker 认为本地事务执行失败了, broker 将会删除 Half Message, 不下发给 Consumer 端.
2. 如果 broker 未收到消息(如果执行本地事务突然宕机了, 相当本地事务执行结果返回 unknow, 则和 broker 未收到确认消息的情况一样处理.):
broker 会定时回查本地事务的执行结果: 如果回查结果是本地事务已经执行则返回 commit, 若未执行, 则返回 rollback.
Producer 端回查的结果发送给 Broker.Broker 接收到的如果是 commit, 则 broker 视为整个事务执行成功, 如果是 rollback, 则 broker 视为本地事务执行失败, broker 删除 Half Message, 不下发给 consumer. 如果 broker 未接收到回查的结果(或者查到的是 unknow), 则 broker 会定时进行重复回查, 以确保查到最终的事务结果. 重复回查的时间间隔和次数都可配.
三, 事务消息实现流程
1, 实现流程
简单来看就是: 事务消息是个监听器, 有回调函数, 回调函数里我们进行业务逻辑的操作, 比如给账户 - 100 元, 然后发消息到积分的 mq 里, 这时候如果账户 - 100 成功了, 且发送到 mq 成功了, 则设置消息状态为 commit, 这时候 broker 会将这个半消息发送到真正的 topic 中. 一开始发送他是存到半消息队列里的, 并没存在真实 topic 的队列里. 只有确认 commit 后才会转移.
2, 补救方案
如果事务因为中断, 或是其他的网络原因, 导致无法立即响应的, RocketMQ 当做 UNKNOW 处理, RocketMQ 事务消息还提供了一个补救方案: 定时查询事务消息的事务状态. 这也是一个回调函数, 这里面可以做补偿, 补偿逻辑开发者自己写, 成功的话自己返回 commit 就完事了.
四, 事务消息代码实例
1, 代码
- package com.chentongwei.mq.rocketmq;
- import org.apache.rocketmq.client.producer.LocalTransactionState;
- import org.apache.rocketmq.client.producer.TransactionListener;
- import org.apache.rocketmq.client.producer.TransactionMQProducer;
- import org.apache.rocketmq.client.producer.TransactionSendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageExt;
- import java.util.Date;
- /**
- * Description:
- *
- * @author TongWei.Chen 2020-06-21 11:32:58
- */
- public class ProducerTransaction2 {
- public static void main(String[] args) throws Exception {
- TransactionMQProducer producer = new TransactionMQProducer("my-transaction-producer");
- producer.setNamesrvAddr("124.57.180.156:9876");
- // 回调
- producer.setTransactionListener(new TransactionListener() {
- @Override
- public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
- LocalTransactionState state = null;
- //msg-4 返回 COMMIT_MESSAGE
- if(message.getKeys().equals("msg-1")){
- state = LocalTransactionState.COMMIT_MESSAGE;
- }
- //msg-5 返回 ROLLBACK_MESSAGE
- else if(message.getKeys().equals("msg-2")){
- state = LocalTransactionState.ROLLBACK_MESSAGE;
- }else{
- // 这里返回 unknown 的目的是模拟执行本地事务突然宕机的情况(或者本地执行成功发送确认消息失败的场景)
- state = LocalTransactionState.UNKNOW;
- }
- System.out.println(message.getKeys() + ",state:" + state);
- return state;
- }
- /**
- * 事务消息的回查方法
- */
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
- if (null != messageExt.getKeys()) {
- switch (messageExt.getKeys()) {
- case "msg-3":
- System.out.println("msg-3 unknow");
- return LocalTransactionState.UNKNOW;
- case "msg-4":
- System.out.println("msg-4 COMMIT_MESSAGE");
- return LocalTransactionState.COMMIT_MESSAGE;
- case "msg-5":
- // 查询到本地事务执行失败, 需要回滚消息.
- System.out.println("msg-5 ROLLBACK_MESSAGE");
- return LocalTransactionState.ROLLBACK_MESSAGE;
- }
- }
- return LocalTransactionState.COMMIT_MESSAGE;
- }
- });
- producer.start();
- // 模拟发送 5 条消息
- for (int i = 1; i <6; i++) {
- try {
- Message msg = new Message("transactionTopic", null, "msg-" + i, ("测试, 这是事务消息!" + i).getBytes());
- producer.sendMessageInTransaction(msg, null);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
2, 结果
- msg-1,state:COMMIT_MESSAGE
- msg-2,state:ROLLBACK_MESSAGE
- msg-3,state:UNKNOW
- msg-4,state:UNKNOW
- msg-5,state:UNKNOW
- msg-3 unknow
- msg-3 unknow
- msg-5 ROLLBACK_MESSAGE
- msg-4 COMMIT_MESSAGE
- msg-3 unknow
- msg-3 unknow
- msg-3 unknow
- msg-3 unknow
3, 管控台
4, 结果分析
只有 msg-1 和 msg-4 发送成功了. msg-4 在 msg-1 前面是因为 msg-1 先成功的, msg-4 是回查才成功的. 按时间倒序来的.
先来输出五个结果, 对应五条消息
- msg-1,state:COMMIT_MESSAGE
- msg-2,state:ROLLBACK_MESSAGE
- msg-3,state:UNKNOW
- msg-4,state:UNKNOW
- msg-5,state:UNKNOW
然后进入了回查, msg-3 还是 unknow,msg-5 回滚了, msg-4 提交了事务. 所以这时候 msg-4 在管控台里能看到了.
过了一段时间再次回查 msg-3, 发现还是 unknow, 所以一直回查.
回查的时间间隔和次数都是可配的, 默认是回查 15 次还失败的话就会把这个消息丢掉了.
五, 疑问
疑问: Spring 事务, 常规的分布式事务不行吗? Rocketmq 的事务是否多此一举了呢?
MQ 用于解耦, 之前是分布式事务直接操作了账号系统和积分系统. 但是他两就是强耦合的存在, 如果中间插了个 mq, 账号系统操作完发消息到 mq, 这时候只要保证发送成功就提交, 发送失败则回滚, 这步怎么保证, 就是靠事务了. 而且用 RocketMQ 做分布式事务的也蛮多的.
六, 顺序消息解释
1, 概述
RocketMQ 的消息是存储到 Topic 的 queue 里面的, queue 本身是 FIFO(First Int First Out)先进先出队列. 所以单个 queue 是可以保证有序性的.
但问题是 1 个 topic 有 N 个 queue, 作者这么设计的好处也很明显, 天然支持集群和负载均衡的特性, 将海量数据均匀分配到各个 queue 上, 你发了 10 条消息到同一个 topic 上, 这 10 条消息会自动分散在 topic 下的所有 queue 中, 所以消费的时候不一定是先消费哪个 queue, 后消费哪个 queue, 这就导致了无序消费.
2, 图解
3, 再次分析
一个 Producer 发送了 m1,m2,m3,m4 四条消息到 topic 上, topic 有四个队列, 由于自带的负载均衡策略, 四个队列上分别存储了一条消息. queue1 上存储的 m1,queue2 上存储的 m2,queue3 上存储的 m3,queue4 上存储的 m4,Consumer 消费的时候是多线程消费, 所以他无法保证先消费哪个队列或者哪个消息, 比如发送的时候顺序是 m1,m2,m3,m4, 但是消费的时候由于 Consumer 内部是多线程消费的, 所以可能先消费了 queue4 队列上的 m4, 然后才是 m1, 这就导致了无序.
七, 顺序消息解决方案
1, 方案一
很简单, 问题产生的关键在于多个队列都有消息, 我消费的时候又不知道哪个队列的消息是最新的. 那么思路就有了, 发消息的时候你要想保证有序性的话, 就都给我发到一个 queue 上, 然后消费的时候因为只有那一个 queue 上有消息且 queue 是 FIFO, 先进先出, 所以正常消费就完了.
很完美. 而且 RocketMQ 也给我们提供了这种发消息的时候选择 queue 的 API(MessageQueueSelector). 直接上代码.
2, 代码一
2.1, 生产者
- import java.util.List;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.MessageQueueSelector;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageQueue;
- /**
- * 消息发送者
- */
- public class Producer5 {
- public static void main(String[] args)throws Exception {
- DefaultMQProducer producer = new DefaultMQProducer("my-order-producer");
- producer.setNamesrvAddr("124.57.180.156:9876");
- producer.start();
- for (int i = 0; i < 5; i++) {
- Message message = new Message("orderTopic", ("hello!" + i).getBytes());
- producer.send(
- // 要发的那条消息
- message,
- // queue 选择器 , 向 topic 中的哪个 queue 去写消息
- new MessageQueueSelector() {
- // 手动 选择一个 queue
- @Override
- public MessageQueue select(
- // 当前 topic 里面包含的所有 queue
- List<MessageQueue> mqs,
- // 具体要发的那条消息
- Message msg,
- // 对应到 send() 里的 args, 也就是 2000 前面的那个 0
- // 实际业务中可以把 0 换成实际业务系统的主键, 比如订单号啥的, 然后这里做 hash 进行选择 queue 等. 能做的事情很多, 我这里做演示就用第一个 queue, 所以不用 arg.
- Object arg) {
- // 向固定的一个 queue 里写消息, 比如这里就是向第一个 queue 里写消息
- MessageQueue queue = mqs.get(0);
- // 选好的 queue
- return queue;
- }
- },
- // 自定义参数: 0
- // 2000 代表 2000 毫秒超时时间
- 0, 2000);
- }
- }
- }
2.2, 消费者
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.*;
- import org.apache.rocketmq.common.message.MessageExt;
- import java.util.List;
- /**
- * Description:
- *
- * @author TongWei.Chen 2020-06-22 11:17:47
- */
- public class ConsumerOrder {
- public static void main(String[] args) throws Exception {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer");
- consumer.setNamesrvAddr("124.57.180.156:9876");
- consumer.subscribe("orderTopic", "*");
- consumer.registerMessageListener(new MessageListenerOrderly() {
- @Override
- public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
- for (MessageExt msg : msgs) {
- System.out.println(new String(msg.getBody()) + "Thread:" + Thread.currentThread().getName() + "queueid:" + msg.getQueueId());
- }
- return ConsumeOrderlyStatus.SUCCESS;
- }
- });
- consumer.start();
- System.out.println("Consumer start...");
- }
- }
2.3, 输出结果
- Consumer start...
- hello!0 Thread:ConsumeMessageThread_1 queueid:0
- hello!1 Thread:ConsumeMessageThread_1 queueid:0
- hello!2 Thread:ConsumeMessageThread_1 queueid:0
- hello!3 Thread:ConsumeMessageThread_1 queueid:0
- hello!4 Thread:ConsumeMessageThread_1 queueid:0
很完美, 有序输出!
3, 情况二
比如你新需求: 把未支付的订单都放到 queue1 里, 已支付的订单都放到 queue2 里, 支付异常的订单都放到 queue3 里, 然后你消费的时候要保证每个 queue 是有序的, 不能消费 queue1 一条直接跑到 queue2 去了, 要逐个 queue 去消费.
这时候思路是发消息的时候利用自定义参数 arg, 消息体里肯定包含支付状态, 判断是未支付的则选择 queue1, 以此类推. 这样就保证了每个 queue 里只包含同等状态的消息. 那么消费者目前是多线程消费的, 肯定乱序. 三个 queue 随机消费. 解决方案更简单, 直接将消费端的线程数改为 1 个, 这样队列是 FIFO, 他就逐个消费了. RocketMQ 也为我们提供了这样的 API, 如下两句:
- // 最大线程数 1
- consumer.setConsumeThreadMax(1);
- // 最小线程数
- consumer.setConsumeThreadMin(1);
来源: https://www.cnblogs.com/javazhiyin/p/13212151.html