目录
设计重点
流程图
伪代码
- 2.1. PublishEvent
- 2.2. SubscribeEvent
- 2.3. Publisher
- 2.4. Subscriber
微服务 强一致性
- 3.1 Publisher
- 3.2 Subscriber
事件总线 - 跨服务 最终一致性
4.1 Publisher & Subscriber 都开启了本地事务, 保证了强一致性
4.2 问题场景一: 当 3 发布失败怎么办?
4.3 问题场景二: 当 3 发布成功, 但 4 更新事件状态失败怎么办?
4.4 问题场景三: Publisher 端 Ok,Subscriber 消费出错
0. 设计重点
Publisher 本地化 PublishEvent 保证事件发布可靠性
Subscriber 本地化 SubscribeEvent 保证事件订阅可靠性
SubscribeEvent 通过 EventId & HandlerType 组合约束 保证不重复消费事件
事件中央控制台 处理 Publisher & Subscriber 事件重试
1. 执行流程图
2. 伪代码
- 2.1 PublishEvent
- public abstract class Event
- {
- public Event()
- {
- Id = Guid.NewGuid();
- CreationTime = DateTime.UtcNow;
- }
- public Guid Id { get; set; }
- public DateTime CreationTime { get; set; }
- }
- public class PublishEvent : Event
- {
- public PublishEvent(Event @event)
- {
- Id = @event.Id;
- CreationTime = @event.CreationTime;
- Type = @event.GetType().FullName;
- Data = JsonConvert.SerializeObject(@event);
- Status = PublishEventStatus.NotPublished;
- }
- public String Type { get; set; }
- public String Data { get; set; }
- public PublishEventStatus Status { get; set; }
- }
- public enum PublishEventStatus
- {
- NotPublished = 0,
- Published = 1,
- PublishedFailed = 2
- }
- 2.2 SubscribeEvent
- public class SubscribeEvent
- {
- public SubscribeEvent(Event @event, IEventHandler handler)
- {
- EventId = @event.Id;
- EventCreationTime = @event.CreationTime;
- EventType = @event.GetType().FullName;
- EventData = JsonConvert.SerializeObject(@event);
- HandlerType = handler.GetType().FullName;
- HandlingStatus = HandlingStatus.HandleSucceeded;
- HandlingTime = DateTime.Now;
- }
- public Guid EventId { get; set; }
- public String EventType { get; set; }
- public String EventData { get; set; }
- public DateTime EventCreationTime { get; set; }
- public String HandlerType { get; set; }
- public DateTime HandlingTime { get; set; }
- public HandlingStatus HandlingStatus { get; set; }
- }
- public enum HandlingStatus
- {
- HandleSucceeded = 0,
- HandleFailed = 1
- }
- 2.3 Publisher
- try
- {
- BeginTransaction(); // 1
- //Biz Flow
- EventRepository.PubilshEvent(@event);// 2
- CommitTransaction();
- }
- catch(Exception ex){
- RollbackTransaction();
- throw ex;
- }
- EventBus.Publish(@event); // 3
- EventResitory.EventPublished(@event.ToString()); // 4
- 2.4 Subscriber
- try
- {
- BeginTransaction();
- //Biz Flow
- EventRepository.SubscribeEvent(@event , eventHandler); // 5
- CommitTransaction();
- }
- catch(Exception ex){
- RollbackTransaction();
- throw ex;
- }
3. 微服务 强一致性
3.1 Publisher
开启本地事务达到强一致性
执行本地业务代码
本地事务内部保存事件 预发布 状态 2
发布事件到事件总线 3
修改事件发布状态为已发布 4
3.2 Subscriber
开启本地事务达到强一致性
执行本地业务代码
保存订阅事件到本地仓库
4 事件总线 - 跨服务 最终一致性
4.1 Publisher & Subscriber 都开启了本地事务, 保证了强一致性
4.2 问题场景一: 当 3 发布失败怎么办?
3 发布失败, 意味着抛出异常, 则 4 不执行, 那么事件状态依然保持 预发布状态
后续 事件重试 重新发布该事件, 并更新事件状态为 已发布
4.3 问题场景二: 当 3 发布成功, 但 4 更新事件状态失败怎么办?
4.3.1 场景二. 一 Subscriber 订阅成功
3 发布成功, 但 4 更新事件状态失败, 事件状态依然是 预发布状态
Subscriber 订阅到该事件后成功执行完业务代码
Subscriber 将订阅事件保存到本地订阅事件仓库 5
该场景存在的问题: Publisher 会通过 事件重试 再次发布 预发布 状态的事件, 那么此时 Subscriber 将重复消费该事件
方案: 该问题我们可以通过将 SubscribeEvent EventId & HandlerType 组合唯一约束, 来避免重复消费
4.3.2 场景二. 二 Subscriber 订阅失败
3 发布成功, 但 4 更新事件状态失败, 事件状态依然是 预发布状态
Subscriber 执行消费失败
Subscriber 回滚本地事务
该场景不存在任何问题, 因为 Publisher 会通过 事件重试 再次发布 预发布 状态的事件 .
4.4 问题场景三: Publisher 端 Ok,Subscriber 消费出错
Publisher 端处理顺利
Subscriber 消费失败, 回滚本地事务, 此时 SubscribeEvent 未存储到本地仓库
该场景存在的问题:
Publisher 发送成功, 并且本地 PublishEvent 事件为已发布, 那么意味着从 Publisher 端是无法知道 Subscriber 消费失败需要重新消费
解决方案:
通过检测 PublishEvent & SubscribeEvent 获得需要 事件重试 的 PublishEvent
将 PublishEvent 重新发布 到 Subscriber
5. 通过 Nuget 安装组件支持以上编程模型
- Install-Package SmartEventBus.RabbitMQImpl
- Install-Package SmartEventBus.Repository
6. ORM:SmartSql 广而告之
SmartSql = Dapper + MyBatis + Cache(Memory | Redis) + ZooKeeper + R/W Splitting + ...... https://github.com/Ahoo-Wang/SmartSql
来源: https://www.cnblogs.com/Ahoo-Wang/p/micoservice-eventbus.html