概述
lab2 中实现了 raft 协议, 本 lab 将在 raft 之上实现一个可容错的 k/v 存储服务, 第一部分是实现一个不带日志压缩的版本, 第二部分是实现日志压缩. 时间原因我只完成了第一部分.
设计思路
如上图, lab2 实现了 raft 协议, 本 lab 将实现 kvserver. 每个 raft 都关联一个 kvserver,Clerks 发送 Put(), Append(), Get() RPC 给 leader 服务器中的 kvserver,kvserver 收到请求后将操作打包成 Log Entry 提交给 raft, 然后阻塞等待 raft 将这个 Entry 拷贝到其它 server, 当 Log Entry 被拷贝到大部分的 server 后, leader 的 raft 会通知 kvserver(raft 往管道中塞 comitted Entry,kvserver 通过读这个管道获取通知),kvserver 执行命令, 然后响应 Clerk.
Clerk
客户端通过 Clerk 发送请求, 来看下 Clerk 代码:
- type Clerk struct {
- servers []*labrpc.ClientEnd
- // You will have to modify this struct.
- lastLeader int
- cid int64
- seq int
- }
- func (ck *Clerk) Get(key string) string {
- // You will have to modify this function.
- // 参数: 要读的 key, 当前 clerk 的 id, 请求序列号
- getArgs := GetArgs{Key: key, Cid:ck.cid, Seq:ck.seq}
- reply := GetReply{}
- for {
- doneCh := make(chan bool, 1)
- go func() {
- // 发送 Get() RPC
- ok := ck.servers[ck.lastLeader].Call("KVServer.Get", &getArgs, &reply)
- doneCh <- ok
- }()
- select {
- case <-time.After(600 * time.Millisecond):
- DPrintf("clerk(%d) retry PutAppend after timeout\n", ck.cid)
- continue
- case ok := <- doneCh:
- // 收到响应后, 并且是 leader 返回的, 那么说明这个命令已经执行了
- if ok && reply.WrongLeader != WrongLeader {
- // 请求序列号加 1
- ck.seq++
- return reply.Value
- }
- }
- // 换一个 server 重试
- ck.lastLeader++
- ck.lastLeader %= len(ck.servers)
- }
- return ""
- }
这里只给出了 Get() 的代码, Put() 和 Append() 类似, 发送 KVServer.Get 给一个 server, 如果这个 server 不是 leader, 换一个 server 重试. 直到发给真正的 leader, 并且 leader 将这个命令拷贝到大部分其它 server 后, 然后成功执行该命令, Clerk.Get() 才会返回.
KVServer
再来看下服务端的代码, KVServer 处理 Clerk 的 RPC 请求:
- type KVServer struct {
- mu sync.Mutex
- me int
- rf *raft.Raft
- applyCh chan raft.ApplyMsg
- maxraftstate int // snapshot if log grows this big
- // Your definitions here.
- // 保存键值对
- db map[string]string
- latestReplies map[int64]*LatestReply
- notify map[int]chan struct{}
- }
- func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
- // Your code here.
- if _, isLeader := kv.rf.GetState(); !isLeader {
- reply.WrongLeader = WrongLeader
- reply.Err = ""
- return
- }
- // 防止重复请求
- kv.mu.Lock()
- if latestReply, ok := kv.latestReplies[args.Cid]; ok && args.Seq <= latestReply.Seq {
- reply.WrongLeader = IsLeader
- reply.Value = latestReply.Reply.Value
- reply.Err = latestReply.Reply.Err
- kv.mu.Unlock()
- return
- }
- kv.mu.Unlock()
- command := Op{Operation:"Get", Key:args.Key, Cid:args.Cid, Seq:args.Seq}
- index, term, _ := kv.rf.Start(command)
- // 阻塞等待结果
- kv.mu.Lock()
- ch := make(chan struct{})
- kv.notify[index] = ch
- kv.mu.Unlock()
- select {
- case <-ch:
- curTerm, isLeader := kv.rf.GetState()
- DPrintf("%v got notify at index %v, isLeader = %v\n", kv.me, index, isLeader)
- if !isLeader || curTerm != term {
- reply.WrongLeader = WrongLeader
- reply.Err = ""
- } else {
- reply.WrongLeader = IsLeader
- kv.mu.Lock()
- if value, ok := kv.db[args.Key]; ok {
- reply.Value = value
- reply.Err = OK
- } else {
- reply.Err = ErrNoKey
- }
- kv.mu.Unlock()
- }
- }
- }
KVServer.db 用于保存键值对.
KVServer.Get() 首先判断自己是不是 leader, 如果不是 leader, 直接返回, 这样 Clerk 好重试其它 server. 如果是 leader, 先在缓存中找, 看这个请求是否已经执行过了.
因为可能出现这么一种情况: 如果 leader commit 一个 Entry 后立即奔溃了, 那么 Clerk 就收不到响应, 那么 Clerk 会将这个请求发给新的 leader, 新的 leader 收到请求后如果不做任何措施, 将会二次 commit 该 Log Entry, 对于 Put() 和 Append() 请求执行两次是不正确的, 所以需要一个办法防止一个请求执行两次.
可以这么做: 每个 Clerk 都分配一个唯一的 cid, 每个请求分配一个唯一的序列号 seq, 每成功一个请求, 该序列号加一. 服务端记录每个客户端 cid 最近一次 apply 的请求的序列号 seq 和对应的响应结果, 根据这个信息可知, 当再次收到这个客户端的序列号小于 seq 的请求时, 说明已经执行过了, 直接返回结果.
如果之前没有执行过, 那么调用
kv.rf.Start(command)
将 Log Entry 提交给 raft, 并且阻塞等待 raft 将这个 Entry 拷贝到其它大部分 server, 从阻塞返回后, 说明这个 Entry 已经被拷贝到大部分 server 了, 并且已经执行了命令, 这时可以将结果返回给 Clerk 了.
那么在哪里接收 raft 的消息呢? KVServer 在创建的时候会在一个线程中执行如下函数:
- func (kv *KVServer) applyDaemon() {
- for appliedEntry := range kv.applyCh {
- command := appliedEntry.Command.(Op)
- // 执行命令, 过滤已经执行过得命令
- kv.mu.Lock()
- if latestReply, ok := kv.latestReplies[command.Cid]; !ok || command.Seq> latestReply.Seq {
- switch command.Operation {
- case "Get":
- latestReply := LatestReply{Seq:command.Seq,}
- reply := GetReply{}
- if value, ok := kv.db[command.Key]; ok {
- reply.Value = value
- } else {
- reply.Err = ErrNoKey
- }
- latestReply.Reply = reply
- kv.latestReplies[command.Cid] = &latestReply
- case "Put":
- kv.db[command.Key] = command.Value
- latestReply := LatestReply{Seq:command.Seq}
- kv.latestReplies[command.Cid] = &latestReply
- case "Append":
- kv.db[command.Key] += command.Value
- latestReply := LatestReply{Seq:command.Seq}
- kv.latestReplies[command.Cid] = &latestReply
- default:
- panic("invalid command operation")
- }
- }
- DPrintf("%d applied index:%d, cmd:%v\n", kv.me, appliedEntry.CommandIndex, command)
- // 通知
- if ch, ok := kv.notify[appliedEntry.CommandIndex]; ok && ch != nil {
- DPrintf("%d notify index %d\n",kv.me, appliedEntry.CommandIndex)
- close(ch)
- delete(kv.notify, appliedEntry.CommandIndex)
- }
- kv.mu.Unlock()
- }
- }
kv.applyCh 这个 chanel 会在创建 raft 的时候传给 raft, 当某个 Log Entry 可以被 commit 的时候, raft 会往这个 chanel 中塞, 只要 for 循环这个 kv.applyCh, 就能知道已经被 commit 的 Entry, 拿到 Entry 后, 根据其中的命令执行相应的操作, 然后通知 KVServer.Get() 继续执行.
具体代码在: https://github.com/gatsbyd/mit_6.824_2018
如有错误, 欢迎指正:
15313676365
来源: https://www.cnblogs.com/gatsby123/p/10580757.html