什么是 EventBus
EventBus 是对发布 - 订阅模式的一种实现其以一种非常优雅的方式实现了组件间的解耦与通信, 在 Android 开发 DDD 等领域都有非常广泛的应用
事件流大致如下:
Producer 向 EventBus 发送事件
EventBus 向所有监听了该事件的 Consumer 推送事件
监听了该事件的 Consumer 消费事件
注: 一个组件即可以是 Producer, 也可以是 Consumer
分布式服务间的 EventBus
在分布式系统中, 事件在服务之间的传递要比单机 EventBus 复杂很多有没有一种适用于分布式服务之间的, 并且事件传递就像单机一样简单的 EventBus 呢? 在 GitHub 上搜索了 JAVA 实现的 EventBus, 排名前十的几乎都是用于 Android 或 JAVA 的单机事件总线良久之后... 还是自己动手吧集群环境下的 EventBus 比单机版需要多考虑一些问题, 比如:
服务集群部署的情况下, 如何保证每个集群均可订阅该事件, 且每个集群只能消费一次该事件
如何实现一个服务内部多个 `xxxService` 订阅同一事件
解决方案:
使用 `kafka` 实现集群间的发布订阅 (基于 `topic`), 同一集群处于同一个 kafka 的 consumer-group 来保证每个集群只会消费一次该事件
服务在启动时可反射获得所有实现了 `IEventHandler<TEventArg>` 的类并缓存, 服务消费消息时获取所有注册了该消息的 handler 并调用其 `HandleEvent` 方法
部分关键源码
1 事件消息的定义
- public abstract class EventArg implements IEventArg {
- private Date eventTime;
- public EventArg() {
- eventTime = new Date();
- }
- public Date getEventTime() {
- return eventTime;
- }
- public void setEventTime(Date eventTime) {
- this.eventTime = eventTime;
- }
- }
事件消息默认记录创建时间, 可扩展其他字段, 比如发送时间标识等
2 使用 spring-kafka 发送消息
- /**
- * kafka 事件注册器, 向 kafka 队列中 push 消息
- */
- @Component
- public class KafkaRegister implements IEventRegister {
- @Autowired(required = false)
- private KafkaTemplate<String,IEventArg> kafkaTemplate;
- /**
- * 事件注册
- *
- * @param eventArg 事件参数
- */
- @Override
- public void regist(IEventArg eventArg) {
- kafkaTemplate.send(getTopic(eventArg),eventArg);
- }
- /**
- * 获取 kafka 的 topic
- *
- *
- * @param eventArg
- * @return topic
- */
- private String getTopic(IEventArg eventArg){
- return eventArg.getClass().getName();
- }
- }
3 消费 kafka 消息并执行当前服务中所有订阅了该消息的事件
- /**
- * kafka 事件监听器
- */
- public class KafkaEventArgListener implements MessageListener<String,EventArg> {
- @Autowired
- private IEventHandlerFactory eventHandlerFactory;
- @Override
- public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) {
- if (consumerRecord == null) return;
- EventArg value = consumerRecord.value();
- Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value);
- if (handlers == null) return;
- for (IEventHandler handler : handlers) {
- handler.HandleEvent(value);
- }
- }
- }
EventBus 的使用
1 事件的定义所有事件均继承于上文 EventArg 抽象类, 示例如下:
- public class TestEventArg extends EventArg {
- private String value;
- public String getValue() {
- return value;
- }
- public void setValue(String value) {
- this.value = value;
- }
- }
2 事件发布示例代码:
eventBus.push(new TestEventArg());
3 事件订阅一个服务发布事件之后, 任何服务中的任何 `xxxServiceImpl` 类均可订阅该事件, 实现 `IEventHandler<TEventArg>` 接口即可完成事件的订阅, 示例如下:
- public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> {
- @Override
- public void HandleEvent(TestEventArg eventArg) {
- System.out.println("notify zero======");
- }
- }
整体来说, 使用还是很简单的, EventBus 实现与使用示例收录于 bird-java 项目中, 项目地址: https://github.com/liuxx001/bird-java
来源: https://www.cnblogs.com/chentaotao/p/8398943.html