Etcd数据恢复机制

2023-08-19 09:30:58 浏览数 (1)

WAL简介

使用过存储软件的都知道,为了应对写入性能和灾难恢复,各个存储软件都有相关的机制来保障, 比如:

  • mysql的redo log 和undo log
  • zookeeper的事务日志
  • etcd的wal日志

等等,那为啥都需要额外写入这样一种日志呢?

因为离散写的场景下会造成过多的随机写,更合理的方式是顺序 io 写到一个日志文件里,然后等待时机把数据进行合并后变更数据文件,这个日志就是 wal ( write ahead log) 预写日志. 另外还有一点,直接修改数据文件的话,如果写到一半发生了 crash,那么该页就损坏了. 如果 DB 关了或者挂了,但还有部分数据在内存和 wal 日志中,还未变更到 db 文件中. 那没关系,只需在启动后从 wal 日志中重放日志做还原即可.

源码地址:https://github.com/etcd-io/etcd

数据恢复

入口函数:mian -> StartEtcd -> etcdserver.NewServer -> bootstrap

查看bootstrap方法:

代码语言:javascript复制
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
   if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
      cfg.Logger.Warn(
         "exceeded recommended request limit",
         zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
         zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
         zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
         zap.String("recommended-request-size", recommendedMaxRequestBytesString),
      )
   }

   if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
      return nil, fmt.Errorf("cannot access data directory: %v", terr)
   }

   if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
      return nil, fmt.Errorf("cannot access member directory: %v", terr)
   }
   ss := bootstrapSnapshot(cfg)
   prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
   if err != nil {
      return nil, err
   }

   haveWAL := wal.Exist(cfg.WALDir())
   st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
   backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
   if err != nil {
      return nil, err
   }
   var bwal *bootstrappedWAL

   if haveWAL {
      if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
         return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
      }
      bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
   }

   cluster, err := bootstrapCluster(cfg, bwal, prt)
   if err != nil {
      backend.Close()
      return nil, err
   }

   s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
   if err != nil {
      backend.Close()
      return nil, err
   }

   if err = cluster.Finalize(cfg, s); err != nil {
      backend.Close()
      return nil, err
   }
   raft := bootstrapRaft(cfg, cluster, s.wal)
   return &bootstrappedServer{
      prt: prt,
      ss: ss,
      storage: s,
      cluster: cluster,
      raft: raft,
   }, nil
}

上面代码逻辑大致如下 :

  • 检查数据目录是否可访问
  • 删除快照目录下的 tmp文件
  • 恢复最新快照数据

快照恢复

代码语言:javascript复制
func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
   // Find a snapshot to start/restart a raft node
   walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
   if err != nil {
      return nil, be, err
   }
   // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
   // bwal log entries
   snapshot, err := ss.LoadNewestAvailable(walSnaps)
   if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
      return nil, be, err
   }

   if snapshot != nil {
      if err = st.Recovery(snapshot.Data); err != nil {
         cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
      }

      if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
         cfg.Logger.Error("illegal v2store content", zap.Error(err))
         return nil, be, err
      }

      cfg.Logger.Info(
         "recovered v2 store from snapshot",
         zap.Uint64("snapshot-index", snapshot.Metadata.Index),
         zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
      )

      if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
         cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
      }
      // A snapshot db may have already been recovered, and the old db should have
      // already been closed in this case, so we should set the backend again.
      ci.SetBackend(be)

      s1, s2 := be.Size(), be.SizeInUse()
      cfg.Logger.Info(
         "recovered v3 backend from snapshot",
         zap.Int64("backend-size-bytes", s1),
         zap.String("backend-size", humanize.Bytes(uint64(s1))),
         zap.Int64("backend-size-in-use-bytes", s2),
         zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
      )
      if beExist {
         // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
         // etcd from pre-3.0 release.
         kvindex := ci.ConsistentIndex()
         if kvindex < snapshot.Metadata.Index {
            if kvindex != 0 {
               return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
            }
            cfg.Logger.Warn(
               "consistent index was never saved",
               zap.Uint64("snapshot-index", snapshot.Metadata.Index),
            )
         }
      }
   } else {
      cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
   }
   return snapshot, be, nil
}

上面代码逻辑大致如下:

  • 通过wal文件取出快照目录下的最新快照
  • 将最新快照数据加载到内存中:从快照数据中可以反序列化出:快照数据、对应的任期号、索引号
  • 检查内存快照数据是否合法
  • 使用最新快照恢复db数据

WAL恢复

代码语言:javascript复制
// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
// after the position of the given snap in the WAL.
// The snap must have been previously saved to the WAL, or this call will panic.
func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
   var walsnap walpb.Snapshot
   if snapshot != nil {
      walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
   }
   repaired := false
   for {
      w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
      if err != nil {
         cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
      }
      if cfg.UnsafeNoFsync {
         w.SetUnsafeNoFsync()
      }
      wmetadata, st, ents, err := w.ReadAll()
      if err != nil {
         w.Close()
         // we can only repair ErrUnexpectedEOF and we never repair twice.
         if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
            cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
         }
         if !wal.Repair(cfg.Logger, cfg.WALDir()) {
            cfg.Logger.Fatal("failed to repair WAL", zap.Error(err))
         } else {
            cfg.Logger.Info("repaired WAL", zap.Error(err))
            repaired = true
         }
         continue
      }
      var metadata etcdserverpb.Metadata
      pbutil.MustUnmarshal(&amp;metadata, wmetadata)
      id := types.ID(metadata.NodeID)
      cid := types.ID(metadata.ClusterID)
      meta := &amp;snapshotMetadata{clusterID: cid, nodeID: id}
      return w, &amp;st, ents, snapshot, meta
   }
}

上面代码逻辑大致如下:

  • 根据上面拿到的快照数据,到wal目录中拿到日志索引号在快照数据索引号之后的日志,遍历满足条件的记录进行数据恢复。 具体查看ReadAll方法

0 人点赞