abp 通过 IDistributedEventBus 接口集成自实现分布式事件消息的发布订阅.
IEventBus 在什么时机触发 PublishAsync?
当前 UnitOfWork 完成时, 触发 IEventBus 的 PublishAsync
在没有事务环境下, 同步调用 IEventBus 的 PublishAsync
abp 默认实现基于 RabbitMq 消息队列 Volo.Abp.EventBus.RabbitMQ 实现分布式消息的发布与订阅.
消息治理核心问题:
生产端如何保证投递成功的消息不能丢失.
Mq 自身如何保证消息不丢失.
消费段如何保证消费端的消息不丢失.
基于 abp 默认实现的 DistributedEventBus 不能满足以下场景:
Publisher 生产者无法保证消息一定能投递到 MQ.
Consumer 消费端在消息消费时, 出现异常时, 没有异常错误处理机制 (确保消费失败的消息能重新被消费).
我们引入 Masstransit http://masstransit-project.com/MassTransit/usage/ , 来提升 abp 对消息治理能力.
Masstransit 提供以下开箱即用功能:
Publish/Send/Request-Response 等几种消息投递机制.
多种 IoC 容器支持.
异常机制.
Saga 事务管理.
事务活动补偿机制 (Courier)
消息审计
消息管道处理机制
Abp 框架下事件消息集成
使用 MassTransit 重新实现
- IDistributedEventBus
- .
在消费端 Consumer 传递用户身份信息.
使用 ASP.NET Core web Host 作消费端 Consumer 宿主.
集成 MassTransit
在 Module 初始化时, 注入 MassTransit 实例, 并启动.
- /// <summary>
- /// 配置 DistributedEventBus
- /// </summary>
- /// <param name="context"></param>
- /// <param name="configuration"></param>
- /// <param name="hostingEnvironment"></param>
- private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
- {
- var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();
- var mqConnectionString = "rabbitmq://" + options.ConnectionString;
- context.Services.AddMassTransit(mtConfig =>
- {
- //inject consumers into IoC from assembly
- mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));
- mtConfig.AddBus(provider =>
- {
- var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
- {
- var host = mqConfig.Host(new Uri(mqConnectionString), h =>
- {
- h.Username(options.UserName);
- h.Password(options.Password);
- });
- // set special message serializer
- mqConfig.UseBsonSerializer();
- // integrated existed logger compontent
- mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
- mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>
- {
- //set rabbitmq prefetch count
- q.PrefetchCount = 200;
- //set message retry policy
- q.UseMessageRetry(r => r.Interval(3, 100));
- q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);
- EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);
- });
- mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
- {
- //set rabbitmq prefetch count
- q.PrefetchCount = 50;
- //q.UseRateLimit(100, TimeSpan.FromSeconds(1));
- //q.UseConcurrencyLimit(2);
- //set message retry policy
- q.UseMessageRetry(r => r.Interval(3, 100));
- q.Consumer<UserSyncEventConsumer>(provider);
- EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
- });
- mqConfig.ConfigureEndpoints(provider);
- mqConfig.UseAuditingFilter(provider, o =>
- {
- o.ReplaceAuditing = true;
- });
- });
- // set authtication middleware for user identity
- bus.ConnectAuthenticationObservers(provider);
- return bus;
- });
- });
- }
在 MassTransit 中, 使用 IBusControl 接口 StartAsync 或 StopAsync 来启动或停止.
使用 IPublishEndpoint 重新实现 IDistributedEventBus 接口, 实现与 abp 分布式事件总线集成.
- public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency
- {
- private readonly IPublishEndpoint _publishEndpoint;
- //protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
- protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }
- public MassTransitDistributedEventBus(
- IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
- IPublishEndpoint publishEndpoint)
- {
- //ServiceScopeFactory = serviceScopeFactory;
- _publishEndpoint = publishEndpoint;
- DistributedEventBusOptions = distributedEventBusOptions.Value;
- //Subscribe(distributedEventBusOptions.Value.Handlers);
- }
- /*
- * Not Implementation
- */
- public Task PublishAsync<TEvent>(TEvent eventData)
- where TEvent : class
- {
- return _publishEndpoint.Publish(eventData);
- }
- public Task PublishAsync(Type eventType, object eventData)
- {
- return _publishEndpoint.Publish(eventData, eventType);
- }
- }
到此, 我们实现了 MassTransit 与 Abp 集成.
事件消息传递 User Claims
在实际业务实现过程中, 我们会用消息队列实现 "削峰填谷" 的效果. 异步消息队列中传递用户身份信息如何实现呢?
我们先看看 abp 在 WebApi 中, 如何确定当前用户?
ICurrentUser 提供当前 User Claims 抽象. 而 ICurrentUser 依赖于 ICurrentPrincipalAccessor, 在 ASP.NET core 中利用 HttpContext User 来记录当前用户身份.
在 MassTransit 中, 利用 IPublishObserver> IConsumeObserver 生产者 / 消费端的观察者, 来实现传递已认证的用户 Claims.
- /// <summary>
- /// 生产者传递当前用户 Principal
- /// </summary>
- public class AuthPublishObserver : IPublishObserver
- {
- private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;
- private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;
- public AuthPublishObserver(
- ICurrentPrincipalAccessor currentPrincipalAccessor,
- IClaimsPrincipalFactory claimsPrincipalFactory)
- {
- _currentPrincipalAccessor = currentPrincipalAccessor;
- _claimsPrincipalFactory = claimsPrincipalFactory;
- }
- public Task PrePublish<T>(PublishContext<T> context) where T : class
- {
- var claimsPrincipal = _claimsPrincipalFactory
- .CreateClaimsPrincipal(
- _currentPrincipalAccessor.Principal
- );
- if (claimsPrincipal != null)
- {
- context.Headers.SetAuthenticationHeaders(claimsPrincipal);
- }
- return TaskUtil.Completed;
- }
- public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;
- public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;
- }
- /// <summary>
- /// 消费端从 MqMessage Heads 中获取当前用户 Principal, 并赋值给 HttpContext
- /// </summary>
- public class AuthConsumeObserver : IConsumeObserver
- {
- private readonly IHttpContextAccessor _httpContextAccessor;
- private readonly IServiceScopeFactory _factory;
- public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory)
- {
- _httpContextAccessor = httpContextAccessor;
- _factory = factory;
- }
- public Task PreConsume<T>(ConsumeContext<T> context) where T : class
- {
- if (_httpContextAccessor.HttpContext == null)
- {
- _httpContextAccessor.HttpContext = new DefaultHttpContext
- {
- RequestServices = _factory.CreateScope().ServiceProvider
- };
- }
- var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();
- if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated)
- {
- var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();
- _httpContextAccessor.HttpContext.User = claimsPrincipal;
- Thread.CurrentPrincipal = claimsPrincipal;
- }
- return TaskUtil.Completed;
- }
- public Task PostConsume<T>(ConsumeContext<T> context) where T : class
- {
- _httpContextAccessor.HttpContext = null;
- return TaskUtil.Completed;
- }
- public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
- {
- _httpContextAccessor.HttpContext = null;
- return TaskUtil.Completed;
- }
- }
使用 ASP.NET Core Web Host 作消费端 Consumer 宿主
基于以下几点原因, 我们使用 ASP.NET Core Web Host 作为消息端 Consumer 宿主
部署在 Linux 环境下, ASP.NET Core Web Host 通常使用守护进程来启动服务实例, 这样可以保证服务不被中断.
根据 abp vnext DDD 项目分层, 最大程度利用 Application 层应用方法, 复用 abp vnext 框架机制.
MassTransit 深入研究
延迟消息
限流, 熔断降级
批量消费
- Saga
- References
- abp vnext disctributed event bus https://docs.abp.io/zh-Hans/abp/latest
- http://masstransit-project.com/MassTransit/usage/
来源: http://www.bubuko.com/infodetail-3401450.html