本文是使用 golang 实现 Redis 系列的第四篇文章, 将介绍如何使用 golang 实现 Append Only File 持久化及 AOF 文件重写.
本文完整源代码在作者 GitHub
AOF 文件
AOF 持久化是典型的异步任务, 主协程(goroutine) 可以使用 channel 将数据发送到异步协程由异步协程执行持久化操作.
在 DB 中定义相关字段:
- type DB struct {
- // 主线程使用此 channel 将要持久化的命令发送到异步协程
- aofChan chan *reply.MultiBulkReply
- // append file 文件描述符
- aofFile *os.File
- // append file 路径
- aofFilename string
- // aof 重写需要的缓冲区, 将在 AOF 重写一节详细介绍
- aofRewriteChan chan *reply.MultiBulkReply
- // 在必要的时候使用此字段暂停持久化操作
- pausingAof sync.RWMutex
- }
在进行持久化时需要注意两个细节:
get 之类的读命令并不需要进行持久化
expire 命令要用等效的 expireat 命令替换. 举例说明, 10:00 执行 expire a 3600 表示键 a 在 11:00 过期, 在 10:30 载入 AOF 文件时执行 expire a 3600 就成了 11:30 过期与原数据不符.
我们在命令处理方法中返回 AOF 需要的额外信息:
- type extra struct {
- // 表示该命令是否需要持久化
- toPersist bool
- // 如上文所述 expire 之类的命令不能直接持久化
- // 若 specialAof == nil 则将命令原样持久化, 否则持久化 specialAof 中的指令
- specialAof []*reply.MultiBulkReply
- }
- type CmdFunc func(db *DB, args [][]byte) (Redis.Reply, *extra)
以 SET 命令为例:
- func Set(db *DB, args [][]byte) (Redis.Reply, *extra) {
- //....
- var result int
- switch policy {
- case upsertPolicy:
- result = db.Put(key, entity)
- case insertPolicy:
- result = db.PutIfAbsent(key, entity)
- case updatePolicy:
- result = db.PutIfExists(key, entity)
- }
- extra := &extra{toPersist: result> 0} // 若实际写入了数据则 toPresist=true, 若因为 XX 或 NX 选项没有实际写入数据则 toPresist=false
- if result> 0 {
- if ttl != unlimitedTTL { // 使用了 EX 或 NX 选项
- expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
- db.Expire(key, expireTime)
- // 持久化时使用 set key value 和 pexpireat 命令代替 set key value EX ttl 命令
- extra.specialAof = []*reply.MultiBulkReply{
- reply.MakeMultiBulkReply([][]byte{
- []byte("SET"),
- args[0],
- args[1],
- }),
- makeExpireCmd(key, expireTime),
- }
- } else {
- db.Persist(key) // override ttl
- }
- }
- return &reply.OkReply{}, extra
- }
- var pExpireAtCmd = []byte("PEXPIREAT")
- func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
- args := make([][]byte, 3)
- args[0] = pExpireAtCmd
- args[1] = []byte(key)
- args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
- return reply.MakeMultiBulkReply(args)
- }
在处理命令的调度方法中将 aof 命令发送到 channel:
- func (db *DB) Exec(c Redis.Client, args [][]byte) (result Redis.Reply) {
- // ....
- // normal commands
- var extra *extra
- cmdFunc, ok := router[cmd] // 找到命令对应的处理函数
- if !ok {
- return reply.MakeErrReply("ERR unknown command'" + cmd + "'")
- }
- // 使用处理函数执行命令
- if len(args)> 1 {
- result, extra = cmdFunc(db, args[1:])
- } else {
- result, extra = cmdFunc(db, [][]byte{})
- }
- // AOF 持久化
- if config.Properties.AppendOnly {
- if extra != nil && extra.toPersist {
- // 写入 specialAof
- if extra.specialAof != nil && len(extra.specialAof)> 0 {
- for _, r := range extra.specialAof {
- db.addAof(r)
- }
- } else {
- // 写入原始命令
- r := reply.MakeMultiBulkReply(args)
- db.addAof(r)
- }
- }
- }
- return
- }
在异步协程中写入命令:
- func (db *DB) handleAof() {
- for cmd := range db.aofChan {
- // 异步协程在持久化之前会尝试获取锁, 若其他协程持有锁则会暂停持久化操作
- // 锁也保证了每次写入完整的一条指令不会格式错误
- db.pausingAof.RLock()
- if db.aofRewriteChan != nil {
- db.aofRewriteChan <- cmd
- }
- _, err := db.aofFile.Write(cmd.ToBytes())
- if err != nil {
- logger.Warn(err)
- }
- db.pausingAof.RUnlock()
- }
- }
读取过程与协议解析器一节基本相同, 不在正文中赘述: loadAof.
AOF 重写
若我们对键 a 赋值 100 次会在 AOF 文件中产生 100 条指令但只有最后一条指令是有效的, 为了减少持久化文件的大小需要进行 AOF 重写以删除无用的指令.
重写必须在固定不变的数据集上进行, 不能直接使用内存中的数据. Redis 重写的实现方式是进行 fork 并在子进程中遍历数据库内的数据重新生成 AOF 文件. 由于 golang 不支持 fork 操作, 我们只能采用读取 AOF 文件生成副本的方式来代替 fork.
在进行 AOF 重写操作时需要满足两个要求:
若 AOF 重写失败或被中断, AOF 文件需保持重写之前的状态不能丢失数据
进行 AOF 重写期间执行的命令必须保存到新的 AOF 文件中, 不能丢失
因此我们设计了一套比较复杂的流程:
暂停 AOF 写入 -> 更改状态为重写中 -> 复制当前 AOF 文件 -> 恢复 AOF 写入
在重写过程中, 持久化协程在将命令写入文件的同时也将其写入内存中的重写缓存区
重写协程读取 AOF 副本并将重写到临时文件 (tmp.aof) 中
暂停 AOF 写入 -> 将重写缓冲区中的命令写入 tmp.aof -> 使用临时文件 tmp.aof 覆盖 AOF 文件(使用文件系统的 mv 命令保证安全)-> 清空重写缓冲区 -> 恢复 AOF 写入
在不阻塞在线服务的同时进行其它操作是一项必需的能力, AOF 重写的思路在解决这类问题时具有重要的参考价值. 比如 MySQL Online DDL: gh-ost https://github.com/github/gh-ost 采用了类似的策略保证数据一致.
首先准备开始重写操作:
- func (db *DB) startRewrite() (*os.File, error) {
- // 暂停 AOF 写入, 数据会在 db.aofChan 中暂时堆积
- db.pausingAof.Lock()
- defer db.pausingAof.Unlock()
- // 创建重写缓冲区
- db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize)
- // 创建临时文件
- file, err := ioutil.TempFile("","aof")
- if err != nil {
- logger.Warn("tmp file create failed")
- return nil, err
- }
- return file, nil
- }
在重写过程中, 持久化协程进行双写:
- func (db *DB) handleAof() {
- for cmd := range db.aofChan {
- db.pausingAof.RLock()
- if db.aofRewriteChan != nil {
- // 数据写入重写缓冲区
- db.aofRewriteChan <- cmd
- }
- _, err := db.aofFile.Write(cmd.ToBytes())
- if err != nil {
- logger.Warn(err)
- }
- db.pausingAof.RUnlock()
- }
- }
执行重写:
- func (db *DB) aofRewrite() {
- file, err := db.startRewrite()
- if err != nil {
- logger.Warn(err)
- return
- }
- // load aof file
- tmpDB := &DB{
- Data: dict.MakeSimple(),
- TTLMap: dict.MakeSimple(),
- Locker: lock.Make(lockerSize),
- interval: 5 * time.Second,
- aofFilename: db.aofFilename,
- }
- tmpDB.loadAof()
- // rewrite aof file
- tmpDB.Data.ForEach(func(key string, raw interface{}) bool {
- var cmd *reply.MultiBulkReply
- entity, _ := raw.(*DataEntity)
- switch val := entity.Data.(type) {
- case []byte:
- cmd = persistString(key, val)
- case *List.LinkedList:
- cmd = persistList(key, val)
- case *set.Set:
- cmd = persistSet(key, val)
- case dict.Dict:
- cmd = persistHash(key, val)
- case *SortedSet.SortedSet:
- cmd = persistZSet(key, val)
- }
- if cmd != nil {
- _, _ = file.Write(cmd.ToBytes())
- }
- return true
- })
- tmpDB.TTLMap.ForEach(func(key string, raw interface{}) bool {
- expireTime, _ := raw.(time.Time)
- cmd := makeExpireCmd(key, expireTime)
- if cmd != nil {
- _, _ = file.Write(cmd.ToBytes())
- }
- return true
- })
- db.finishRewrite(file)
- }
重写完毕后写入缓冲区中的数据并替换正式文件:
- func (db *DB) finishRewrite(tmpFile *os.File) {
- // 暂停 AOF 写入
- db.pausingAof.Lock()
- defer db.pausingAof.Unlock()
- // 将重写缓冲区内的数据写入临时文件
- // 因为 handleAof 已被暂停, 在遍历期间 aofRewriteChan 中不会有新数据
- loop:
- for {
- select {
- case cmd := <-db.aofRewriteChan:
- _, err := tmpFile.Write(cmd.ToBytes())
- if err != nil {
- logger.Warn(err)
- }
- default:
- // 只有 channel 为空时才会进入此分支
- break loop
- }
- }
- // 释放重写缓冲区
- close(db.aofRewriteChan)
- db.aofRewriteChan = nil
- // 使用临时文件代替 aof 文件
- _ = db.aofFile.Close()
- _ = os.Rename(tmpFile.Name(), db.aofFilename)
- // 重新打开文件描述符以保证正常写入
- aofFile, err := os.OpenFile(db.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
- if err != nil {
- panic(err)
- }
- db.aofFile = aofFile
- }
来源: https://www.cnblogs.com/Finley/p/12663636.html