1, 基础理论知识篇 "两阶段提交" 如果你了解可以跳过这段, 当然如果你想深入了解你可以购买相关书籍或去搜索相关资料阅读
两阶段提交分为 正常提交和异常提交或异常回滚
上面是正常提交的示意图, 协调者发起预提交请求, 参与者回复成功之后协调者再次发起 commit 请求, 统一提交事物. 事物结束.
如果这两阶段提交过程当中有任何一个请求出现异常就会回滚, 如下流程:
异常请求包括预提交 返回预提交的应答, commit 请求 等任何一个失败都会导致整个事物回滚.
二阶段提交的问题
"二阶段提交" 还有一个很严重的问题就是如果 commit 过程当中失败了 就导致了全部事物失败, 代价很大, 简单粗暴的处理方式
还有一个问题是如果 commit 过程中网络出现问题 commit 没有被整个事物的参与者之一或者多个收到, 这个时候就会出现数据不一致现象.
可能大家会提到 协调者是谁, 参与者又是谁那?
这里简单说下自己的理解
如果在你的应用程序中你是通过 begin 等相关操作语句开始的, 比如 你使用了 spring 的 @Transactional 注解等,
那协调者就是你的 "应用程序", 参与者就是 MySQL 或其他支持事物的数据库系统.
如果你就直接向 MySQL 发送了一条 sql 语句 MySQL 是自行提交的, 那协调者和参与者都是 MySQL 数据库自己
2, 这里说下 MySQL 对所谓的 "重复数据" 提供的相关 sql 或关键字.
unique 唯一主键约束
在 sql 事物中和应用程序中都可以捕获这个错误码或异常, 可以作为幂等判断的一个依据.
upset 操作, 发现唯一主键冲突然后更新相关数据, MongoDB 有直接使用的 sql 方法语句
示例: insert into tablename(column1) values(123) on duplicate key update column1 =column1 +123
ignore 忽略操作对于多余的操作直接忽略
insert ignore into tablename(column1) values(123)
基础篇说完很多内容如果想深入了解可以自己找资料处理. 下面是华丽分割线
3, 在我们原有的认知里有一个方案就差那么一点点就可以大面积使用的.
我们之前可能想过怎样既能发送 mq 又能写数据库, 下面这个方案会分接近我们的愿望.
我们遵从如下步骤进行代码处理:
1, 开启数据库事物执行相关 sql
2, 发送 MQ 消息
3, 提交数据库事物
(注意: 以上每一步都是在上一步执行成功之后在执行下一步的)
根据步骤我画出了下面的流程图
其实这个流程是有一个漏洞的, 如果我把上面的流程图改造为下面的二阶段提交的示意图就会很明显的看出来
不知道大家有么有发现问题, 是不是 各种提交和回滚操作都是针对的数据库, 而不是 MQ.commit 数据库事物出现异常就会造成数据不一致现象.
其实也不用在想有没有其他的流程方案能解决分布式双写问题, 只要存在多写问题就存在数据不一致问题的现象,
所以就出现了 3pc Paxos 等协议来解决分布式事物 / 一致性的问题.
下面我们开始介绍怎么使用 MySQL 和 RocketMQ 来实现事物问题
华丽分割线
4,RocketMQ 事物消息的过程
1, 发送 MQ 的事物消息
2, 事物消息发送成功后会同步触发对应执行本地接口来执行针对 MySQL 数据库的操作
3, 如果有未 commit 的消息, RocketMQ 的 broker 会定时间隔时间来回查数据库事物是否已经提交完成
5, 结合 RocketMQ 的事物消息与 MySQL 数据库事物的实现思想
如果上面的二阶段提交你已经理解了, 你会发现我这里设计的流程 (上面图的流程) 有点不太一样的地方
什么地方那?
MQ 事物消息回滚的时候是因为 MySQL 数据库事物没有提交成功而导致的, 也就是说如果 MySQL 数据库事务成功了 MQ 的事务消息是一定要成功的
否则就会出现事物不一致的现象.
假如发送 MQ 的 prepare 消息成功了, 执行 MySQL 事物的操作也成功了, 但是偏偏返回给 MQ 的 commit 消息丢失了, 那这个时候数据库消息并不会回滚.
所以就有了回查本地事物消息是否成功的操作, 来对 MQ 的消息做个补偿动作实现数据一致性
理解了二阶段提交以及 RocketMQ 的事物实现之后你就可以自己设计事物相关操作的执行顺序了
(这里的流程设计以及包括我的代码实现是以我的理解做出的最佳实践)
6,RocketMQ 与 MySQL 事物结合注意事项
1, 如果应用程序承担协调者的工作就尽量晚开启事物和尽量早的提交数据库事物, 事物中的 sql 对数据竞争多的 sql 尽量靠后
因为执行数据库事物会有各种锁操作, 减少锁的生命周期, 数据库是稀缺资源, 大家能省则省
2, 数据库事物最好设置超时时间, 超时之后自动解除, 最好不超过 1 分钟
3,MQ 默认 1 分钟之后回查一次已发送 message 但未 commit 的消息, 最多回查 15 次, 之后会执行回滚操作
4, 应用程序一定要做好幂等处理(可以参考上面 MySQL 相关语句实现幂等接口)
5, 网络不要太差, 否则会造成大量的重试, 重试就会影响消息的及时性
6, 适用场景
单次请求数量小
每次请求会有数据产生, 而不是查询产生的数据(比如 insert 操作叫生产数据, select 操作不要生产数据)
下游可以接受一定的延迟(这里有两个因素, 有应用程序本身和 Broker, 这里指 broker)
下游服务或系统以接收到的消息为依据做响应的操作
MQ 消息作为主要信息进行传递
下面说下具体代码实现
华丽分割线
7, 实战代码解析
首先附上源码地址 https://github.com/zygfengyuwuzu/springboot-rocketmq-example
下面将针对关键代码进行讲解
首先介绍一下代码目录
了解了上面的代码目录下面说下代码的执行流程
首先看事物消息生产者的实例对象创建
- package rocketmq_example.mqandmysqltraction.producer;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.ThreadFactory;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- import javax.annotation.PostConstruct;
- import javax.annotation.PreDestroy;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.TransactionListener;
- import org.apache.rocketmq.client.producer.TransactionMQProducer;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * 生产者和消费者测试的时候记得注掉一中的一个以免观察不出效果
- *
- */
- @Component
- public class TransactionProducer {
- static Logger logger = LoggerFactory.getLogger(TransactionProducer.class);
- public DefaultMQProducer producer = null;
- @Autowired
- TransactionListener transactionListenerImp;
- @PostConstruct
- private void init() throws MQClientException {
- logger.info("MQ 事物生产者初始化开始 --------------------------------------------------");
- TransactionMQProducer transactionProducer = new TransactionMQProducer("mytestgroup");
- // Producer 组名, 多个 Producer 如果属于一 个应用, 发送同样的消息, 则应该将它们 归为同一组
- //transactionProducer.setProducerGroup("mytestgroup");
- // Name Server 地址列表
- transactionProducer.setNamesrvAddr("10.10.6.71:9876;10.10.6.72:9876");
- // 超时时间 这里一定要大于数据库事物执行的超时时间
- transactionProducer.setSendMsgTimeout(90000);
- // 这个线程池作用就是 mqbroker 端回调信息的本地处理线程池
- ExecutorService executorService = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setName("client-transaction-msg-check-thread");
- return thread;
- }
- });
- transactionProducer.setExecutorService(executorService);
- transactionProducer.setTransactionListener(transactionListenerImp);
- producer = transactionProducer;
- producer.start();
- logger.info("MQ 事物生产者初始化结束 --------------------------------------------------");
- }
- public SendResult send(Message me) throws Exception {
- return producer.send(me);
- }
- /**
- * 发送普通消息
- * @param Topic
- * @param Tags
- * @param body
- * @return
- * @throws Exception
- */
- public SendResult send(String Topic, String Tags, String body) throws Exception {
- Message me = new Message();
- // 标示
- me.setTopic(Topic);
- // 标签
- me.setTags(Tags);
- // 内容
- me.setBody(body.getBytes(RemotingHelper.DEFAULT_CHARSET));
- return producer.send(me);
- }
- /**
- * 发送普通消息
- * @param Topic
- * @param Tags
- * @param key
- * @param body
- * @return
- * @throws Exception
- */
- public SendResult send(String Topic, String Tags, String key, String body) throws Exception {
- try {
- Message me = new Message(Topic, Tags, key, 0, body.getBytes(RemotingHelper.DEFAULT_CHARSET), true);
- return producer.send(me);
- } catch (Exception e) {
- logger.error("发送 MQ 信息异常 Topic{},Tags{},key{},body{}", Topic, Tags, key, body);
- throw e;
- }
- }
- @PreDestroy
- public void Destroy() {
- producer.shutdown();
- }
- }
上面的代码我们接收到请求传输过来的数据之后, 首先做了 MQ 消息对象的创建, 创建成功之后直接发送 MQ 事物消息
事物消息发送成功之后会调用上面设置的接口实现类的 TransactionListenerImpl.executeLocalTransaction()这个方法.
接口实现的方法代码如下:
- package rocketmq_example.mqandmysqltraction.producer;
- import java.util.List;
- import org.apache.rocketmq.client.producer.LocalTransactionState;
- import org.apache.rocketmq.client.producer.TransactionListener;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import rocketmq_example.mqandmysqltraction.MyTableModel;
- import rocketmq_example.mqandmysqltraction.MytableService;
- /**
- * 把数据库事物嵌套在 mq 事物当中不能显示抛出异常
- *
- *
- *
- *
- * @author zyg
- *
- */
- @Component
- public class TransactionListenerImpl implements TransactionListener {
- static Logger logger = LoggerFactory.getLogger(TransactionListenerImpl.class);
- @Autowired
- MytableService mytableService;
- /**
- * 一定要设置执行 sql 时间, 尽量不要超时
- *
- */
- @Override
- public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- logger.info("开始执行本地数据库事物 transactionid:{}", msg.getTransactionId());
- LocalTransactionState lts = LocalTransactionState.UNKNOW;
- @SuppressWarnings("unchecked")
- List<MyTableModel> mytablelist = (List<MyTableModel>) arg;
- try {
- long start=System.currentTimeMillis();
- // 数据库事物执行时间不要超过 mq 回查时间 默认 15 分钟
- mytableService.execMytableinsert2(mytablelist, msg.getTransactionId());
- logger.info("执行数据库事物耗时:{}",System.currentTimeMillis()-start);
- lts = LocalTransactionState.COMMIT_MESSAGE;
- } catch (Exception e) {
- logger.error("数据库事务异常", e);
- lts = LocalTransactionState.ROLLBACK_MESSAGE;
- }
- logger.info("结束执行本地数据库事物 transactionid:{} 返回:{}", msg.getTransactionId(),lts);
- return lts;
- }
- /**
- * 去数据库查询看看是否存在已经成功发送预提交数据而没有 commit 成功的 mq 信息
- * 每分钟 1 次默认 15 次
- *
- * 这里可以做个计数 让 MQ 重试 5 次 / 5 分钟就回滚减轻 MQ 回查的压力
- *
- */
- @Override
- public LocalTransactionState checkLocalTransaction(MessageExt msg) {
- if (mytableService.existMyTableModelByMsgid(msg.getTransactionId())) {
- logger.info("查询到已提交事物 transactionid:{}",msg.getTransactionId());
- return LocalTransactionState.COMMIT_MESSAGE;
- } else {
- logger.info("未查到已提交事物 transactionid:{}",msg.getTransactionId());
- return LocalTransactionState.UNKNOW;
- }
- }
- }
上面代码有两个方法, 这里说下两个方法的作用和执行时间
executeLocalTransaction 这个方法是发送完 事物消息 之后同步被调用到的方法, 用来执行本地事物操作
executeLocalTransaction 方法有两个参数, 第一个是发送成功之后的 message 消息, 在这个方法中包含事物 ID 其实就是 msgid
第二个参数是 object 类型的是从 dataapi 传过来,
我的代码中没做任何处理直接传递过来了然后直接转化传递给了 service 层进行事物处理
这个 executeLocalTransaction 方法里面为什么要直接返回 commit 或 rollback,
目的是尽量快的告诉 MQ 我的数据库事务执行成功了,
尽快将 half 消息转为正常消息, 已备消费者消费到做业务处理.
这里完全可以直接返回 unknow, 等待 broker 回查来实现 commit 操作的. 但是这样做对回查消息 broker 造成一定的压力.
上面代码的第二个方法是提供给 broker 回调执行的, 进行检查本地事务是否成功执行的操作, 发起方是 broker
这里面我们接收到 broker 的回查请求之后直接去数据库查询是否存在 broker 提供的事务 ID 的数据
如果存在返回 commit 标识, 如果不存在返回 unknow 标识以等待下一次再来回查
到此我们的一个事务操作就算完成了
另外大家可以直接查看 service 层的实现代码, 就不一一解释了
- package rocketmq_example.mqandmysqltraction;
- import java.util.ArrayList;
- import java.util.List;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import com.fasterxml.jackson.databind.ObjectMapper;
- @Service
- public class MytableService {
- static Logger logger = LoggerFactory.getLogger(MytableService.class);
- @Autowired
- IMytableMapper mytable;
- @Autowired
- ObjectMapper objMapper;
- /**
- * 这里可以显示提交事物 返回 boolean 一条一条插入只是为了展现事物的特性 获取所有异常 处理你的业务逻辑等等
- *
- * @param mytablemodels
- * @return
- */
- @Transactional(rollbackFor = Exception.class, timeout = 60000)
- public List<Integer> execMytableinsert2(List<MyTableModel> mytablemodels, String msgid) {
- // logger.info("开始执行数据库事物");
- List<Integer> result = new ArrayList<Integer>();
- for (MyTableModel myTableModel : mytablemodels) {
- // 插入数据库
- myTableModel.setMsgid(msgid);
- mytable.insertmytable(myTableModel);
- result.add(myTableModel.getId());
- }
- // logger.info("结束执行数据库事物");
- return result;
- }
- public boolean existMyTableModelById(Integer id) {
- MyTableModel myTableModel = mytable.selectMyTableModelById(id);
- if (myTableModel != null && null != myTableModel.getId()) {
- return true;
- }
- return false;
- }
- /**
- * 查询是否存在已经发送过的 msgid 消息
- *
- * @param msgid
- * @return
- */
- public boolean existMyTableModelByMsgid(String msgid) {
- int count = mytable.selectMyTableModelByMsgid(msgid);
- if (count> 0) {
- return true;
- }
- return false;
- }
- public void insetmsg(MyTableModel mytablemodel) {
- try {
- mytable.insertmsgrecord(mytablemodel);
- } catch (org.springframework.dao.DuplicateKeyException e) {
- logger.error("主键冲突异常被捕获",e);
- }
- }
- }
非常感谢你能看到这里!!! 看到这里相信你已经对本篇博客的内容有所了解了! 如果有什么问题或者想不通的地方欢迎评论区进行讨论.
如果有不正确的地方恳请指正
来源: https://www.cnblogs.com/zhyg/p/11160098.html