很长一段时间以来,我都在思考如何在 ASP.NET Core 的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于 ASP.NET Core 的微服务,它能够非常简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还能够向该渠道发送事件消息,以便订阅该事件消息的消费者能够对消息数据做进一步处理。让我们回顾一下微服务之间通信的几种方式,分为同步和异步两种。同步通信最常见的就是 RESTful API,而且非常简单轻量,一个 Request/Response 回环就结束了;异步通信最常见的就是通过消息渠道,将载有特殊意义的数据的事件消息发送到消息渠道,而对某种类型消息感兴趣的消费者,就可以获取消息中所带信息并执行相应操作,这也是我们比较熟知的事件驱动架构的一种表现形式。虽然事件驱动型架构看起来非常复杂,从微服务的实现来看显得有些繁重,但它的应用范围确实很广,也为服务间通信提供了新的思路。了解 DDD 的朋友相信一定知道 CQRS 体系结构模式,它就是一种事件驱动型架构。事实上,实现一套完整的、安全的、稳定的、正确的事件驱动架构并不简单,由于异步特性带来的一致性问题会非常棘手,甚至需要借助一些基础结构层工具(比如关系型数据库,不错!只能是关系型数据库)来解决一些特殊问题。本文就打算带领大家一起探探路,基于 ASP.NET Core Web API 实现一个相对比较简单的事件驱动架构,然后引出一些有待深入思考的问题,留在今后的文章中继续讨论。或许,本文所引入的源代码无法直接用于生产环境,但我希望本文介绍的内容能够给到读者一些启发,并能够帮助解决实际中遇到的问题。
本文会涉及一些相关的专业术语,在此先作约定:
注意:为了迎合描述的需要,在下文中可能会混用事件和消息两个概念。
先从简单的设计开始,基本上事件驱动型架构会有事件消息(Events)、事件订阅器(Event Subscriber)、事件派发器(Event Publisher)、事件处理器(Event Handler)以及事件总线(Event Bus)等主要组件,它们之间的关系大致如下:
首先,IEvent 接口定义了事件消息(更确切地说,数据)的基本结构,几乎所有的事件都会有一个唯一标识符(Id)和一个事件发生的时间(Timestamp),这个时间通常使用 UTC 时间作为标准。IEventHandler 定义了事件处理器接口,显而易见,它包含两个方法:CanHandle 方法,用以确定传入的事件对象是否可被当前处理器所处理,以及 Handle 方法,它定义了事件的处理过程。IEvent 和 IEventHandler 构成了事件处理的基本元素。
然后就是 IEventSubscriber 与 IEventPublisher 接口。前者表示实现该接口的类型为事件订阅器,它负责事件处理器的注册,并侦听来自事件通信渠道上的消息,一旦所获得的消息能够被某个处理器处理,它就会指派该处理器对接收到的消息进行处理。因此,IEventSubscriber 会保持着对事件处理器的引用;而对于实现了 IEventPublisher 接口的事件派发器而言,它的主要任务就是将事件消息发送到消息通信渠道,以便订阅端能够获得消息并进行处理。
IEventBus 接口表示消息通信渠道,也就是大家所熟知的消息总线的概念。它不仅具有消息订阅的功能,而且还具有消息派发的能力,因此,它会同时继承于 IEventSubscriber 和 IEventPublisher 接口。在上面的设计中,通过接口分离消息总线的订阅器和派发器的角色是很有必要的,因为两种角色的各自职责不一样,这样的设计同时满足 SOLID 中的 SRP 和 ISP 两个准则。
基于以上基础模型,我们可以很快地将这个对象关系模型转换为 C# 代码:
- public interface IEvent
- {
- Guid Id { get; }
- DateTime Timestamp { get; }
- }
- public interface IEventHandler
- {
- Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default);
- bool CanHandle(IEvent @event);
- }
- public interface IEventHandler<in T> : IEventHandler
- where T : IEvent
- {
- Task<bool> HandleAsync(T @event, CancellationToken cancellationToken = default);
- }
- public interface IEventPublisher : IDisposable
- {
- Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
- where TEvent : IEvent;
- }
- public interface IEventSubscriber : IDisposable
- {
- void Subscribe();
- }
- public interface IEventBus : IEventPublisher, IEventSubscriber { }
短短 30 行代码,就把我们的基本对象关系描述清楚了。对于上面的代码我们需要注意以下几点:
OK,接口定义好了。实现呢?下面,我们实现一个非常简单的消息总线:PassThroughEventBus。在今后的文章中,我还会介绍如何基于 RabbitMQ 和 Azure Service Bus 实现不一样的消息总线。
顾名思义,PassThroughEventBus 表示当有消息被派发到消息总线时,消息总线将不做任何处理与路由,而是直接将消息推送到订阅方。在订阅方的事件监听函数中,会通过已经注册的事件处理器对接收到的消息进行处理。整个过程并不会依赖于任何外部组件,不需要引用额外的开发库,只是利用现有的. NET 数据结构来模拟消息的派发和订阅过程。因此,PassThroughEventBus 不具备容错和消息重发功能,不具备消息存储和路由功能,我们先实现这样一个简单的消息总线,来体验事件驱动型架构的设计过程。
我们可以使用. NET 中的 Queue 或者 ConcurrentQueue 等基本数据结构来作为消息队列的实现,与这些基本的数据结构相比,消息队列本身有它自己的职责,它需要在消息被推送进队列的同时通知调用方。当然,PassThroughEventBus 不需要依赖于 Queue 或者 ConcurrentQueue,它所要做的事情就是模拟一个消息队列,当消息推送进来的时候,立刻通知订阅方进行处理。同样,为了分离职责,我们可以引入一个 EventQueue 的实现(如下),从而将消息推送和路由的职责(基础结构层的职责)从消息总线中分离出来。
- internal sealed class EventQueue
- {
- public event System.EventHandler<EventProcessedEventArgs> EventPushed;
- public EventQueue() { }
- public void Push(IEvent @event)
- {
- OnMessagePushed(new EventProcessedEventArgs(@event));
- }
- private void OnMessagePushed(EventProcessedEventArgs e) => this.EventPushed?.Invoke(this, e);
- }
EventQueue 中最主要的方法就是 Push 方法,从上面的代码可以看到,当 EventQueue 的 Push 方法被调用时,它将立刻触发 EventPushed 事件,它是一个. NET 事件,用以通知 EventQueue 对象的订阅者,消息已经被派发。整个 EventQueue 的实现非常简单,我们仅专注于事件的路由,完全没有考虑任何额外的事情。
接下来,就是利用 EventQueue 来实现 PassThroughEventBus。毫无悬念,PassThroughEventBus 需要实现 IEventBus 接口,它的两个基本操作分别是 Publish 和 Subscribe。在 Publish 方法中,会将传入的事件消息转发到 EventQueue 上,而 Subscribe 方法则会订阅 EventQueue.EventPushed 事件(.NET 事件),而在 EventPushed 事件处理过程中,会从所有已注册的事件处理器(Event Handlers)中找到能够处理所接收到的事件,并对其进行处理。整个流程还是非常清晰的。以下便是 PassThroughEventBus 的实现代码:
- public sealed class PassThroughEventBus : IEventBus
- {
- private readonly EventQueue eventQueue = new EventQueue();
- private readonly IEnumerable<IEventHandler> eventHandlers;
- public PassThroughEventBus(IEnumerable<IEventHandler> eventHandlers)
- {
- this.eventHandlers = eventHandlers;
- }
- private void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
- => (from eh in this.eventHandlers
- where eh.CanHandle(e.Event)
- select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event));
- public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
- where TEvent : IEvent
- => Task.Factory.StartNew(() => eventQueue.Push(@event));
- public void Subscribe()
- => eventQueue.EventPushed += EventQueue_EventPushed;
- #region IDisposable Support
- private bool disposedValue = false; // To detect redundant calls
- void Dispose(bool disposing)
- {
- if (!disposedValue)
- {
- if (disposing)
- {
- this.eventQueue.EventPushed -= EventQueue_EventPushed;
- }
- disposedValue = true;
- }
- }
- public void Dispose() => Dispose(true);
- #endregion
- }
实现过程非常简单,当然,从这些代码也可以更清楚地了解到,PassThroughEventBus 不做任何路由处理,更不会依赖于一个基础结构设施(比如实现了 AMQP 的消息队列),因此,不要指望能够在生产环境中使用它。不过,目前来看,它对于我们接下来要讨论的事情还是会很有帮助的,至少在我们引入基于 RabbitMQ 等实现的消息总线之前。
同样地,请将 PassThroughEventBus 实现在另一个 NetStandard 的 Class Library 中,虽然它不需要额外的依赖,但它毕竟是众多消息总线中的一种,将它从接口定义的程序集中剥离开来,好处有两点:第一,保证了定义接口的程序集的纯净度,使得该程序集不需要依赖任何外部组件,并确保了该程序集的职责单一性,即为消息系统的实现提供基础类库;第二,将 PassThroughEventBus 置于独立的程序集中,有利于调用方针对 IEventBus 进行技术选择,比如,如果开发者选择使用基于 RabbitMQ 的实现,那么,只需要引用基于 RabbitMQ 实现 IEventBus 接口的程序集就可以了,而无需引用包含了 PassThroughEventBus 的程序集。这一点我觉得可以归纳为框架设计中 "隔离依赖关系(Dependency Segregation)" 的准则。
好了,基本组件都定义好了,接下来,让我们一起基于 ASP.NET Core Web API 来做一个 RESTful 服务,并接入上面的消息总线机制,实现消息的派发和订阅。
我们仍然以客户管理的 RESTful API 为例子,不过,我们不会过多地讨论如何去实现管理客户信息的 RESTful 服务,那并不是本文的重点。作为一个案例,我使用 ASP.NET Core 2.0 Web API 建立了这个服务,使用 Visual Studio 2017 15.5 做开发,并在 CustomersController 中使用 Dapper 来对客户信息 CRUD。后台基于 SQL Server 2017 Express Edition,使用 SQL Server Management Studio 能够让我方便地查看数据库操作的结果。
假设我们的客户信息只包含客户 ID 和名称,下面的 CustomersController 代码展示了我们的 RESTful 服务是如何保存并读取客户信息的。当然,我已经将本文的代码通过 Github 开源,开源协议为 MIT,虽然商业友好,但毕竟是案例代码没有经过测试,所以请谨慎使用。本文源代码的使用我会在文末介绍。
- [Route("api/[controller]")]
- public class CustomersController : Controller
- {
- private readonly IConfiguration configuration;
- private readonly string connectionString;
- public CustomersController(IConfiguration configuration)
- {
- this.configuration = configuration;
- this.connectionString = configuration["mssql:connectionString"];
- }
- // 获取指定ID的客户信息
- [HttpGet("{id}")]
- public async Task<IActionResult> Get(Guid id)
- {
- const string sql = "SELECT [CustomerId] AS Id, [CustomerName] AS Name FROM [dbo].[Customers] WHERE [CustomerId]=@id";
- using (var connection = new SqlConnection(connectionString))
- {
- var customer = await connection.QueryFirstOrDefaultAsync<Model.Customer>(sql, new { id });
- if (customer == null)
- {
- return NotFound();
- }
- return Ok(customer);
- }
- }
- // 创建新的客户信息
- [HttpPost]
- public async Task<IActionResult> Create([FromBody] dynamic model)
- {
- var name = (string)model.Name;
- if (string.IsNullOrEmpty(name))
- {
- return BadRequest();
- }
- const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
- using (var connection = new SqlConnection(connectionString))
- {
- var customer = new Model.Customer(name);
- await connection.ExecuteAsync(sql, customer);
- return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
- }
- }
- }
代码一如既往的简单,Web API 控制器通过 Dapper 简单地实现了客户信息的创建和返回。我们不妨测试一下,使用下面的 Invoke-RestMethod PowerShell 指令,发送 Post 请求,通过上面的 Create 方法创建一个用户:
可以看到,response 中已经返回了新建客户的 ID 号。接下来,继续使用 Invoke-RestMethod 来获取新建客户的详细信息:
OK,API 调试完全没有问题。下面,我们将这个案例再扩充一下,我们希望这个 API 在完成客户信息创建的同时,向外界发送一条 "客户信息已创建" 的事件,并设置一个事件处理器,负责将该事件的详细内容保存到数据库中。
首先,我们在 ASP.NET Core Web API 项目上,添加对以上两个程序集的引用,然后,按常规做法,在 ConfigureServices 方法中,将 PassThroughEventBus 添加到 IoC 容器中:
- public void ConfigureServices(IServiceCollection services) {
- services.AddMvc();
- services.AddSingleton < IEventBus,
- PassThroughEventBus > ();
- }
在此,将事件总线注册为单例(Singleton)服务,是因为它不保存状态。理论上讲,使用单例服务时,需要特别注意服务实例对象的生命周期管理,因为它的生命周期是整个应用程序级别,在程序运行的过程中,由其引用的对象资源将无法释放,因此,当程序结束运行时,需要合理地将这些资源 dispose 掉。好在 ASP.NET Core 的依赖注入框架中已经帮我们处理过了,因此,对于上面的 PassThroughEventBus 单例注册,我们不需要过多担心,程序执行结束并正常退出时,依赖注入框架会自动帮我们 dispose 掉 PassThroughEventBus 的单例实例。那么对于单例实例来说,我们是否只需要通过 AddSingleton 方法进行注册就可以了,而无需关注它是否真的被 dispose 了呢?答案是否定的,有兴趣的读者可以参考 微软的官方文档 ,在下一篇文章中我会对这部分内容做些介绍。
接下来,我们需要定义一个 CustomerCreatedEvent 对象,表示 "客户信息已经创建" 这一事件信息,同时,再定义一个 CustomerCreatedEventHandler 事件处理器,用来处理从 PassThroughEventBus 接收到的事件消息。代码如下,当然也很简单:
- public class CustomerCreatedEvent: IEvent {
- public CustomerCreatedEvent(string customerName) {
- this.Id = Guid.NewGuid();
- this.Timestamp = DateTime.UtcNow;
- this.CustomerName = customerName;
- }
- public Guid Id {
- get;
- }
- public DateTime Timestamp {
- get;
- }
- public string CustomerName {
- get;
- }
- }
- public class CustomerCreatedEventHandler: IEventHandler < CustomerCreatedEvent > {
- public bool CanHandle(IEvent@event) = >@event.GetType().Equals(typeof(CustomerCreatedEvent));
- public Task < bool > HandleAsync(CustomerCreatedEvent@event, CancellationToken cancellationToken =
- default) {
- return Task.FromResult(true);
- public Task < bool > HandleAsync(IEvent@event, CancellationToken cancellationToken =
- default) = >CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
- }
两者分别实现了我们最开始定义好的 IEvent 和 IEventHandler 接口。在 CustomerCreatedEventHandler 类的第一个 HandleAsync 重载方法中,我们暂且让它简单地返回一个 true 值,表示事件处理成功。下面要做的事情就是,在客户信息创建成功后,向事件总线发送 CustomerCreatedEvent 事件,以及在 ASP.NET Core Web API 程序启动的时候,注册 CustomerCreatedEventHandler 实例,并调用事件总线的 Subscribe 方法,使其开始侦听事件的派发行为。
于是,CustomerController 需要依赖 IEventBus,并且在 CustomerController.Create 方法中,需要通过调用 IEventBus 的 Publish 方法将事件发送出去。现对 CustomerController 的实现做一些调整,调整后代码如下:
- [Route("api/[controller]")]
- public class CustomersController : Controller
- {
- private readonly IConfiguration configuration;
- private readonly string connectionString;
- private readonly IEventBus eventBus;
- public CustomersController(IConfiguration configuration,
- IEventBus eventBus)
- {
- this.configuration = configuration;
- this.connectionString = configuration["mssql:connectionString"];
- this.eventBus = eventBus;
- }
- // 创建新的客户信息
- [HttpPost]
- public async Task<IActionResult> Create([FromBody] dynamic model)
- {
- var name = (string)model.Name;
- if (string.IsNullOrEmpty(name))
- {
- return BadRequest();
- }
- const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
- using (var connection = new SqlConnection(connectionString))
- {
- var customer = new Model.Customer(name);
- await connection.ExecuteAsync(sql, customer);
- await this.eventBus.PublishAsync(new CustomerCreatedEvent(name));
- return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
- }
- }
- // Get方法暂且省略
- }
然后,修改 Startup.cs 中的 ConfigureServices 方法,将 CustomerCreatedEventHandler 注册进来:
- public void ConfigureServices(IServiceCollection services)
- {
- services.AddMvc();
- services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
- services.AddSingleton<IEventBus, PassThroughEventBus>();
- }
并且调用 Subscribe 方法,开始侦听消息总线:
- public void Configure(IApplicationBuilder app, IHostingEnvironment env)
- {
- var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
- eventBus.Subscribe();
- if (env.IsDevelopment())
- {
- app.UseDeveloperExceptionPage();
- }
- app.UseMvc();
- }
OK,现在让我们在 CustomerCreatedEventHandler 的 HandleAsync 方法上设置个断点,按下 F5 启用 Visual Studio 2017 调试,然后重新使用 Invoke-RestMethod 命令发送一个 Post 请求,可以看到,HandleAsync 方法上的断点被命中,同时事件已被正确派发:
数据库中的数据也被正确更新:
目前还差最后一小步,就是在 HandleAsync 中,将 CustomerCreatedEvent 对象的数据序列化并保存到数据库中。当然这也不难,同样可以考虑使用 Dapper,或者直接使用 ADO.NET,甚至使用比较重量级的 Entity Framework Core,都可以实现。那就在此将这个问题留给感兴趣的读者朋友自己搞定啦。
到这里基本上本文的内容也就告一段落了,回顾一下,本文一开始就提出了一种相对简单的消息系统和事件驱动型架构的设计模型,并实现了一个最简单的事件总线:PassThroughEventBus。随后,结合一个实际的 ASP.NET Core Web API 案例,了解了在 RESTful API 中实现事件消息派发和订阅的过程,并实现了在事件处理器中,对获得的事件消息进行处理。
然而,我们还有很多问题需要更深入地思考,比如:
等等。。。在接下来的文章中,我会尽力做更详细的介绍。
本系列文章的源代码在 https://github.com/daxnet/edasample 这个 Github Repo 里,通过不同的 release tag 来区分针对不同章节的源代码。本文的源代码请参考 chapter_1 这个 tag,如下:
接下来还将会有 chapter_2、chapter_3 等这些 tag,对应本系列文章的第二部分、第三部分等等。敬请期待。
来源: http://www.cnblogs.com/daxnet/p/8082694.html