当今互联网行业中, 对于分布式一致性算法, 个人觉得实用性最高并且应用最广泛的就是 Raft 算法了. Raft 非常适合用于所有的节点均为可信节点时的必要数据同步场景中. Raft 的基本原理理解起来并不难, 网上很多文字简介, 都不如一个很生动的动画 http://thesecretlivesofdata.com/raft/ 来得直观.
etcd/raft
在 Kubenetes 中广泛使用的分布式 KV 存储系统 etcd 使用的就是 Raft 算法. 算法的实现就直接作为 etcd 的子 package(用 Go 编写), 路径为: GitHub.com/etcd-io/etcd/raft https://github.com/etcd-io/etcd/tree/master/raft .
官方提供了一个 demo. 这个 demo 其实已经非常完整了, 它包含了网络通信, 快照压缩, 数据同步等完整的功能. 而对于 etcd/raft 的初见者而言, 还是稍微有点门槛了. 本文的目的是尽量抽丝剥茧, 首先从 raft 最基本的功能 -- 选举来入手, 构建一个小的集群 demo, 一步一步说明 etcd/raft 的用法.
Demo 功能
这个小 demo 只实现一个功能: 已知数量的集群节点, 能够进行 leader 的选举. 更多的功能 (比如数据的存储) 在以后的文章陆续解析.
为此, 我们需要研究 etcd/raft 的相关函数的用法.
Raft 节点数据结构
包 raft 使用接口 Node 来描述一个 Raft 节点. 该接口的函数中, 本文 (或者说本阶段) 涉及的有四个:
- type Node interface {
- Tick()
- Step(ctx context.Context, msg raftpb.Message) error
- Ready() <-chan Ready
- Status() Status
- }
启动节点
Raft 节点数量建议是一个素数. 这里我采用 3 个. 在节点数量已知的情况下, 我们首先要告知 Raft node 节点的列表. 每个节点应该有唯一的一个 uint64 类型的 ID:
peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
应用程序需要自己实现节点与节点之间的网络通信. 这里我就在本地单进程运行三个协程, 模拟三个节点, 给三个节点分配三个 channel 用来通信:
- var (
- bcChans = []chan raftpb.Message{
- make(chan raftpb.Message),
- make(chan raftpb.Message),
- make(chan raftpb.Message),
- }
- )
- // ......
- func main() {
- peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
- go startNode(0x01, peers)
- go startNode(0x02, peers)
- go startNode(0x03, peers)
- time.Sleep(2 * time.Second)
- return
- }
节点程序中要完成的功能
在 etcd/raft 中对 Raft 算法的逻辑实现是尽量地轻量化, 只实现算法的核心功能. 但与此相对的, 需要调用 Raft 的应用程序实现较多的额外逻辑来实现完整的节点功能. 在本文中, 我们只关心节点的选举, 该场景下我们需要实现的功能有以下两个:
节点内部心跳机制
Raft 节点依赖定期的心跳来进行周期性的状态机流转, 应用程序需要给 raft 节点提供. 在 demo 中, 我用了一个带随机抖动的 ticker 来实现 -- 而这也是 Raft 算法中建议的方案, 也就是带有一点随机因素. 当每一次 tick 到来时, 就可以调用 raft node 的 Tick() 方法, 推动内部状态机的更新:
- func startNode(id uint64, peers []raft.Peer) {
- // ......
- for {
- select {
- case <-n.tick.Elapsed(): // 相当于 time 包 Ticker 的 tick.C
- n.node.Tick() // n.node 是 raft.Node 对象, 下同
- // ......
- }
- // ......
- }
转发节点之间的 raft 通信
前文说到, Raft 节点之间的网络通信需要应用程序来实现. 应用程序通过 etcd/raft 节点的 Ready() 方法接收节点需要对其他发出的的信息. Ready() 函数返回 raft.Ready 结构体, 在这一阶段中, 我们需要使用的是 Ready 结构体的 Messages 成员, 这是一个 []raftpb.Message 类型. 应用程序需要负责的, 就是将这些 message 发送出去.
Message 的定义并不长, 如下所示:
- type Message struct {
- Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
- To uint64 `protobuf:"varint,2,opt,name=to" json:"to"`
- From uint64 `protobuf:"varint,3,opt,name=from" json:"from"`
- Term uint64 `protobuf:"varint,4,opt,name=term" json:"term"`
- LogTerm uint64 `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
- Index uint64 `protobuf:"varint,6,opt,name=index" json:"index"`
- Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
- Commit uint64 `protobuf:"varint,8,opt,name=commit" json:"commit"`
- Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
- Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"`
- RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
- Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
- XXX_unrecognized []byte `json:"-"`
- }
可以看到, 这个结构体分别按照 protobuf 和 JSON 进行了定义, 这就非常方便应用程序根据不同的通信模式对数据进行序列化和反序列化后在网络中传输. 而 To 则告诉了应用程序应该将这个消息发送给哪一个节点. 在 demo 则是根据 To 发到对应的 channel 里.
接收其他节点发来的 raft 通信
在 demo 中, 节点从 channel 中获取到 Message 对象之后, 调用本节点的 Step() 函数:
- func startNode(id uint64, peers []raft.Peer) {
- // ......
- for {
- select {
- // ......
- case m := <-n.recv:
- n.node.Step(context.TODO(), m)
- }
- // ......
- }
完整 demo 代码
完整代码九十来行, 可以直接运行之后观察 shell 输出, 了解 raft 的选举过程.
- package main
- import (
- "context"
- "log"
- "strings"
- "time"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/etcd-io/etcd/raft"
- "github.com/influxdata/telegraf/agent"
- )
- func init() {
- log.SetFlags(log.Lshortfile | log.LstdFlags)
- }
- var (
- infof = log.Printf
- errorf = log.Printf
- bcChans = []chan raftpb.Message{
- make(chan raftpb.Message),
- make(chan raftpb.Message),
- make(chan raftpb.Message),
- }
- )
- const (
- tickInterval = 100 * time.Millisecond
- jitterMillisecond = 15 * time.Millisecond
- )
- func main() {
- infof("hello, raft!")
- defer infof("end of raft")
- peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
- go startNode(0x01, peers)
- go startNode(0x02, peers)
- go startNode(0x03, peers)
- time.Sleep(2 * time.Second)
- return
- }
- func startNode(id uint64, peers []raft.Peer) {
- ctx := context.TODO()
- storage := raft.NewMemoryStorage()
- c := raft.Config{
- ID: id,
- ElectionTick: 10,
- HeartbeatTick: 1,
- Storage: storage,
- MaxSizePerMsg: 4096,
- MaxInflightMsgs: 256,
- }
- n := &node{
- id: id,
- prefix: strings.Repeat("\t\t\t", int(id)) + "|",
- node: raft.StartNode(&c, peers),
- tick: agent.NewRollingTicker(tickInterval-jitterMillisecond, tickInterval+jitterMillisecond),
- recv: bcChans[id-1],
- raftStorage: storage,
- }
- for {
- select {
- case <-n.tick.Elapsed():
- n.node.Tick()
- case rd := <-n.node.Ready():
- n.raftStorage.Append(rd.Entries)
- go n.sendMessage(rd.Messages)
- n.node.Advance()
- case m := <-n.recv:
- infof("%d -%s got message from %v to %v, type %v", id, n.prefix, m.From, m.To, m.Type)
- n.node.Step(ctx, m)
- infof("%d -%s status: %v", id, n.prefix, n.node.Status().RaftState)
- }
- }
- return
- }
- type node struct {
- id uint64
- prefix string
- node raft.Node
- tick *agent.RollingTicker
- recv chan raftpb.Message
- raftStorage *raft.MemoryStorage
- }
- func (n *node) sendMessage(msg []raftpb.Message) {
- for _, m := range msg {
- to := m.To
- ch := bcChans[to-1]
- infof("%d -%s send to %v, type %v", n.id, n.prefix, m.To, m.Type)
- ch <- m
- }
- return
- }
本文章采用 知识共享署名 - 非商业性使用 - 相同方式共享 4.0 国际许可协议 https://creativecommons.org/licenses/by-nc-sa/4.0/ 进行许可.
来源: https://www.qcloud.com/developer/article/1644111