前言
Saga 单词翻译过来是指尤指古代挪威或冰岛讲述冒险经历和英雄业绩的长篇故事, 对, 这里强调长篇故事. 许多系统都存在长时间运行的业务流程, NServiceBus 使用基于事件驱动的体系结构将容错性和可伸缩性融入这些业务处理过程中.
当然一个单一接口调用则算不上一个长时间运行的业务场景, 那么如果在给定的用例中有两个或多个调用, 则应该考虑数据一致性的问题, 这里有可能第一个接口调用成功, 第二次调用则可能失败或者超时, Saga 的设计以简单而健壮的方式处理这样的业务用例.
认识 Saga
先来通过一段代码简单认识一下 Saga, 在 NServiceBus 里, 使用 Saga 的话则需要实现抽象类 Saga,SqlSaga, 这里的 T 的是 Saga 业务实体, 封装数据, 用来在长时间运行过程中封装业务数据.
- public class Saga:Saga<State>,
- IAmStartedByMessages<StartOrder>,
- IHandleMessages<CompleteOrder>
- {
- protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
- {
- mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
- mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
- }
- public Task Handle(StartOrder message, IMessageHandlerContext context)
- {
- return Task.CompletedTask;
- }
- public Task Handle(CompleteOrder message, IMessageHandlerContext context)
- {
- MarkAsComplete();
- return Task.CompletedTask;
- }
- }
临时状态
长时间运行则意味着有状态, 任何涉及多个网络调用的进程都需要一个临时状态, 这个临时状态可以存储在内存中, 序列化在磁盘中, 也可以存储在分布式缓存中. 在 NServiceBus 中我们定义实体, 继承抽象类 ContainSagaData 即可, 默认情况下, 所有公开访问的属性都会被持久化.
- public class State:ContainSagaData
- {
- public Guid OrderId { get; set; }
- }
添加行为
在 NServiceBus 里, 处理消息的有两种接口: IHandlerMessages,IAmStartedByMessages.
开启一个 Saga
在前面的代码片段里, 我们看到已经实现了接口 IAmStartedByMessages, 这个接口告诉 NServiceBus, 如果收到了 StartOrder 消息, 则创建一个 Saga 实例 (Saga Instance), 当然 Saga 长流程处理的实体至少有一个需要开启 Saga 流程.
处理无序消息
如果你的业务用例中确实存在无序消息的情况, 则还需要业务流程正常轮转, 那么则需要多个 messaeg 都要事先接口 IAmStartedByMessages 接口, 也就是说多个 message 都可以创建 Saga 实例.
依赖可恢复性
在处理无序消息和多个消息类型的时候, 就存在消息丢失的可能, 必须在你的 Saga 状态完成以后, 这个 Saga 实例又收到一条消息, 但这时 Saga 状态已经是完结状态, 这条消息则仍然需要处理, 这里则实现 NServiceBus 的 IHandleSagaNotFound 接口.
- public class SagaNotFoundHandler:IHandleSagaNotFound
- {
- public Task Handle(object message, IMessageProcessingContext context)
- {
- return context.Reply(new SagaNotFoundMessage());
- }
- }
- public class SagaNotFoundMessage
- {
- }
结束 Saga
当你的业务用例不再需要 Saga 实例时, 则调用 MarkComplete() 来结束 Saga 实例. 这个方法在前面的代码片段中也可以看到, 其实本质也就是设置 Saga.Complete 属性, 这是个 bool 值, 你在业务用例中也可以用此值来判断 Saga 流程是否结束.
- namespace NServiceBus
- {
- using System;
- using System.Threading.Tasks;
- using Extensibility;
- public abstract class Saga
- {
- /// <summary>
- /// The saga's typed data.
- /// </summary>
- public IContainSagaData Entity { get; set; }
- public bool Completed { get; private set; }
- internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration);
- protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new()
- {
- return RequestTimeout(context, at, new TTimeoutMessageType());
- }
- protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage)
- {
- if (at.Kind == DateTimeKind.Unspecified)
- {
- throw new InvalidOperationException("Kind property of DateTime'at'must be specified.");
- }
- VerifySagaCanHandleTimeout(timeoutMessage);
- var options = new SendOptions();
- options.DoNotDeliverBefore(at);
- options.RouteToThisEndpoint();
- SetTimeoutHeaders(options);
- return context.Send(timeoutMessage, options);
- }
- protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new()
- {
- return RequestTimeout(context, within, new TTimeoutMessageType());
- }
- protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
- {
- VerifySagaCanHandleTimeout(timeoutMessage);
- var sendOptions = new SendOptions();
- sendOptions.DelayDeliveryWith(within);
- sendOptions.RouteToThisEndpoint();
- SetTimeoutHeaders(sendOptions);
- return context.Send(timeoutMessage, sendOptions);
- }
- protected Task ReplyToOriginator(IMessageHandlerContext context, object message)
- {
- if (string.IsNullOrEmpty(Entity.Originator))
- {
- throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint.");
- }
- var options = new ReplyOptions();
- options.SetDestination(Entity.Originator);
- context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId });
- options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State
- {
- SagaTypeToUse = null,
- SagaIdToUse = null
- });
- return context.Reply(message, options);
- }
- // 这个方法结束 saga 流程, 标记 Completed 属性
- protected void MarkAsComplete()
- {
- Completed = true;
- }
- void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage)
- {
- var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>;
- if (!canHandleTimeoutMessage)
- {
- var message = $"The type'{GetType().Name}'cannot request timeouts for'{timeoutMessage}'because it does not implement'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'";
- throw new Exception(message);
- }
- }
- void SetTimeoutHeaders(ExtendableOptions options)
- {
- options.SetHeader(Headers.SagaId, Entity.Id.ToString());
- options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString);
- options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName);
- }
- }
- }
Saga 持久化
本机开发环境我们使用 LearningPersistence, 但是投产的话则需要使用数据库持久化, 这里我们基于 MySQL,SQL 持久化需要引入 NServiceBus.Persistence.Sql.SQL Persistence 会生成几种关系型数据库的 sql scripts, 然后会根据你的断言配置选择所需数据库, 比如 SQL Server,MySQL,PostgreSQL,Oracle.
持久化 Saga 自动创建所需表结构, 你只需手动配置即可, 配置后编译成功后项目执行目录下会生成 sql 脚本, 文件夹名称是 NServiceBus.Persistence.Sql, 下面会有 Saga 子目录.
- /* TableNameVariable */
- set @tableNameQuoted = concat('`', @tablePrefix, 'Saga`');
- set @tableNameNonQuoted = concat(@tablePrefix, 'Saga');
- /* Initialize */
- drop procedure if exists sqlpersistence_raiseerror;
- create procedure sqlpersistence_raiseerror(message varchar(256))
- begin
- signal sqlstate
- 'ERROR'
- set
- message_text = message,
- mysql_errno = '45000';
- end;
- /* CreateTable */
- set @createTable = concat(' create table if not exists', @tableNameQuoted, '(
- Id varchar(38) not null,
- Metadata JSON not null,
- Data JSON not null,
- PersistenceVersion varchar(23) not null,
- SagaTypeVersion varchar(23) not null,
- Concurrency int not null,
- primary key (Id)
- ) default charset=ascii;
- ');
- prepare script from @createTable;
- execute script;
- deallocate prepare script;
- /* AddProperty OrderId */
- select count(*)
- into @exist
- from information_schema.columns
- where table_schema = database() and
- column_name = 'Correlation_OrderId' and
- table_name = @tableNameNonQuoted;
- set @query = IF(
- @exist <= 0,
- concat('alter table', @tableNameQuoted, 'add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\'status');
- prepare script from @query;
- execute script;
- deallocate prepare script;
- /* VerifyColumnType Guid */
- set @column_type_OrderId = (
- select concat(column_type,'character set', character_set_name)
- from information_schema.columns
- where
- table_schema = database() and
- table_name = @tableNameNonQuoted and
- column_name = 'Correlation_OrderId'
- );
- set @query = IF(
- @column_type_OrderId <> 'varchar(38) character set ascii',
- 'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
- 'select \'Column Type OK\'status');
- prepare script from @query;
- execute script;
- deallocate prepare script;
- /* WriteCreateIndex OrderId */
- select count(*)
- into @exist
- from information_schema.statistics
- where
- table_schema = database() and
- index_name = 'Index_Correlation_OrderId' and
- table_name = @tableNameNonQuoted;
- set @query = IF(
- @exist <= 0,
- concat('create unique index Index_Correlation_OrderId on', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\'status');
- prepare script from @query;
- execute script;
- deallocate prepare script;
- /* PurgeObsoleteIndex */
- select concat('drop index', index_name, 'on', @tableNameQuoted, ';')
- from information_schema.statistics
- where
- table_schema = database() and
- table_name = @tableNameNonQuoted and
- index_name like 'Index_Correlation_%' and
- index_name <> 'Index_Correlation_OrderId' and
- table_schema = database()
- into @dropIndexQuery;
- select if (
- @dropIndexQuery is not null,
- @dropIndexQuery,
- 'select''no index to delete'';')
- into @dropIndexQuery;
- prepare script from @dropIndexQuery;
- execute script;
- deallocate prepare script;
- /* PurgeObsoleteProperties */
- select concat('alter table', table_name, 'drop column', column_name, ';')
- from information_schema.columns
- where
- table_schema = database() and
- table_name = @tableNameNonQuoted and
- column_name like 'Correlation_%' and
- column_name <> 'Correlation_OrderId'
- into @dropPropertiesQuery;
- select if (
- @dropPropertiesQuery is not null,
- @dropPropertiesQuery,
- 'select''no property to delete'';')
- into @dropPropertiesQuery;
- prepare script from @dropPropertiesQuery;
- execute script;
- deallocate prepare script;
- /* CompleteSagaScript */
生成的表结构:
持久化配置
Saga 持久化需要依赖 NServiceBus.Persistence.Sql. 引入后需要实现 SqlSaga 抽象类, 抽象类需要重写 ConfigureMapping, 配置 Saga 工作流程业务主键.
- public class Saga:SqlSaga<State>,
- IAmStartedByMessages<StartOrder>
- {
- protected override void ConfigureMapping(IMessagePropertyMapper mapper)
- {
- mapper.ConfigureMapping<StartOrder>(message=>message.OrderId);
- }
- protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);
- public Task Handle(StartOrder message, IMessageHandlerContext context)
- {
- Console.WriteLine($"Receive message with OrderId:{message.OrderId}");
- MarkAsComplete();
- return Task.CompletedTask;
- }
- }
- static async Task MainAsync()
- {
- Console.Title = "Client-UI";
- var configuration = new EndpointConfiguration("Client-UI");
- // 这个方法开启自动建表, 自动创建 RabbitMQ 队列
- configuration.EnableInstallers();
- configuration.UseSerialization<NewtonsoftSerializer>();
- configuration.UseTransport<LearningTransport>();
- string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306";
- var persistence = configuration.UsePersistence<SqlPersistence>();
- persistence.SqlDialect<SqlDialect.MySQL>();
- // 配置 MySQL 连接串
- persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString));
- var instance = await Endpoint.Start(configuration).ConfigureAwait(false);
- var command = new StartOrder()
- {
- OrderId = Guid.NewGuid()
- };
- await instance.SendLocal(command).ConfigureAwait(false);
- Console.ReadKey();
- await instance.Stop().ConfigureAwait(false);
- }
- Saga Timeouts
在消息驱动类型的环境中, 虽然传递的无连接特性可以防止在线等待过程中消耗资源, 但是毕竟等待时间需要有一个上线. 在 NServiceBus 里已经提供了 Timeout 方法, 我们只需订阅即可, 可以在你的 Handle 方法中根据需要订阅 Timeout, 可参考如下代码:
- public class Saga:Saga<State>,
- IAmStartedByMessages<StartOrder>,
- IHandleMessages<CompleteOrder>,
- IHandleTimeouts<TimeOutMessage>
- {
- public Task Handle(StartOrder message, IMessageHandlerContext context)
- {
- var model=new TimeOutMessage();
- // 订阅超时消息
- return RequestTimeout(context,TimeSpan.FromMinutes(10));
- }
- public Task Handle(CompleteOrder message, IMessageHandlerContext context)
- {
- MarkAsComplete();
- return Task.CompletedTask;
- }
- protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);
- public Task Timeout(TimeOutMessage state, IMessageHandlerContext context)
- {
- // 处理超时消息
- }
- protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
- {
- mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
- mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
- }
- }
- // 从 Timeout 的源码看, 这个方法是通过设置 SendOptions, 然后再把当前这个消息发送给自己来实现
- protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
- {
- VerifySagaCanHandleTimeout(timeoutMessage);
- var sendOptions = new SendOptions();
- sendOptions.DelayDeliveryWith(within);
- sendOptions.RouteToThisEndpoint();
- SetTimeoutHeaders(sendOptions);
- return context.Send(timeoutMessage, sendOptions);
- }
总结
NServiceBus 因为是商业产品, 对分布式消息系统所涉及到的东西都做了实现, 包括分布式事务 (Outbox),DTC 都有, 还有心跳检测, 监控都有, 全而大, 目前我们用到的也只是 NServiceBus 里很小的一部分功能.
来源: https://www.cnblogs.com/sword-successful/p/11925790.html