所有文章
https://www.cnblogs.com/lay2017/p/12485081.html
正文
上一篇 https://www.cnblogs.com/lay2017/p/12498190.html 文章中, 我们看了看 DefaultCoordinator 作为分布式事务的协调者, 关于全局事务 begin 的流程.
DefaultCoordinator 把 begin 的核心实现交付给了 DefaultCore,DefaultCore 将会构造处一个 GlobalSession, 一份放到内存里面, 一份持久化 (默认持久化到 ~/sessionStore/root.data).
总结下来就是 begin 的时候将会在 Server 端构造一个 GlobalSession.
那么, 本文将看看关于全局事务的 commit 是怎么样一个流程.
DefaultCore.commit
和 begin 一样, DefaultCoordinator 把 commit 全局事务的实现交付给了 DefaultCore 来做. 我们打开 DefaultCore 的 commit 方法
- @Override
- public GlobalStatus commit(String xid) throws TransactionException {
- // 通过 XID 找到 GlobalSession
- GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
- if (globalSession == null) {
- return GlobalStatus.Finished;
- }
- // 添加 FileTransactionStoreManager
- globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
- // 加锁
- boolean shouldCommit = globalSession.lockAndExcute(() -> {
- // 关闭 session, 防止还有分支事务注册
- globalSession.closeAndClean();
- if (globalSession.getStatus() == GlobalStatus.Begin) {
- // 状态变化
- globalSession.changeStatus(GlobalStatus.Committing);
- return true;
- }
- return false;
- });
- // 如果原本不是 begin 的状态, 返回原本状态
- if (!shouldCommit) {
- return globalSession.getStatus();
- }
- // 判断是否可以异步提交
- if (globalSession.canBeCommittedAsync()) {
- // 异步提交
- asyncCommit(globalSession);
- return GlobalStatus.Committed;
- } else {
- // 同步提交
- doGlobalCommit(globalSession, false);
- }
- // 返回状态
- return globalSession.getStatus();
- }
commit 方法比较长, 但是逻辑并不复杂
1) 首先, 先根据 XID 找到对应的 GlobalSession.
2) 其次, 将 GlobalSession 的状态从 begin 到 committing 表示正在执行提交
3) 然后, 进行提交操作.
findGlobalSession 方法将从 file 或者 db 中查询 GlobalSession,changeStatus 的时候先 close 了当前全局事务, 这样就不会有新的分支事务注册进来了.
canBeCommittedAsync 将判断是否可以进行异步提交, 判断标准是什么呢? 我们看看该方法
- public boolean canBeCommittedAsync() {
- // 遍历分支 session
- for (BranchSession branchSession : branchSessions) {
- // 如果有一个 TCC 分支返回 false
- if (branchSession.getBranchType() == BranchType.TCC) {
- return false;
- }
- }
- return true;
- }
这里判断的是分支事务的 session 是 TCC 类型.
也就是说, 如果当前 GlobalSession 代表的是 AT 自动事务模式. 那么只需要将 GlobalSession 的状态改为 AsyncCommitting 即可, 而通知 RM 端的事情交给 DefaultCoordinator 的异步线程来做. 如果包含 TCC 分支, 那么直接调用 doGlobalCommit 同步提交.
我们再看看 DefaultCoordinator 的异步线程怎么处理 AT 模式的的.
- asyncCommitting.scheduleAtFixedRate(() -> {
- try {
- handleAsyncCommitting();
- } catch (Exception e) {
- LOGGER.info("Exception async committing ...", e);
- }
- }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
- protected void handleAsyncCommitting() {
- // 获取所有 AsyncCommitting 状态的 GlobalSession
- Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager().allSessions();
- if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
- return;
- }
- // 遍历 session
- for (GlobalSession asyncCommittingSession : asyncCommittingSessions) {
- try {
- if (GlobalStatus.AsyncCommitting != asyncCommittingSession.getStatus()) {
- continue;
- }
- asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
- // 调用 core 的 doGlobalCommit 通知客户端
- core.doGlobalCommit(asyncCommittingSession, true);
- } catch (TransactionException ex) {
- // ...
- }
- }
- }
异步线程默认 1 秒中执行一次, 将从 SessionManager 中获取所有 AsyncCommitting 状态的 GlobalSession.
然后逐个调用 DefaultCore 的 doGlobalCommit 方法通知客户端删除 undoLog.
DefaultCore.rollback
上面看了全局事务的 commit, 那么 rollback 呢? 我们跟进 DefaultCore 的 rollback 代码
- @Override
- public GlobalStatus rollback(String xid) throws TransactionException {
- // 找到 GlobalSession
- GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
- if (globalSession == null) {
- return GlobalStatus.Finished;
- }
- globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
- // 加锁
- boolean shouldRollBack = globalSession.lockAndExcute(() -> {
- // 先关闭
- globalSession.close();
- // 状态变更
- if (globalSession.getStatus() == GlobalStatus.Begin) {
- globalSession.changeStatus(GlobalStatus.Rollbacking);
- return true;
- }
- return false;
- });
- if (!shouldRollBack) {
- return globalSession.getStatus();
- }
- // 直接同步处理
- doGlobalRollback(globalSession, false);
- return globalSession.getStatus();
- }
rollback 比 commit 逻辑简单多了, 也不用同步异步的区分. 直接都是同步遍历所有分支事务的 session, 通知 RM 进行 rollback 操作.
总结
全局事务的提交, 无非是从 file 或者 db 中拿到 GlobalSession. 不论是同步提交还是异步提交, 最终都是拿到所有分支事务的 session, 然后通知 RM 去做相应的 commit 操作.
全局事务的回滚, 一样是拿到 GlobalSession. 然后通知 RM 做分支事务的 rollback 操作.
总得来说, Server 端主动推送 commit 或者 rollback 通知到 RM, 如果失败那么就进行 retry.seata 在一致性方面选择了最终一致性.
来源: http://www.bubuko.com/infodetail-3465072.html