本篇是 "事件驱动的微服务" 系列的第二篇, 主要讲述事件驱动设计. 如果想要了解总体设计, 请看第一篇 "事件驱动的微服务 - 总体设计"
程序流程
我们通过一个具体的例子来讲解事件驱动设计. 本文中的程序有两个微服务, 一个是订单服务 (Order Service), 另一个是支付服务(Payment Service). 用户调用订单服务的用例 createOrder() 来创建订单, 创建之后的订单暂时还没有支付信息, 订单服务然后发布命令 (Command) 给支付服务, 支付服务完成支付, 发送支付完成 (Payment Created) 消息. 订单服务收到消息(Event), 在 Order 表里增加 Payment_Id 并修改订单状态为 "已付款".
下面就是组件图:
事件处理
事件分成内部事件和外部事件, 内部事件是存在于一个微服务内部的事件, 不与其他微服务共享. 如果用 DDD 的语言来描述就是在有界上下文 (Bounded Context) 内的域事件 (Domain Event). 外部事件是从一个微服务发布, 而被其他微服务接收的事件. 如果用 DDD 的语言来描述就是在不同有界上下文(Bounded Context) 之间传送的域事件(Domain Event). 这两种事件的处理方式不同.
内部事件:
对于内部事件的处理早已有了成熟的方法, 它的基本思路是创建一个事件总线 (Event Bus), 由它来监听事件. 然后注册不同的事件处理器(Event Handler) 来处理事件. 这种思路被广泛地用于各种领域.
下面就是事件总线 (Event Bus) 的接口, 它有两个函数, 一个是发布事件(Publish Event), 另一个是添加事件处理器(Event Handler). 一个事件可以有一个或多个事件处理器.
- type EventBus interface {
- PublishEvent(EventMessage)
- AddHandler(EventHandler, ...interface{})
- }
事件总线的代码的关键部分是加载事件处理器. 我们以订单服务为例, 下面就是加载事件处理器 (Event Handler) 的代码, 它是初始化容器代码的一部分. 在这段代码中, 它只注册了一个事件, 支付完成事件(PaymentCreateEvent), 和与之相对应的事件处理器 - 支付完成事件处理器(PaymentCreatedEventHandler).
- func loadEventHandler(c servicecontainer.ServiceContainer) error {
- var value interface{}
- var found bool
- rluf, err := containerhelper.BuildModifyOrderUseCase(&c)
- if err != nil {
- return err
- }
- pceh := event.PaymentCreatedEventHandler{rluf}
- if value, found = c.Get(container.EVENT_BUS); !found {
- message := "can't find key="+ container.EVENT_BUS +" in container "
- return errors.New(message)
- }
- eb := value.(ycq.EventBus)
- eb.AddHandler(pceh,&event.PaymentCreatedEvent{})
- return nil
- }
由于在处理事件时要调用相应的用例, 因此需要把用例注入到事件处理器中. 在上段代码中, 首先从容器中获得用例, 然后创建事件处理器, 最后把事件和与之对应的处理器加入到事件总线中.
事件的发布是通过调用事件总线的 PublishEvent()来实现的. 下面的例子就是在订单服务中通过消息中间件来监听来自外部的支付完成事件(PaymentCreatedEvent), 收到后, 把它转化成内部事件, 然后发送到事件总线上, 这样已经注册的事件处理器就能处理它了.
- eb := value.(ycq.EventBus)
- subject := config.SUBJECT_PAYMENT_CREATED
- _, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) {
- cpm := pce.NewPaymentCreatedDescriptor()
- logger.Log.Debug("payload:",pce)
- eb.PublishEvent(cpm)
- })
那么事件是怎样被处理的呢? 关键就在 PublishEvent 函数. 当一个事件发布时, 事件总线会把所有注册到该事件的事件处理器的 Handle()函数依次调用一遍, 下面就是 PublishEvent()的代码. 这样每个事件处理器只要实现 Handle()函数就可以了.
- func (b *InternalEventBus) PublishEvent(event EventMessage) {
- if handlers, ok := b.eventHandlers[event.EventType()]; ok {
- for handler := range handlers {
- handler.Handle(event)
- }
- }
- }
下面就是 PaymentCreatedEventHandler 的代码. 它的逻辑比较简单, 就是从 Event 里获得需要的支付信息, 然后调用相应的用例来完成 UpdatePayment()功能.
- type PaymentCreatedEventHandler struct {
- Mouc usecase.ModifyOrderUseCaseInterface
- }
- func(pc PaymentCreatedEventHandler) Handle (message ycq.EventMessage) {
- switch event := message.Event().(type) {
- case *PaymentCreatedEvent:
- status := model.ORDER_STATUS_PAID
- err := pc.Mouc.UpdatePayment(event.OrderNumber, event.Id,status)
- if err != nil {
- logger.Log.Errorf("error in PaymentCreatedEventHandler:", err)
- }
- default:
- logger.Log.Errorf("event type mismatch in PaymentCreatedEventHandler:")
- }
- }
我在这里用到了一个第三方库 "jetbasrawi/go.cqrs" https://github.com/jetbasrawi/go.cqrs 来处理 Eventbus.Jetbasrawi 是一个事件溯源 (Event Sourcing) 的库. 事件溯源与事件驱动很容易搞混, 它们看起来有点像, 但实际上是完全不同的两个东西. 事件驱动是微服务之间的一种调用方式, 存在于微服务之间, 与 RPC 的调用方式相对应; 而事件溯源是一种编程模式, 你可以在微服务内部使用它或不使用它. 但我一时找不到事件驱动的库, 就先找一个事件溯源的库来用. 其实自己写一个也很简单, 但我不觉得能写的比 jetbasrawi 更好, 那就还是先用它把. 不过事件溯源要比事件驱动复杂, 因此用 Jetbasrawi 可能有点大材小用了.
外部事件:
外部事件的不同之处是它要在微服务之间进行传送, 因此需要消息中间件. 我定义了一个通用接口, 这样可以支持不同的消息中间件. 它的最重要的两个函数是 publish()和 Subscribe().
- package gmessaging
- type MessagingInterface interface {
- Publish(subject string, i interface{}) error
- Subscribe(subject string, cb interface{} ) (interface{}, error)
- Flush() error
- // Close will close the decorated connection (For example, it could be the coded connection)
- Close()
- // CloseConnection will close the connection to the messaging server. If the connection is not decorated, then it is
- // the same with Close(), otherwise, it is different
- CloseConnection()
- }
由于定义了通用接口, 它可以支持多种消息中间件, 我这里选的是 "NATS" 消息中间件. 当初选它是因为它是云原生计算基金会 ("CNCF" https://www.cncf.io/services-for-projects/ ) 的项目, 而且功能强大, 速度也快. 如果想了解云原生概念, 请参见 "云原生的不同解释及正确含义"
下面的代码就是 NATS 的实现, 如果你想换用别的消息中间件, 可以参考下面的代码.
- type Nat struct {
- Ec *nats.EncodedConn
- }
- func (n Nat) Publish(subject string, i interface{}) error {
- return n.Ec.Publish(subject,i)
- }
- func (n Nat) Subscribe(subject string, i interface{} ) (interface{}, error) {
- h := i.(nats.Handler)
- subscription, err :=n.Ec.Subscribe(subject, h)
- return subscription, err
- }
- func (n Nat) Flush() error {
- return n.Ec.Flush()
- }
- func (n Nat) Close() {
- n.Ec.Close()
- }
- func (n Nat) CloseConnection() {
- n.Ec.Conn.Close()
- }
"Publish(subject string, i interface{})" 有两个参数,"subject" 是消息中间件的队列 (Queue) 或者是主题 (Topic). 第二个参数是要发送的信息, 它一般是 JSON 格式. 使用消息中间件时需要一个链接(Connection), 这里用的是 "*nats.EncodedConn", 它是一个封装之后的链接, 它里面含有一个 JSON 解码器, 可以支持在结构(struct) 和 JSON 之间进行转换. 当你调用发布函数时, 发送的是结构 (struct), 解码器自动把它转换成 JSON 文本再发送出去."Subscribe(subject string, i interface{} )" 也有两个参数, 第一个与 Publish() 的一样, 第二个是事件驱动器. 当接收到 JSON 文本后, 解码器自动把它转换成结构(struct), 然后调用事件处理器.
我把与消息中间件有关的代码写成了一个单独的第三方库, 这样不论你是否使用本框架都可以使用这个库. 详细信息参见 "jfeng45/gmessaging" https://github.com/jfeng45/gmessaging
命令
命令 (Command) 在代码实现上和事件 (Event) 非常相似, 但他们在概念上完全不同. 例如支付申请 (Make Payment) 是命令, 是你主动要求第三方 (支付服务) 去做一件事情, 而且你知道这个第三方是谁. 支付完成 (Payment Created) 是事件, 是你在汇报一件事情已经做完, 而其他第三方程序可能会根据它的结果来决定是否要做下一步的动作, 例如订单服务当收到支付完成这个事件时, 就可以更改自己的订单状态为 "已支付". 这里, 事件的发送方并不知道谁会对这条消息感兴趣, 因此这个发送是广播式发送. 而且这个动作 (支付) 已经完成, 而命令是尚未完成的动作, 因此接收方可以选择拒绝执行一条命令. 我们平常经常讲的事件驱动是松耦合, 而 RPC 是紧耦合, 这里指的是事件方式, 而不是命令方式. 采用命令方式时, 由于你已经知道了要发给谁, 因此是紧耦合的.
在实际应用中, 我们所看到的大部分的命令都是在一个微服务内部使用, 很少有在微服务之间发送命令的, 微服务之间传递的主要是事件. 但由于事件和命令很容易混淆, 有不少在微服务之间传递的 "事件" 实际上是 "命令". 因此并不是使用事件驱动方式就能把程序变成松耦合的, 而要进一步检查你是否将 "命令" 错用成了 "事件". 在本程序中会严格区分它们.
下面就是命令总线 (Dispatcher) 的接口, 除了函数名字不一样外, 其他与事件总线几乎一模一样.
- type Dispatcher interface {
- Dispatch(CommandMessage) error
- RegisterHandler(CommandHandler, ...interface{}) error
- }
我们完全可以把它定义成下面的样子, 是不是就与事件总线很像了? 下面的接口和上面的是等值的.
- type CommandBus interface {
- PublishCommand(CommandMessage) error
- AddHandler(CommandHandler, ...interface{}) error
- }
事件和命令的其他方面, 例如定义方式, 处理流程, 实现方式, 传送方式也几乎一模一样. 详细的我就不讲了, 你可以自己看代码进行比较. 那我们可不可以只用一个事件总线同时处理时间和命令呢? 理论上来讲是没有问题的. 我开始的时候也是这么想的, 但由于现在的接口 ("jetbasrawi/go.cqrs") 不支持, 如果要改的话需要重新定义接口, 因此就暂时放弃了. 另外, 他们两个在概念上还是很不同的, 所以在实现上定义不同的接口也是有必要的.
事件和命令设计
下面来讲解在设计事件驱动时应注意的问题.
结构设计
事件驱动模式与 RPC 相比增加的部分是事件和命令. 因此首先要考虑的是要对 RPC 的程序结构做哪些扩充和怎样扩充."Event" 和 "command" 从本质上来讲是业务逻辑的一部分, 因此应属于领域层. 因此在程序结构上也增加了两个目录 "Event" 和 "command" 分别用来存放事件和命令. 结构如下图所示.
发送和接收的不同处理方式
现在的代码在处理外部事件时, 在发送端和接收端的方式是不一样的.
下面就是发送端的代码(代码在支付服务项目里), 整个代码功能是创建支付, 完成之后再发布 "支付完成" 消息. 它直接通过消息中间件接口把事件发送出去.
- type MakePaymentUseCase struct {
- PaymentDataInterface dataservice.PaymentDataInterface
- Mi gmessaging.MessagingInterface
- }
- func (mpu *MakePaymentUseCase) MakePayment(payment *model.Payment) (*model.Payment, error) {
- payment, err := mpu.PaymentDataInterface.Insert(payment)
- if err!= nil {
- return nil, errors.Wrap(err, "")
- }
- pce := event.NewPaymentCreatedEvent(*payment)
- err = mpu.Mi.Publish(config.SUBJECT_PAYMENT_CREATED, pce)
- if err != nil {
- return nil, err
- }
- return payment, nil
- }
下面就是接收端的代码例子. 是它是先用消息接口接收时间, 再把外部事件转化为内部事件, 然后调用事件总线的接口在微服务内部发布事件.
- eb := value.(ycq.EventBus)
- subject := config.SUBJECT_PAYMENT_CREATED
- _, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) {
- cpm := pce.NewPaymentCreatedDescriptor()
- logger.Log.Debug("payload:",pce)
- eb.PublishEvent(cpm)
- })
为什么会有这种不同? 在接收时, 可不可以不生成内部事件, 而是直接调用用例来处理外部事件呢? 在发送时, 如果没有别的内部事件处理器, 那么直接调用消息中间件来发送是最简单的方法(这个发送过程是轻量级的, 耗时很短). 而接收时可能需要处理比较复杂的业务逻辑. 因此你希望把这个过程分成接收和处理两个部分, 让复杂的业务逻辑在另外一个过程里处理, 这样可以尽量缩短接收时间, 提高接收效率.
是否需每个事件都要单独的事件处理器?
现在的设计是每个事件和事件驱动器都有一个单独的文件. 我见过有些人只用一个文件例如 PaymentEvent 来存放所有与 Payment 相关的事件, 事件驱动器也是一样. 这两种办法都是可行的. 现在看来, 生成单独的文件比较清晰, 但如果以后事件非常多, 也许一个文件存放多个事件会比较容易管理, 不过到那时再改也不迟.
事件处理逻辑放在哪?
对于一个好的设计来讲, 所有的业务逻辑都应该集中在一起, 这样便于管理. 在现在的架构里, 业务逻辑是放在用例 (Use Case) 里的, 但事件处理器里也需要有业务逻辑, 应该怎么办? 支付事件处理器的主要功能是修改订单中的付款信息, 这部分的业务逻辑已经体现在修改订单 (Modify Order) 用例里, 因此支付事件处理器只要调用修改订单的 MakePayment()函数就可以了. 实际上所有的事件处理器都应该这样设计, 它们本身不应包含业务逻辑, 而只是一个简单的封装, 去调用用例里的业务逻辑. 那么可不可以直接在用例里定义 Handle()函数, 这样用例就变成了事件处理器? 这样的设计确实可行, 但我觉得把事件处理器做成一个单独的文件, 这样逻辑上更清晰. 因为修改订单付款功能你是一定要有的, 但事件处理器只有在事件驱动模式下才有, 它们是属于两个不同层面的东西, 只有分开放置才层次清晰.
源程序:
完整的源程序链接:
- "订单服务(Order Service)" https://github.com/jfeng45/order
- "支付服务(Payment Service)" https://github.com/jfeng45/payment
索引:
- "事件驱动的微服务 - 总体设计"
- "jetbasrawi/go.cqrs" https://github.com/jetbasrawi/go.cqrs
- "CNCF" https://www.cncf.io/services-for-projects/
- "云原生的不同解释及正确含义"
- "NATS"
- "jfeng45/gmessaging" https://github.com/jfeng45/gmessaging
来源: https://www.cnblogs.com/code-craftsman/p/12712922.html