开发者

使用go语言实现Redis持久化的示例代码

目录
  • 定义 AofHandler
    • 实现 NewAofHandler
    • 实现 AddAof
    • 实现 HandleAof
  • 实现 Aof 落盘功能
    • 实现 LoadAof

      Redis 是一个内存数据库,如果你把进程杀掉,那么里面存储的数据都会消失,那么这篇文章就是来解决 redis 持久化的问题

      我们在 redis.conf 文件中增加两个配置

      appendonly yes
      appendfilename appendonly.aof
      
      • appenonly 表示只追加
      • appendfilename 表示追加到那什么文件中

      指令: *3\r\n$3\r\nSET\r\n$3\r\nKEY\r\n$5\r\nvalue\r\n 落在 appendonly.aof 文件中

      *3
      $3
      SET
      $3
      KEY
      $5
      value
      

      这里要实现的功能就是把用户发过来的指令,用 RESP 的形式记录在 appendonly.aof 文件中

      这个文件是在机器的硬盘上,当 redis 停了之后,内存中的数据都没了,但这个文件会保存下

      redis 重启后,会读取这个文件,把之前内存中的数据再次加载回来

      定义 AofHandler

      在项目下新建文件 aof/aof.go

      在里面定义一个 AofHandler 结构体,它的作用就是用来处理 appendonly.aof 文件

      type AofHandler struct {
      	database    databaseface.Database // 持有 db,db 有业务核心
      	aofFile     *os.File              // 持有 aof 文件
      	aofFilename string                // aof 文件名
      	currentDB   int                   // 当前 db
      	aofChan     chan *payload         // 写文件的缓冲区
      }
      

      这里有注意的是 aofChan,它是写文件的缓冲区

      因为从文件中读取指令,指令是非常密集的,但是将指令写入硬盘时非常慢的,我们又不可能每次都等待指令写完成后再去操作 redis

      这时我们就把所有想写入 aof 文件的指令放到 aofChan 中,然后在另一个 goroutine 中去写入硬盘

      所以这个 aofChan 的类型是 payload 结构体

      type CmdLine = [][]byte
      type payload struct {
        cmdLine CmdLine // 指令
        dbIndex int     // db 索引
      }
      

      AofHandler 结构体定义好之后,我们需要定义一个 NewAofHandler 函数来初始化 AofHandler 结构体

      还需要定义一个 AddAof 方法,用来往 aofChan 中添加指令

      放到缓冲区之后,还需要一个方法 HandleAof 将指令写入硬盘

      最后还要实现一个从硬盘加载 aof 文件到内存的的函数 LoadAof

      实现 NewAofHandler

      NewAofHandler 函数用来初始化 AofHandler 结构体

      func NewAofHandler(database databaseface.Database) (*AofHandler, error) {
        // 初始化 AofHandler 结构体
        handler := &AofHandler{}
        // 从配置文件中读取 aof 文件名
        handler.aofFilename = config.Properties.AppendFilename
        // 持有 db
        handler.database = database
        // 从硬盘加载 aof 文件
        handler.LoadAof()
        // 打开 aof 文件, 如果不存在则创建
        aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RdwR, 0600)
        if err != nil {
          return nil, err
        }
        // 持有 aof 文件
        handler.aofFile = aofFile
        // 初始化 aofChan
        handler.aofChan = make(chan *payload, aofBufferSize)
        // 启动一个 goroutine 处理 aofChan
        go func() {
          handler.HandleAof()
        }()
        // 返回 AofHandler 结构体
        return handler, nil
      }
      

      实现 AddAof

      AddAof 方法用来往 aofChan 中添加指令,它不做落盘的操作

      因为在执行指令的时候,等待它落盘的话,效率太低了,所以我们把指令放到 aofChan 中,然后在另一个 goroutine 中去处理

      func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
        // 如果配置文件中的 appendonly 为 true 并且 aofChan 不为 nil
        if config.Properties.AppendOnly && handler.aofChan != nil {
          // 往 aofChan 中添加指令
          handler.aofChan <- &payload{
            cmdLine: cmdLine,
            dbIndex: dbIndex,
          }
        }
      }
      

      实现 HandleAof

      HandleAof 方法用来处理 aofChan 中的指令,将指令写入硬盘

      currentDB 记录的是当前工作的 DB,如果切换了 DB,会在 aof 文件中插入 select 0 这样切换 DB 的语句

      func (handler *AofHandler) HandleAof() {
        // 初始化 currentDB
        handler.currentDB = 0
        // 遍历 chan
        for p := range handler.aofChan {
          // 如果当前 db 不等于上一次工作的 db,就要插入一条 select 语句
          if p.dbIndex != handler.currentDB {
            // 我们要把 select 0 编码成 RESP 格式
            // 也就是 *2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n
            data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
            // 写入 aof 文件
            _, err := handler.aofFile.Write(data)
            if err != nil {
              logger.Warn(err)
              continue
            }
            // 更新 currentDB
            handler.currentDB = p.dbIndex
          }
          // 这里是插入正常的指令
          data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
          // 写入 aof 文件
          _, err := handler.aofFile.Write(data)
          if err != nil {
            logger.Warn(err)
          }
        }
      }
      

      实现 Aof 落盘功能

      我们之前在实现指令的部分,都是直接执行指令,现在我们要把指令写入 aof 文件

      我们在 StandaloneDatabase 结构体中增加一个 aofHandler 字段

      type StandaloneDatabase struct {
        dbSet      []*DB
        aofHandler *aof.AofHandler // 增加落盘功能
      }
      

      然后新建 database 时需要对 aofHandler 进行初始化

      func NewStandaloneDatabase() *StandaloneDatabase {
        // ...
        // 先看下配置文件中的 appendonly 是否为 true
        if config.Properties.AppendOnly {
          // 初始化 aofHandler
          aofHandler, err := aof.NewAofHandler(database)
          if err != nil {
            panic(err)
          }
          // 持有 aofHandler
          database.aofHjsandler = aofHandler
          // 遍历 dbSet
          for _, db := range dat编程客栈abase.dbSet {
            // 解决闭包问题
            sdb := db
            // 为每个 db 添加 AddAof 方法
            // 这个 addAof 方法是在执行指令的时候调用的
            sdb.addAof = func(line CmdLine) {
              database.aofHandler.AddAof(sdb.index, line)
            }
          }
        }
        return database
      }
      

      这里要注意的是 addAof 方法,它是在执行指令的时候调用的

      因为我们需要在指令中调用 Addaof 函数,实现指令写入 aof 文件

      但是在指令中,���们只能拿到 dbdb 上又没有操作 aof 相关的方法,所以我们需要在 db 中增加一个 addAof 方法

      type DB struct {
      	index  编程客栈int           // 数据的编号
      	data   dict.Dict     // 数据类型
      	addAof func(CmdLine) // 每个 db 都有一个 addAof 方法
      }
      

      然后就在需要落盘的指令中调用 addAof 方法

      DEL 方法需要记录下来,因为 DEL 方法是删除数据的,如果不记录下来,那么 aof 文件中的数据就会和内存中的数据不一致

      // DEL K1 K2 K3
      func DEL(db *DB, args [][]byte) resp.Reply {
        deleted := db.Removes(keys...)
        // delete 大于 0 说明有数据被删除
        if deleted > 0 {
          db.addAof(utils.ToCmdLine2("DEL", args...))
        }
      }
      

      FLUSHDB 方法也需要记录下来,因为 FLUSHDB 方法是删除当前 DB 中的所有数据

      // FLUSHDB
      func FLUSHDB(db *DB, args [][]byte) resp.Reply {
      	db.addAof(utils.ToCmdLine2("FLUSEHDB", args...))
      }
      

      RENAME 和 RENAMENX 方法也需要记录下来,因为这两个方法是修改 key 的名字

      // RENAME K1 K2
      func RENAME(db *DB, args [][]byte) resp.Reply {
        db.addAof(utils.ToCmdLine2("RENAME", args...))
      }
      
      // RENAMENX K1 K2
      fujsnc RENAMENX(db *DB, args [][]byte) resp.Reply {
        db.addAof(utils.ToCmdLine2("RENAMENX", args...))
      }
      

      SET 和 SETNX 方法也需要记录下来,因为这两个方法是设置数据的

      // SET K1 v
      func SET(db *DB, args [][]byte) resp.Reply {
        db.addAof(utils.ToCmdLphpine2("SET", args...))
      }
      
      // SETNX K1 v
      func SETNX(db *DB, args [][]byte) resp.Reply {
        db.addAof(utils.ToCmdLine2("SETNX", args...))
      }
      

      GETSET 方法也需要记录下来,因为这个方法是设置数据的同时返回旧数据

      // GETSET K1 v1
      func GETSET(db *DB, args [][]byte) resp.Reply {
        db.addAof(utils.ToCmdLine2("GETSET", args...))
      }
      

      实现 LoadAof

      LoadAof 方法用来从硬盘加载 aof 文件到内存

      aof 中的指令是符合 RESP 协议的,我们就可以把这些指令当成用户发过来的指令,执行就可以了

      func (handler *AofHandler) LoadAof() {
        // 打开 aof 文件
        file, err := os.Open(handler.aofFilename)
        if err != nil {
          logger.Error(err)
          return
        }
        // 关闭文件
        defer func() {
          _ = file.Close()
        }()
        // 创建一个 RESP 解析器,将 file 传入,解析后的指令会放到 chan 中
        ch := parser.ParseStream(file)
        fackConn := &connection.Connection{}
        // 遍历 chan,执行指令
        for p := range ch {
          if p.Err != nil {
            // 如果是 EOF,说明文件读取完毕
            if p.Err == io.EOF {
              break
            }
            logger.Error(err)
            continue
          }
          if p.Data == nil {
            logger.Error("empty payload")
            continue
          }
          // 将指令转换成 MultiBulkReply 类型
          r, ok := p.Data.(*reply.MultiBulkReply)
          if !ok {
            logger.Error("exec multi mulk")
            continue
          }
          // 执行指令
          rep := handler.database.Exec(fackConn, r.Args)
          if reply.IsErrReply(rep) {
            logger.Error(rep)
          }
        }
      }
      

      到此这篇关于使用go语言实现Redis持久化的示例代码的文章就介绍到这了,更多相关go实现Redis持久化内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新数据库

      数据库排行榜