前言
当你在处理异步消息时, 每个单独的消息处理程序都是一个单独的 handler, 每个 handler 之间互不影响. 这时如果一个消息依赖另一个消息的状态呢? 这时业务逻辑怎么处理?
借用我们上篇文章的业务场景, 如果在 Ship 项目里需要发送一个 ShipOrder Command. 这个 ShipOrder 需要依赖 Sales.OrderPlaced 和 Bill.OrderBilled Command 的状态, 目前我们的两个单独的 Message Handler 都没有保持任何的状态字段, 所以这时如果我们需要完成这个业务模型, 就需要跟踪他们的状态.
什么是 Saga
这个就是本篇文章要提的 saga, 定义在 NServiceBus 框架里, 他的本质是一个消息驱动模型里的状态机, 或者也可以理解为一系列消息处理程序用来共享状态的业务模型. 我理解在消息队列里如果我们要保证消息一致性通常会自己创建一张 Event 表, 这里 saga 维持状态的角色有点像我们这里的 Event 表.
好的, 回到正题上, 如果我们需要在 Shipping Service 里发送一个 ShipOrder, 发送他之前需要确定 OrderPlaced 和 OrderBilled 的状态, 确保这两个消息都收到以后才能发送 ShipOrder.
如何使用 Saga
当然, 我暂且理解 Saga 的目的是为了处理在长时间运行的任务里保证数据一致性这样的一个角色.
Saga 状态
saga 状态主要是告诉 NServiceBus 在处理数据一致性的判断逻辑, 这里需要继承抽象类 ContainSagaData, 在我们这个业务场景中则主要是判断 OrderPlaced 和 OrderBilled 消息是否已经接收到并处理.
- public class ShippingPolicyData:ContainSagaData
- {
- public string OrderId { get; set; }
- public bool IsOrderPlaced { get; set; }
- public bool IsOrderBilled { get; set; }
- }
Saga 如何工作
有了状态以后, 我们还需要一个 "handler" 来告诉 NServiceBus, 在这个 handler 里主要用来处理消息数据一致性, 我看了官方文档后, 他们建议我们这里的 handler 角色使用 Policy 后缀命名, 当然我觉的也可以用 Saga 后缀命名, 比如 ShippingPolicy 或者 ShippingSaga.
同时这里我们这个 handler 觉色还要继承 Saga 类, Saga 类主要重写方法 ConfigureHowToFindSaga, 这个方法的作用主要是在接受的消息和我们的 Saga 实体之间建立映射关系.
- public class ShipPolicy:Saga<ShippingPolicyData>,
- IAmStartedByMessages<OrderPlaced>,
- IAmStartedByMessages<OrderBilled> // 都可以创建 Saga 实例
- {
- private static ILog log = LogManager.GetLogger<ShipPolicy>();
- protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper)
- {
- mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
- mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
- }
- public Task Handle(OrderPlaced message, IMessageHandlerContext context)
- {
- log.Info("OrderPlaced message received");
- this.Data.IsOrderPlaced = true;
- return ProcessOrder(context);
- }
- public Task Handle(OrderBilled message, IMessageHandlerContext context)
- {
- log.Info("OrderBilled message received");
- this.Data.IsOrderBilled = true;
- return ProcessOrder(context);
- }
- private async Task ProcessOrder(IMessageHandlerContext context)
- {
- if (Data.IsOrderBilled && Data.IsOrderPlaced)
- {
- await context.SendLocal(new ShipOrder()
- {
- OrderId = Data.OrderId
- });
- MarkAsComplete();
- }
- }
- }
这个类里你会发现还实现了接口 IAmStartedByMessages, 这个接口主要是告诉 Saga, 不论是那种消息类型先进来, 都可以创建一个 Saga 实例, 就比如是 Event 表, 不管那个消息进来, 都需要先插入一条数据, 后续消息再进来时要更新数据状态, 当然, 这里的 Saga 实例也好, Event 表也好, 关键问题就是有效标识, 或者叫主键, 我们这个业务模型里, OrderPlaced 和 OrderBilled 都包含一个属性 OrderId, 这里 Saga 实例则使用这个 OrderId 做关键属性.
发送 ShipOrder Command
到这里也就是我们的 OrderPlaced 和 OrderBIlled 消息都收到了, 业务逻辑符合要求, 可以发送 ShipOrder 消息了, 也就是用户创建了订单, 付了款, 可以发货了.
新建 ShipOrder 类
- public class ShipOrder:ICommand
- {
- public string OrderId { get; set; }
- }
新建 ShipOrderHandler
- public class ShipOrderHandler:IHandleMessages<ShipOrder>
- {
- private static ILog log = LogManager.GetLogger<ShipOrderHandler>();
- public Task Handle(ShipOrder message, IMessageHandlerContext context)
- {
- log.Info($"Order [{message.OrderId}] - Successfully shipped");
- return Task.CompletedTask;
- }
- }
运行 Shipping 项目, 看到下图, 则说明程序运行成功, 我们这个业务场景里 OrderPlaced 消息肯定先接受到, OrderBilled 消息后接受到.
参考链接
https://docs.particular.net/nservicebus/sagas/
来源: https://www.cnblogs.com/sword-successful/p/11729082.html