花了一个礼拜整理了一下 RPCX 的使用方法, 大致过程如下:
服务器端开发
首先, 我们需要实现自己的服务, 这很简单, 就是定义普通的方法即可:
- package example
- import (
- "context"
- "fmt"
- )
- type Args struct {
- A int
- B int
- }
- type Reply struct {
- C int
- }
- type Arith int
- func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
- reply.C = args.A * args.B
- fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
- return nil
- }
- func (t *Arith) Add(ctx context.Context, args *Args, reply *Reply) error {
- reply.C = args.A + args.B
- fmt.Printf("call: %d + %d = %d\n", args.A, args.B, reply.C)
- return nil
- }
- func (t *Arith) Say(ctx context.Context, args *string, reply *string) error {
- *reply = "hello" + *args
- return nil
- }
一. 点对点
点对点是最简单的一种注册中心的方式, 事实上没有注册中心, 客户端直接得到唯一的服务器的地址.
服务器
服务器并没有配置注册中心, 而是直接启动.
- package main
- import (
- "flag"
- "GitHub.com/smallnest/rpcx/server"
- "GitHub.com/rpcx-ecosystem/rpcx-examples3"
- )
- var (
- addr = flag.String("addr", "localhost:8972", "server address")
- )
- func main() {
- flag.Parse()
- s := server.Server{}
- s.RegisterName("Arith", new(example.Arith), "")
- go s.Serve("tcp", *addr)
- select {}
- }
客户端
客户端直接配置了服务器的地址, 格式是 network@ipaddress:port 的格式, 并没有通过第三方组件来查找.
- package main
- import (
- "GitHub.com/smallnest/rpcx/client"
- "log"
- "service"
- "context"
- "flag"
- )
- var (
- addr = flag.String("addr", "127.0.0.1:8972", "server address")
- )
- func main() {
- Peer2Peer()
- }
- func Peer2Peer() {
- flag.Parse()
- d := client.NewPeer2PeerDiscovery("tcp@" + *addr, "")
- xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
- defer xclient.Close()
- args := &service.Args{
- A: 10,
- B: 20,
- }
- reply := &service.Reply{}
- err := xclient.Call(context.Background(), "Mul", args, reply)
- if err != nil {
- log.Fatalf("failed to call: %v", err)
- }
- log.Printf("%d * %d = %d", args.A, args.B, reply.C)
- }
二. 点对多
MultipleServers
上面的方式只能访问一台服务器, 假设我们有固定的几台服务器提供相同的服务, 我们可以采用这种方式.
服务器
服务器还是和上面的代码一样, 只需启动自己的服务, 不需要做额外的配置. 下面这个例子启动了两个服务:
- package main
- import (
- "flag"
- "GitHub.com/smallnest/rpcx/server"
- "GitHub.com/rpcx-ecosystem/rpcx-examples3"
- )
- var (
- addr1 = flag.String("addr1", "localhost:8972", "server1 address")
- addr2 = flag.String("addr2", "localhost:8973", "server2 address")
- )
- func main() {
- flag.Parse()
- go createServer(*addr1)
- go createServer(*addr2)
- select {}
- }
- func createServer(addr string) {
- s := server.NewServer()
- s.RegisterName("Arith", new(example.Arith), "")
- s.Serve("tcp", addr)
- }
客户端
客户端需要使用 MultipleServersDiscovery 来配置同一个服务的多个服务器地址, 这样客户端就能基于规则从中选择一个进行调用.
可以看到, 除了初始化 XClient 有所不同外, 实际调用服务是一样的, 后面介绍的注册中心也是一样, 只有初始化客户端有所不同, 后续的调用都一样.
- package main
- import (
- "GitHub.com/smallnest/rpcx/client"
- "log"
- "service"
- "context"
- "flag"
- "GitHub.com/rpcx-ecosystem/rpcx-examples3"
- )
- var (
- addr1 = flag.String("addr1", "localhost:8972", "server1 address")
- addr2 = flag.String("addr2", "localhost:8973", "server2 address")
- )
- func main() {
- Peer2Many()
- }
- func Peer2Many(){
- flag.Parse()
- d := client.NewMultipleServersDiscovery([]*client.KVPair{{Key:*addr1}, {Key:*addr2}})
- xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
- defer xclient.Close()
- args := &example.Args{
- A: 10,
- B: 20,
- }
- reply := &example.Reply{}
- err := xclient.Call(context.Background(), "Mul",args, reply)
- if err != nil {
- log.Fatalf("failed to call: %v", err)
- }
- log.Printf("%d * %d = %d", args.A, args.B, reply.C)
- }
- Consul
Consul 是 HashiCorp 公司推出的开源工具, 用于实现分布式系统的服务发现与配置. Consul 是分布式的, 高可用的, 可横向扩展的. 它具备以下特性:
服务发现: Consul 提供了通过 DNS 或者 HTTP 接口的方式来注册服务和发现服务. 一些外部的服务通过 Consul 很容易的找到它所依赖的服务.
健康检测: Consul 的 Client 提供了健康检查的机制, 可以通过用来避免流量被转发到有故障的服务上.
Key/Value 存储: 应用程序可以根据自己的需要使用 Consul 提供的 Key/Value 存储. Consul 提供了简单易用的 HTTP 接口, 结合其他工具可以实现动态配置, 功能标记, 领袖选举等等功能.
多数据中心: Consul 支持开箱即用的多数据中心. 这意味着用户不需要担心需要建立额外的抽象层让业务扩展到多个区域.
Consul 也是使用 Go 开发的, 在 Go 生态圈也被广泛应用.
服务器
服务器端的开发和 zookeeper,etcd 和 consul 类似.
它主要配置几个参数:
ServiceAddress: 本机的监听地址, 这个对外暴露的监听地址, 格式为 tcp@ipaddress:port
ConsulServers: consul 集群的地址
BasePath: 服务前缀. 如果有多个项目同时使用 consul, 避免命名冲突, 可以设置这个参数, 为当前的服务设置命名空间
Metrics: 用来更新服务的 TPS
UpdateInterval: 服务的刷新间隔, 如果在一定间隔内 (当前设为 2 * UpdateInterval) 没有刷新, 服务就会从 consul 中删除
在开始之前, 你得确保已经装上了 Consul 使用指令 consul agent -dev 开启, 更多 consul 使用方式请自行 get
- package main
- import (
- "flag"
- "GitHub.com/smallnest/rpcx/server"
- "GitHub.com/smallnest/rpcx/serverplugin"
- "time"
- "GitHub.com/rcrowley/go-metrics"
- "log"
- "GitHub.com/rpcx-ecosystem/rpcx-examples3"
- )
- var (
- addr = flag.String("addr", "localhost:8972", "server address")
- consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
- basePath = flag.String("base", "/rpcx_test", "prefix path")
- )
- func main() {
- flag.Parse()
- s := server.NewServer()
- addRegistryPlugin(s)
- s.RegisterName("Arith", new(example.Arith), "")
- s.Serve("tcp", *addr)
- }
- func addRegistryPlugin(s *server.Server) {
- r := &serverplugin.ConsulRegisterPlugin{
- ServiceAddress: "tcp@" + *addr,
- ConsulServers: []string{*consulAddr},
- BasePath: *basePath,
- Metrics: metrics.NewRegistry(),
- UpdateInterval: time.Minute,
- }
- err := r.Start()
- if err != nil {
- log.Fatal(err)
- }
- s.Plugins.Add(r)
- }
客户端
配置 ConsulDiscovery, 使用 basepath 和 consul 的地址.
- package main
- import (
- "GitHub.com/smallnest/rpcx/client"
- "log"
- "context"
- "flag"
- "GitHub.com/rpcx-ecosystem/rpcx-examples3"
- )
- var (
- consulAddr = flag.String("consulAddr", "localhost:8500", "consul address")
- basePath = flag.String("base", "/rpcx_test/Arith", "prefix path")
- )
- func ConsulServer() {
- flag.Parse()
- d := client.NewConsulDiscovery(*basePath, "", []string{*consulAddr}, nil)
- xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
- defer xclient.Close()
- args := &example.Args{
- A: 10,
- B: 20,
- }
- reply := &example.Reply{}
- err := xclient.Call(context.Background(), "Mul", args, reply)
- if err != nil {
- log.Fatalf("failed to call: %v", err)
- }
- log.Printf("%d * %d = %d", args.A, args.B, reply.C)
- }
- func main() {
- ConsulServer()
- }
来源: https://juejin.im/entry/5bb06a046fb9a05d32516148