1. 初始化 etcdServer 流程:
代码路径为: github.com\coreos\etcd\embed\etcd.go
StartEtcd(inCfg *Config) (e *Etcd, err error)
流程如下:
1.1: 参数校验: inCfg.Validate()
校验关注点 1:
checkBindURLs(cfg.LPUrls): 校验 peer-urls schem 相关信息, 且在 3.1 版本之后不允许使用域名作为 url 来进行绑定操作.
checkBindURLs(cfg.LCUrls) : 校验 client-urls schem 相关信息, 且在 3.1 版本之后不允许使用域名作为 url 来进行绑定操作.
使用域名对于性能上是有一定的影响, 但是在实际生产环境中, 是存在使用域名的场景, 需要修改如下代码进行适配:
func checkBindURLs(urls []url.URL) error {
- //...
- if net.ParseIP(host) == nil {
- // 取消 err 的 return, 改为打印告警信息, 同 3.1 之前版本.
- return fmt.Errorf("expected IP in URL for binding (%s)", url.String())
- }
- }
校验关注点 2: 由于实际现网的网络延迟各不相同, 选举及心跳超时时间可作为调优适配的考虑范畴.
5*cfg.TickMs> cfg.ElectionMs : 选举超时时间必须大于五倍于心跳超时时间.
cfg.ElectionMs> maxElectionMs: 选举超时时间必须小于 5000ms
1.2: 初始化 PeerListeners,ClientListeners, 用于监听 peers 间及 client 端发送的 http 请求
PeerListeners: 作为 etcd member 之间进行通信使用的 listeners, 为了性能考量, 建议内部试用 schema:http, 由 flag "listen-peer-urls" 确定,
ClientListeners: 作为接受外部请求的 listerners, 一般为了安全性考量, 一般使用 schema:https, 由 flag "listen-client-urls" 确定,
具体方法实现为:
transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
默认的读写超时均为 5s:
- ConnReadTimeout = 5 * time.Second
- ConnWriteTimeout = 5 * time.Second
1.3: 获取 PeerURLsMap 以及 cluster token
1.4: 生成 new etcdServer 所需的的 ServerConfig 结构体:
- // ServerConfig holds the configuration of etcd as taken from the command line or discovery.
- type ServerConfig struct {
- Name string // etcdserver 名称, 对应 flag "name"
- DiscoveryURL string // etcd 用于服务发现, 无需知道具体 etcd 节点 ip 即可访问 etcd 服务, 对应 flag "discovery"
- DiscoveryProxy string // 供服务发现 url 的代理地址, 对应 flag "discovery-proxy"
- ClientURLs types.URLs // 由 ip+port 组成, 默认 DefaultListenClientURLs = "http://localhost:2379"; 实际情况使用 https schema, 供 etcd member 通信, 对应 flag "listen-client-urls"
- PeerURLs types.URLs // 由 ip+port 组成, 默认 DefaultListenPeerURLs = "http://localhost:2380"; 实际生产环境使用 http schema, 用以外部 etcd client 访问, 对应 flag "listen-client-urls"
- DataDir string // 数据目录地址, 为全路径, 对应 flag "data-dir"
- // DedicatedWALDir config will make the etcd to write the WAL to the WALDir
- // rather than the dataDir/member/wal.
DedicatedWALDir string
SnapCount uint64 // 默认是 10000 次事件做一次快照: DefaultSnapCount = 100000 可以作为调优参数进行参考, 对应 flag "snapshot-count",
MaxSnapFiles uint // 默认是 5, 这是 v2 的参数, v3 内只有一个 db 文件, DefaultMaxSnapshots = 5, 对应 flag "max-snapshots"
MaxWALFiles uint // 默认是 5,DefaultMaxWALs = 5, 表示最大存储 wal 文件的个数, 对应 flag "max-wals", 保留的文件可以作为 etcd-dump-logs 工具进行 debug 使用.
- InitialPeerURLsMap types.URLsMap // peerUrl 与 etcd name 对应的 map, 由方法 cfg.PeerURLsMapAndToken("etcd") 生成.
- InitialClusterToken string // etcd 集群 token, 对应 flang "initial-cluster-token"
NewCluster bool // 确定是否为新建集群, 对应 flag "initial-cluster-state", 由方法 func (cfg Config) IsNewCluster() bool { return cfg.ClusterState == ClusterStateFlagNew } 确定;
ForceNewCluster bool // 对应 flag "force-new-cluster", 默认为 false, 若为 true, 在生产环境内, 一般用于含 v2 数据的集群恢复, 效果为以现有数据或者空数据新建一个单节点的 etcd 集群, 如果存在数据, 则会清楚数据内的元数据信息, 并重建只包含该 etcd 的元数据信息.
PeerTLSInfo transport.TLSInfo // member 间通信使用的证书信息, 若 peerURL 为 https 时使用, 对应 flag "peer-ca-file","peer-cert-file", "peer-key-file"
TickMs uint // raft node 发送心跳信息的超时时间. "heartbeat-interval"
ElectionTicks int // raft node 发起选举的超时时间, 最大为 5000ms maxElectionMs = 50000, 对应 flag "election-timeout", 选举时间与心跳时间在最佳实践内建议是 10 倍关系.
- BootstrapTimeout time.Duration // etcd server 启动的超时时间, 默认为 1s, 由方法 func (c *ServerConfig) bootstrapTimeout() time.Duration 确定;
- AutoCompactionRetention int // 默认为 0, 单位为小时, 主要为了方便用户快速查询, 定时对 key 进行合并处理, 对应 flag "auto-compaction-retention", 由方法 func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic 确定,
- // 具体 compact 的实现方法为: func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
- QuotaBackendBytes int64 // etcd 后端数据文件的大小, 默认为 2GB, 最大为 8GB, v3 的参数, 对应 flag "quota-backend-bytes" , 具体定义: etcd\etcdserver\quota.go
- StrictReconfigCheck bool
- // ClientCertAuthEnabled is true when cert has been signed by the client CA.
- ClientCertAuthEnabled bool
- AuthToken string
- }
1.5, 调用方法
func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error)
初始化 etcdServer:
- // NewServer creates a new EtcdServer from the supplied configuration. The
- // configuration is considered static for the lifetime of the EtcdServer.
- func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
- }
1.5.1: 分配内存空间
st := store.New(StoreClusterPrefix, StoreKeysPrefix)
1.5.2: 检测并生成数据目录, 生成向远端 raft node peer listeners 发送请求的 Transport
其中的超时时间计算方法为:
- time.Second + time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond/5
- 1.5.3:
根据日志目录是否存在, 对应生成 raft node 实体.
1.5.3.1: 若日志目录不存在且 flag "initial-cluster-state" 为'existing':
case !haveWAL && !cfg.NewCluster:
使用方法
func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL)
生成 raft node 实体
- id,
- n,
- s,
- w = startNode(cfg, cl, nil)
1.5.3.2: 若日志目录不存在且 flag "initial-cluster-state" 为'new':
case !haveWAL && cfg.NewCluster:
使用方法
func startNode(cfg *ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL)
生成 raft node 实体
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
1.5.3.3 若日志目录存在:
1.5.3.3.1 若 flag "force-new-cluster" 为 "false":
调用方法
func restartNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL)
生成 raft node 实体
- id,
- cl,
- n,
- s,
- w = restartNode(cfg, snapshot)
1.5.3.3.2 若 flag "force-new-cluster" 为 "true":
调用方法
func restartAsStandaloneNode(cfg *ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL)
生成 raft node 实体
- id,
- cl,
- n,
- s,
- w = restartAsStandaloneNode(cfg, snapshot)
1.5.4 初始化 EtcdServer:
- srv = &EtcdServer {
- readych: make(chan struct {}),
- Cfg: cfg,
- snapCount: cfg.SnapCount,
- errorc: make(chan error, 1),
- store: st,
- snapshotter: ss,
- r: *newRaftNode(raftNodeConfig {
- isIDRemoved: func(id uint64) bool {
- return cl.IsIDRemoved(types.ID(id))
- },
- Node: n,
- heartbeat: heartbeat,
- raftStorage: s,
- storage: NewStorage(w, ss),
- },
- ),
- id: id,
- attributes: membership.Attributes {
- Name: cfg.Name,
- ClientURLs: cfg.ClientURLs.StringSlice()
- },
- cluster: cl,
- stats: sstats,
- lstats: lstats,
- SyncTicker: time.NewTicker(500 * time.Millisecond),
- peerRt: prt,
- reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
- forceVersionC: make(chan struct {}),
- ....
- }
在初始化 EtcdServer 过程中, 会启动用于 peer 间发送及接收 raft 消息的 rafthttp transport, 具体方法如下:
- func (t *Transport) Start() error {
- var err error
- t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
- if err != nil {
- return err
- }
- t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
- if err != nil {
- return err
- }
- t.remotes = make(map[types.ID]*remote)
- t.peers = make(map[types.ID]Peer)
- t.prober = probing.NewProber(t.pipelineRt)
- return nil
2.1. 启动 etcdServer
3.1. 为每个 client url 及 peer url 启动一个 client server 的 goroutine, 以提供监听服务, 这个动作在 raft http transport 启动之后:
- peer server goroutine:
- go func(l *peerListener) {
- e.errHandler(l.serve())
- }(pl)
- client server goroutine:
- go func(s *serveCtx) {
- e.errHandler(s.serve(e.Server, ctlscfg, v2h, e.errHandler))
- }(sctx)
若启动失败, 则停止 grpcServer:
- defer func() {
- ...
- if !serving {
- // errored before starting gRPC server for serveCtx.grpcServerC
- for _, sctx := range e.sctxs {
- close(sctx.grpcServerC)
- }
- }
- ...
- }()
暂时就启动流程进行粗略分享, 后续将进一步分析 etcdServer 启动具体机制, 及针对 NewServer 内针对生成 raft node 详细机制进行分析及基于 k8s 平台部署 etcd 集群备份恢复方案进行探讨.
来源: http://www.tuicool.com/articles/yAre63J