Intro
EventBus 是一种事件发布订阅模式, 通过 EventBus 我们可以很方便的实现解耦, 将事件的发起和事件的处理的很好的分隔开来, 很好的实现解耦. 微软官方的示例项目 EShopOnContainers 也有在使用 EventBus .
这里的 EventBus 实现也是参考借鉴了微软 eShopOnContainers 项目.
EventBus 处理流程:
微服务间使用 EventBus 实现系统间解耦:
借助 EventBus 我们可以很好的实现组件之间, 服务之间, 系统之间的解耦以及相互通信的问题.
起初觉得 EventBus 和 MQ 其实差不多嘛, 都是通过异步处理来实现解耦合, 高性能. 后来看到了下面这张图才算明白为什么要用 EventBus 以及 EventBus 和 MQ 之间的关系, EventBus 是抽象的, 可以用 MQ 来实现 EventBus.
为什么要使用 EventBus
解耦合 (轻松的实现系统间解耦)
高性能可扩展 (每一个事件都是简单独立且不可更改的对象, 只需要保存新增的事件, 不涉及其他的变更删除操作)
系统审计 (每一个事件都是不可变更的, 每一个事件都是可追溯的)
...
EventBus 整体架构:
IEventBase : 所有的事件应该实现这个接口, 这个接口定义了事件的唯一 id EventId 和事件发生的事件 EventAt
IEventHandler: 定义了一个 Handle 方法来处理相应的事件
IEventStore: 所有的事件的处理存储, 保存事件的 IEventHandler, 一般不会直接操作, 通过 EventBus 的订阅和取消订阅来操作 EventStore
IEventBus: 用来发布 / 订阅 / 取消订阅事件, 并将事件的某一个 IEventHandler 保存到 EventStore 或从 EventStore 中移除
使用示例
来看一个使用示例, 完整代码示例:
- internal class EventTest
- {
- public static void MainTest()
- {
- var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();
- eventBus.Subscribe<CounterEvent, CounterEventHandler1>();
- eventBus.Subscribe<CounterEvent, CounterEventHandler2>();
- eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
- eventBus.Publish(new CounterEvent { Counter = 1 });
- eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();
- eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
- eventBus.Publish(new CounterEvent { Counter = 2 });
- }
- }
- internal class CounterEvent : EventBase
- {
- public int Counter { get; set; }
- }
- internal class CounterEventHandler1 : IEventHandler<CounterEvent>
- {
- public Task Handle(CounterEvent @event)
- {
- LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
- return Task.CompletedTask;
- }
- }
- internal class CounterEventHandler2 : IEventHandler<CounterEvent>
- {
- public Task Handle(CounterEvent @event)
- {
- LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
- return Task.CompletedTask;
- }
- }
具体实现
实现:
EventStoreInMemory 是 IEventStore 将数据放在内存中的实现, 使用了 ConcurrentDictionary 以及 HashSet 来尽可能的保证高效, 具体实现代码如下:
- public class EventStoreInMemory : IEventStore
- {
- private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();
- public bool AddSubscription<TEvent, TEventHandler>()
- where TEvent : IEventBase
- where TEventHandler : IEventHandler<TEvent>
- {
- var eventKey = GetEventKey<TEvent>();
- if (_eventHandlers.ContainsKey(eventKey))
- {
- return _eventHandlers[eventKey].Add(typeof(TEventHandler));
- }
- else
- {
- return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()
- {
- typeof(TEventHandler)
- });
- }
- }
- public bool Clear()
- {
- _eventHandlers.Clear();
- return true;
- }
- public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase
- {
- if(_eventHandlers.Count == 0)
- return new Type[0];
- var eventKey = GetEventKey<TEvent>();
- if (_eventHandlers.TryGetValue(eventKey, out var handlers))
- {
- return handlers;
- }
- return new Type[0];
- }
- public string GetEventKey<TEvent>()
- {
- return typeof(TEvent).FullName;
- }
- public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase
- {
- if(_eventHandlers.Count == 0)
- return false;
- var eventKey = GetEventKey<TEvent>();
- return _eventHandlers.ContainsKey(eventKey);
- }
- public bool RemoveSubscription<TEvent, TEventHandler>()
- where TEvent : IEventBase
- where TEventHandler : IEventHandler<TEvent>
- {
- if(_eventHandlers.Count == 0)
- return false;
- var eventKey = GetEventKey<TEvent>();
- if (_eventHandlers.ContainsKey(eventKey))
- {
- return _eventHandlers[eventKey].Remove(typeof(TEventHandler));
- }
- return false;
- }
- }
的实现, 从上面可以看到 EventStore 保存的是 IEventHandler 对应的 Type, 在 Publish 的时候根据 Type 从 IoC 容器中取得相应的 Handler 即可, 如果没有在 IoC 容器中找到对应的类型, 则会尝试创建一个类型实例, 然后调用 IEventHandler 的 Handle 方法, 代码如下:
- /// <summary>
- /// EventBus in process
- /// </summary>
- public class EventBus : IEventBus
- {
- private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();
- private readonly IEventStore _eventStore;
- private readonly IServiceProvider _serviceProvider;
- public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)
- {
- _eventStore = eventStore;
- _serviceProvider = serviceProvider ?? DependencyResolver.Current;
- }
- public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase
- {
- if (!_eventStore.HasSubscriptionsForEvent<TEvent>())
- {
- return false;
- }
- var handlers = _eventStore.GetEventHandlerTypes<TEvent>();
- if (handlers.Count> 0)
- {
- var handlerTasks = new List<Task>();
- foreach (var handlerType in handlers)
- {
- try
- {
- if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)
- {
- handlerTasks.Add(handler.Handle(@event));
- }
- }
- catch (Exception ex)
- {
- Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");
- }
- }
- handlerTasks.WhenAll().ConfigureAwait(false);
- return true;
- }
- return false;
- }
- public bool Subscribe<TEvent, TEventHandler>()
- where TEvent : IEventBase
- where TEventHandler : IEventHandler<TEvent>
- {
- return _eventStore.AddSubscription<TEvent, TEventHandler>();
- }
- public bool Unsubscribe<TEvent, TEventHandler>()
- where TEvent : IEventBase
- where TEventHandler : IEventHandler<TEvent>
- {
- return _eventStore.RemoveSubscription<TEvent, TEventHandler>();
- }
- }
项目实例
来看一个实际的项目中的使用, 在我的活动室预约项目中有一个公告的模块, 访问公告详情页面, 这个公告的访问次数加 1, 把这个访问次数加 1 改成了用 EventBus 来实现, 实际项目代码:
定义 Event 以及 EventHandler
- public class NoticeViewEvent : EventBase
- {
- public Guid NoticeId { get; set; }
- // UserId
- // IP
- // ...
- }
- public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>
- {
- public async Task Handle(NoticeViewEvent @event)
- {
- await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
- {
- var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
- notice.NoticeVisitCount += 1;
- await dbContext.SaveChangesAsync();
- });
- }
- }
这里的 Event 只定义了一个 NoticeId , 其实也可以把请求信息如 IP/UA 等信息加进去, 在 EventHandler 里处理以便日后数据分析.
注册 EventBus 相关服务以及 EventHandlers
- services.AddSingleton<IEventBus, EventBus>();
- services.AddSingleton<IEventStore, EventStoreInMemory>();
- //register EventHandlers
- services.AddSingleton<NoticeViewEventHandler>();
订阅事件
- public void Configure(IApplicationBuilder App, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)
- {
- eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>();
- // ...
- }
发布事件
- eventBus.Publish(new NoticeViewEvent {
- NoticeId = notice.NoticeId
- });
- Reference
- https://github.com/sheng-jie/EventBus
- https://github.com/WeihanLi/ActivityReservation
来源: https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html