微服务之间是相互独立的, 不像单个工程一样各个模块之间可以直接通过方法调用实现通信, 相互独立的服务直接一般的通信方式是使用 HTTP 协议, rpc 协议或者使用消息中间件如 RabbitMQ``Kafka 等
在这篇文章 使用 Golang 和 MongoDB 构建微服务 已经实现了一个微服务的应用, 在文章中已经实现了各个服务直接的通信, 是使用的 HTTP 的形式 , 那各个服务之间如何通过 RabbitMQ 进行消息通信呢, 我们现在要实现一个功能, 就是一个用户预订电影票的接口, 需要服务 User Service(port 8000) 和 服务 **Booking Service(port 8003)** 之间通信, 用户预订之后, 把预订信息写入到 booking 的数据库中
安装 RabbitMQ
安装 RabbitMQ 之前需要先安装 Erlang http://www.erlang.org/downloads 的环境 , 然后下载安装 RabbitMQ http://www.rabbitmq.com/download.html , 请选择对应的版本, 安装完成之后, RabbitMQ 在 Windows 上是作为一个服务在后台运行, 关于 RabbitMQ 的接口如何使用, 请参考官网的 教程 http://www.rabbitmq.com/getstarted.html , 有各个主流语言的实现我们使用的是 Go 版本, 请下载对应的实现接口
go get github.com/streadway/amqp
对 RabbitMQ 的接口做一下简单的封装
定义一个接口
- messaging/message.go
- type IMessageClient interface {
- ConnectToBroker(connectionStr string) error
- PublishToQueue(data []byte, queueName string) error
- SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
- Close()
- }
- type MessageClient struct {
- conn *amqp.Connection
- }
复制代码
连接接口
- func (m *MessageClient) ConnectToBroker(connectionStr string) error {
- if connectionStr == "" {
- panic("the connection str mustnt be null")
- }
- var err error
- m.conn, err = amqp.Dial(connectionStr)
- return err
- }
复制代码
发布消息接口
- func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
- if m.conn == nil {
- panic("before publish you must connect the RabbitMQ first")
- }
- ch, err := m.conn.Channel()
- defer ch.Close()
- failOnError(err, "Failed to open a channel")
- q, err := ch.QueueDeclare(
- queueName,
- false,
- false,
- false,
- false,
- nil,
- )
- failOnError(err, "Failed to declare a queue")
- err = ch.Publish(
- "",
- q.Name,
- false,
- false,
- amqp.Publishing{
- ContentType: "application/json",
- Body: body,
- },
- )
- failOnError(err, "Failed to publish a message")
- return nil
- }
复制代码
订阅消息接口
- func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
- ch, err := m.conn.Channel()
- //defer ch.Close()
- failOnError(err, "Failed to open a channel")
- q, err := ch.QueueDeclare(
- queueName,
- false,
- false,
- false,
- false,
- nil,
- )
- failOnError(err, "Failed to declare a queue")
- msgs, err := ch.Consume(
- q.Name,
- "",
- true,
- false,
- false,
- false,
- nil,
- )
- failOnError(err, "Failed to register a consumer")
- go consumeLoop(msgs, handlerFunc)
- return nil
- }
复制代码
实现通信
在 User Service 中定义一个新的 POST 接口
/user/{name}/booking
, 实现用户的预订功能, 预订之后, 通过 RabbitMQ 发布一个消息给 Booking Service,Booking Service 接收到消息之后, 做相应的处理 (写入数据库)
User Service
初始化 MessageClient
- users/controllers/user.go
- var client messaging.IMessageClient
- func init() {
- client = &messaging.MessageClient{}
- err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
- if err != nil {
- fmt.Println("connect to rabbitmq error", err)
- }
- }
复制代码
添加新的路由和实现
- routes.go
- register("POST", "/user/{name}/booking", controllers.NewBooking, nil)
复制代码
- users/controllers/user.go
- func NewBooking(w http.ResponseWriter, r *http.Request) {
- params := mux.Vars(r)
- user_name := params["name"]
- defer r.Body.Close()
- var bookings models.Booking
- body, _ := ioutil.ReadAll(r.Body)
- err := json.Unmarshal(body, &bookings)
- if err != nil {
- fmt.Println("the format body error", err)
- }
- fmt.Println("user name:", user_name, bookings)
- go notifyMsg(body)
- }
复制代码
用一个协程实现消息的发布
- func notifyMsg(body []byte) {
- err := client.PublishToQueue(body, "new_booking")
- if err != nil {
- fmt.Println("Failed to publis message", err)
- }
- }
复制代码
Booking Service
初始化 MessageClient
- var client messaging.IMessageClient
- func initMessage() {
- client = &messaging.MessageClient{}
- err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
- if err != nil {
- fmt.Println("Failed to connect to RabbitMQ", err)
- }
- err = client.SubscribeToQueue("new_booking", getBooking)
- if err != nil {
- fmt.Println("Failed to comsuer the msg", err)
- }
- }
复制代码
在 web 服务之前启动
- func main() {
- initMessage()
- r := routes.NewRouter()
- http.ListenAndServe(":8003", r)
- }
复制代码
接收后的消息处理
- func getBooking(delivery amqp.Delivery) {
- var booking models.Booking
- json.Unmarshal(delivery.Body, &booking)
- booking.Id = bson.NewObjectId().Hex()
- dao.Insert("Booking", "BookModel", booking)
- fmt.Println("the booking msg", booking)
- }
复制代码
验证, 需要启动 User Service 和 Booking Service
使用 Postman 发送对应的数据
- post 127.0.0.1:8000/user/kevin_woo/booking
- {
- "name":"kevin_woo",
- "books":[
- {
- "date":"20180727",
- "movies":["5b4c45d49d5e3e33c4a5b97a"]
- },
- {
- "date":"20180810",
- "movies":["5b4c45ea9d5e3e33c4a5b97b"]
- }
- ]
- }
复制代码
可以看到数据库已经有了一条新的预订信息
说明, 我这里 POST 的数据就是 booking 数据库中的结构, 实际情况需要对数据进行封装处理, 在 POST 数据时, 没有对数据进行验证, 在实际开发过程中需要对各个数据做相应的验证, 这里主要是看一下 RabbitMQ 的消息传递处理的过程
源码 Github https://github.com/coderminer/microservice
来源: https://juejin.im/post/5b4c6463f265da0fa50a0b65