在微服务架构下, 原单体服务被拆分为多个微服务独立部署, 客户端就无法知晓服务的具体位置; 而且服务数量太多, 维护如此多的服务地址, 运维人员也无法高效工作.
因此, 在微服务架构中引入了服务注册中心, 用于接受和维护各个服务的地址信息. 客户端或者网关可以通过注册中心查询目标服务地址, 动态实现服务访问, 并且在此实现服务负载均衡.
对于服务注册与发现, go-kit 默认提供了对 consul,zookeeper,etcd,eureka 常用注册中心的支持.
概述
本文将基于 consul, 使用 "客户端发现模式" 进行实战演练, 主要有以下要点:
consul 使用 docker 镜像 (progrium/consul) 单机部署;
算术服务以名称 arithmetic 注册至 consul;
编写发现服务通过 consul 查询服务实例, 基于 RoundRibbon 进行负载均衡.
本文实例程序采用的思路为: 算术服务注册至 consul, 其他部分保持不变; 发现服务对外暴露 http 接口, 接受请求后(接收请求内容存储在 Body 中, 以 JSON 方式传递), 按照 go-kit 的机制动态查询算术服务实例, 调用算术服务的接口, 然后将响应内容返回. 如下图所示:
启动 consul
修改
docker/docker-compose.YAML
, 如下所示(暂时注释了 Prometheus 和 Grafana 的部分).
- version: '2'
- services:
- consul:
- image: progrium/consul:latest
- ports:
- - 8400:8400
- - 8500:8500
- - 8600:53/udp
- hostname: consulserver
- command: -server -Bootstrap -ui-dir /ui
启动 docker. 在终端切换至项目目录, 执行以下命令:
sudo docker-compose -f docker/docker-compose.YAML up
通过浏览器访问
http://localhost:8500
, 出现以下界面即为启动成功.
服务注册
Step-1: 代码准备
本示例基于 arithmetic_monitor_demo 代码进行改写. 首先, 复制该目录并重命名为 arithmetic_consul_demo; 新建两个目录, 分别命名为 register,discover; 将原有 go 代码文件移动至 register 目录. 结果如下图所示:
另外, 需要下载所依赖的第三方库 uuid 和 hashicorp/consul
- go get GitHub.com/pborman/uuid
- go get GitHub.com/hashicorp/consul
Step-2: 实现注册方法
新建 register/register.go, 添加 Register 方法, 实现向 consul 的注册逻辑. 该方法接收 5 个参数, 分别是注册中心 consul 的 ip, 端口, 算术服务的本地 ip 和端口, 日志记录工具.
创建注册对象需要使用 hashicorp/consul, 查看代码可知其方法定义如下:
func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar
所以 Register 的实现过程主要有三步: 创建 consul 客户端对象; 创建 consul 对算术服务健康检查的参数配置信息; 创建算术服务向 consul 注册的服务配置信息. 代码如下:
- func Register(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registar sd.Registrar) {
- // 创建 Consul 客户端连接
- var client consul.Client
- {
- consulCfg := API.DefaultConfig()
- consulCfg.Address = consulHost + ":" + consulPort
- consulClient, err := API.NewClient(consulCfg)
- if err != nil {
- logger.Log("create consul client error:", err)
- os.Exit(1)
- }
- client = consul.NewClient(consulClient)
- }
- // 设置 Consul 对服务健康检查的参数
- check := API.AgentServiceCheck{
- HTTP: "http://" + svcHost + ":" + svcPort + "/health",
- Interval: "10s",
- Timeout: "1s",
- Notes: "Consul check service health status.",
- }
- port, _ := strconv.Atoi(svcPort)
- // 设置微服务想 Consul 的注册信息
- reg := API.AgentServiceRegistration{
- ID: "arithmetic" + uuid.New(),
- Name: "arithmetic",
- Address: svcHost,
- Port: port,
- Tags: []string{"arithmetic", "raysonxin"},
- Check: &check,
- }
- // 执行注册
- registar = consul.NewRegistrar(client, ®, logger)
- return
- }
Step-3: 实现健康检查接口
由 Step-2 可知, consul 将定时请求算术服务的 / heath 用于检查服务的健康状态, 所以我们将从 service,endpoint,transport 中增加对应的实现.
在接口 Service 中新增接口方法 HealthCheck, 并依次在 ArithmeticService,loggingMiddleware,metricMiddleware 中添加实现.
- // service 接口
- // Service Define a service interface
- type Service interface {
- // 省略之前的其他方法
- // HealthCheck check service health status
- HealthCheck() bool
- }
- // ArithmeticService 实现 HealthCheck
- // HealthCheck implement Service method
- // 用于检查服务的健康状态, 这里仅仅返回 true.
- func (s ArithmeticService) HealthCheck() bool {
- return true
- }
- // loggingMiddleware 实现 HealthCheck
- func (mw loggingMiddleware) HealthCheck() (result bool) {
- defer func(begin time.Time) {
- mw.logger.Log(
- "function", "HealthChcek",
- "result", result,
- "took", time.Since(begin),
- )
- }(time.Now())
- result = mw.Service.HealthCheck()
- return
- }
- // metricMiddleware 实现 HealthCheck
- func (mw metricMiddleware) HealthCheck() (result bool) {
- defer func(begin time.Time) {
- lvs := []string{"method", "HealthCheck"}
- mw.requestCount.With(lvs...).Add(1)
- mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
- }(time.Now())
- result = mw.Service.HealthCheck()
- return
- }
在 endpoints.go 中新增结构:
ArithmeticEndpoints
. 在之前的示例中, 仅使用了一个 endpoint, 所以我直接使用了结构 endpoint.Endpoint. 定义如下:
- // ArithmeticEndpoint define endpoint
- type ArithmeticEndpoints struct {
- ArithmeticEndpoint endpoint.Endpoint
- HealthCheckEndpoint endpoint.Endpoint
- }
创建健康检查的请求, 响应对象, 以及对应的 endpoint.Endpoint 封装方法. 代码如下:
- // HealthRequest 健康检查请求结构
- type HealthRequest struct{}
- // HealthResponse 健康检查响应结构
- type HealthResponse struct {
- Status bool `json:"status"`
- }
- // MakeHealthCheckEndpoint 创建健康检查 Endpoint
- func MakeHealthCheckEndpoint(svc Service) endpoint.Endpoint {
- return func(ctx context.Context, request interface{}) (response interface{}, err error) {
- status := svc.HealthCheck()
- return HealthResponse{status}, nil
- }
- }
在 transports.go 中新增健康检查接口 / health.
- // MakeHttpHandler make http handler use mux
- func MakeHttpHandler(ctx context.Context, endpoints ArithmeticEndpoints, logger log.Logger) http.Handler {
- r := mux.NewRouter()
- // 省略原有 / calculate/{type}/{a}/{b}代码
- // create health check handler
- r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
- endpoints.HealthCheckEndpoint,
- decodeHealthCheckRequest,
- encodeArithmeticResponse,
- options...,
- ))
- return r
- }
Step-4: 修改 main.go
接下来在 main.go 中增加健康检查和服务注册相关的调用代码, 以便上述修改逻辑生效.
健康检查.
- // 创建健康检查的 Endpoint, 未增加限流
- healthEndpoint := MakeHealthCheckEndpoint(svc)
- // 把算术运算 Endpoint 和健康检查 Endpoint 封装至 ArithmeticEndpoints
- endpts := ArithmeticEndpoints{
- ArithmeticEndpoint: endpoint,
- HealthCheckEndpoint: healthEndpoint,
- }
- // 创建 http.Handler
- r := MakeHttpHandler(ctx, endpts, logger)
服务注册. 准备服务需要的环境变量, 创建注册对象, 在服务启动前注册至 consul, 服务退出后从 consul 取消注册. 下面只贴出部分代码, 完整代码可从 GitHub 获取.
- // 定义环境变量
- var (
- consulHost = flag.String("consul.host", "","consul ip address")
- consulPort = flag.String("consul.port", "","consul port")
- serviceHost = flag.String("service.host", "","service ip address")
- servicePort = flag.String("service.port", "","service port")
- )
- // parse
- flag.Parse()
- // ...
- // 创建注册对象
- registar := Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger)
- go func() {
- fmt.Println("Http Server start at port:" + *servicePort)
- // 启动前执行注册
- registar.Register()
- handler := r
- errChan <- http.ListenAndServe(":"+*servicePort, handler)
- }()
- go func() {
- c := make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
- errChan <- fmt.Errorf("%s", <-c)
- }()
- error := <-errChan
- // 服务退出, 取消注册
- registar.Deregister()
- fmt.Println(error)
Step-5: 编译 & 运行
打开终端, 切换至项目目录. 执行 go build ./register 编译成功后, 输入以下指令启动算术服务(注册服务):
./register -consul.host localhost -consul.port 8500 -service.host 192.168.192.145 -service.port 9000
启动成功后, 再次刷新 consul-ui 界面, 看到如下界面即说明算术服务成功注册至 consul.
同时也可以在注册服务运行的终端看到 consul 定时调用 / health 接口的日志输出信息:
服务发现
discover 服务要完成的工作为: 以 REST 接口 / calculate 对外提供 API 服务, 客户端使用 HTTP POST 方法发送 JSON 数据执行请求; 在 endpoint 中查询已经在 consul 中注册的服务实例; 然后选择合适的服务实例向其发起请求转发; 完成请求后向原客户端请求响应.
查阅 go-kit 源码可知, kit/sd/Endpointer 提供了一套服务发现机制, 其定义和创建接口如下所示:
- // Endpointer listens to a service discovery system and yields a set of
- // identical endpoints on demand. An error indicates a problem with connectivity
- // to the service discovery system, or within the system itself; an Endpointer
- // may yield no endpoints without error.
- type Endpointer interface {
- Endpoints() ([]endpoint.Endpoint, error)
- }
- // NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
- // and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
- // keeps returning previously created Endpoints assuming they are still good, unless
- // this behavior is disabled via InvalidateOnError option.
- func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer
通过代码注释我们可以知道: Endpointer 通过监听服务发现系统的事件信息, 并且通过 factory 按需创建服务终结点(Endpoint).
所以, 我们需要通过 Endpointer 来实现服务发现功能. 在微服务模式下, 同一个服务可能存在多个实例, 所以需要通过负载均衡机制完成实例选择, 这里使用 go-kit 工具集中的 kit/sd/lb 组件(该组件实现 RoundRibbon, 并具备 Retry 功能).
Step-1: 创建 factory
在 discover 目录中创建 go 文件 factory.go, 实现 sd.Factory 的逻辑, 即把服务实例转换为 endpoint, 在该 endpoint 中实现对于目标服务的调用过程. 这里直接针对算术运算服务进行封装, 代码如下所示:
- func arithmeticFactory(_ context.Context, method, path string) sd.Factory {
- return func(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
- if !strings.HasPrefix(instance, "http") {
- instance = "http://" + instance
- }
- tgt, err := url.Parse(instance)
- if err != nil {
- return nil, nil, err
- }
- tgt.Path = path
- var (
- enc kithttp.EncodeRequestFunc
- dec kithttp.DecodeResponseFunc
- )
- enc, dec = encodeArithmeticRequest, decodeArithmeticReponse
- return kithttp.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
- }
- }
- func encodeArithmeticRequest(_ context.Context, req *http.Request, request interface{}) error {
- arithReq := request.(ArithmeticRequest)
- p := "/" + arithReq.RequestType + "/" + strconv.Itoa(arithReq.A) + "/" + strconv.Itoa(arithReq.B)
- req.URL.Path += p
- return nil
- }
- func decodeArithmeticReponse(_ context.Context, resp *http.Response) (interface{}, error) {
- var response ArithmeticResponse
- var s map[string]interface{}
- if respCode := resp.StatusCode; respCode>= 400 {
- if err := JSON.NewDecoder(resp.Body).Decode(&s); err != nil {
- return nil, err
- }
- return nil, errors.New(s["error"].(string) + "\n")
- }
- if err := JSON.NewDecoder(resp.Body).Decode(&response); err != nil {
- return nil, err
- }
- return response, nil
- }
Step-2: 创建 endpoint
创建 go 文件 discover/enpoints.go. 根据上述分析, 在该 endpoint 实现对服务发现系统的监听, 实现实例选择, 最终返回可执行的 endpoint.Endpoint. 下面根据代码注释说明实现过程:
- // MakeDiscoverEndpoint 使用 consul.Client 创建服务发现 Endpoint
- // 为了方便这里默认了一些参数
- func MakeDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint {
- serviceName := "arithmetic"
- tags := []string{"arithmetic", "raysonxin"}
- passingOnly := true
- duration := 500 * time.Millisecond
- // 基于 consul 客户端, 服务名称, 服务标签等信息,
- // 创建 consul 的连接实例,
- // 可实时查询服务实例的状态信息
- instancer := consul.NewInstancer(client, logger, serviceName, tags, passingOnly)
- // 针对 calculate 接口创建 sd.Factory
- factory := arithmeticFactory(ctx, "POST", "calculate")
- // 使用 consul 连接实例(发现服务系统),factory 创建 sd.Factory
- endpointer := sd.NewEndpointer(instancer, factory, logger)
- // 创建 RoundRibbon 负载均衡器
- balancer := lb.NewRoundRobin(endpointer)
- // 为负载均衡器增加重试功能, 同时该对象为 endpoint.Endpoint
- retry := lb.Retry(1, duration, balancer)
- return retry
- }
Step-3: 创建 transport
创建 go 文件 discover/transports.go. 通过 mux/Router 使用 POST 方法为发现服务开放 REST 接口 / calculate, 与算术服务一样, 这里需要 endpoint.Endpoint,DecodeRequestFunc,EncodeResponseFunc. 为了方便, 我把算术服务中的请求与响应结构和编解码方法直接复制过来. 代码如下所示:
- func MakeHttpHandler(endpoint endpoint.Endpoint) http.Handler {
- r := mux.NewRouter()
- r.Methods("POST").Path("/calculate").Handler(kithttp.NewServer(
- endpoint,
- decodeDiscoverRequest,
- encodeDiscoverResponse,
- ))
- return r
- }
- // 省略实体结构和编解码方法
Step-4: 编写 main 方法
接下来就是在 main 方法把以上逻辑串起来, 然后启动发现服务了, 这里监听端口为 9001. 比较简单, 直接贴代码了:
- func main() {
- // 创建环境变量
- var (
- consulHost = flag.String("consul.host", "","consul server ip address")
- consulPort = flag.String("consul.port", "","consul server port")
- )
- flag.Parse()
- // 创建日志组件
- var logger log.Logger
- {
- logger = log.NewLogfmtLogger(os.Stderr)
- logger = log.With(logger, "ts", log.DefaultTimestampUTC)
- logger = log.With(logger, "caller", log.DefaultCaller)
- }
- // 创建 consul 客户端对象
- var client consul.Client
- {
- consulConfig := API.DefaultConfig()
- consulConfig.Address = "http://" + *consulHost + ":" + *consulPort
- consulClient, err := API.NewClient(consulConfig)
- if err != nil {
- logger.Log("err", err)
- os.Exit(1)
- }
- client = consul.NewClient(consulClient)
- }
- ctx := context.Background()
- // 创建 Endpoint
- discoverEndpoint := MakeDiscoverEndpoint(ctx, client, logger)
- // 创建传输层
- r := MakeHttpHandler(discoverEndpoint)
- errc := make(chan error)
- go func() {
- c := make(chan os.Signal)
- signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
- errc <- fmt.Errorf("%s", <-c)
- }()
- // 开始监听
- go func() {
- logger.Log("transport", "HTTP", "addr", "9001")
- errc <- http.ListenAndServe(":9001", r)
- }()
- // 开始运行, 等待结束
- logger.Log("exit", <-errc)
- }
Step-5: 编译 & 运行
在终端中切换至 discover 目录, 执行 go build 完成编译, 然后使用以下命令 (指定注册中心服务地址) 启动发现服务:
./discover -consul.host localhost -consul.port 8500
请求测试
使用 postman 请求 http://localhost:9001/calculate, 在 body 中设置请求信息, 完成测试. 如下图所示:
总结
本文使用 consul 作为注册中心, 通过实例演示了 go-kit 的服务注册与发现功能. 由于本人在这个部分了解不够透彻, 在编写代码和本文的过程中, 一直在研究 go-kit 发现组件的设计方式, 力求能够通过代码, 文字解释清楚. 本人水平有限, 有任何错误或不妥之处, 请大家批评指正.
本文实例代码见 arithmetic_consul_demo https://github.com/raysonxin/gokit-article-demo .
来源: https://juejin.im/post/5c740a335188257c1e2c86a7