WAL简介
使用过存储软件的都知道,为了应对写入性能和灾难恢复,各个存储软件都有相关的机制来保障, 比如:
- mysql的redo log 和undo log
- zookeeper的事务日志
- etcd的wal日志
等等,那为啥都需要额外写入这样一种日志呢?
因为离散写的场景下会造成过多的随机写,更合理的方式是顺序 io 写到一个日志文件里,然后等待时机把数据进行合并后变更数据文件,这个日志就是 wal ( write ahead log) 预写日志. 另外还有一点,直接修改数据文件的话,如果写到一半发生了 crash,那么该页就损坏了. 如果 DB 关了或者挂了,但还有部分数据在内存和 wal 日志中,还未变更到 db 文件中. 那没关系,只需在启动后从 wal 日志中重放日志做还原即可.
源码地址:https://github.com/etcd-io/etcd
数据恢复
入口函数:mian -> StartEtcd -> etcdserver.NewServer -> bootstrap
查看bootstrap
方法:
func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
cfg.Logger.Warn(
"exceeded recommended request limit",
zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
zap.String("recommended-request-size", recommendedMaxRequestBytesString),
)
}
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
return nil, fmt.Errorf("cannot access data directory: %v", terr)
}
if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
return nil, fmt.Errorf("cannot access member directory: %v", terr)
}
ss := bootstrapSnapshot(cfg)
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
if err != nil {
return nil, err
}
haveWAL := wal.Exist(cfg.WALDir())
st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
if err != nil {
return nil, err
}
var bwal *bootstrappedWAL
if haveWAL {
if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
}
bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
}
cluster, err := bootstrapCluster(cfg, bwal, prt)
if err != nil {
backend.Close()
return nil, err
}
s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
if err != nil {
backend.Close()
return nil, err
}
if err = cluster.Finalize(cfg, s); err != nil {
backend.Close()
return nil, err
}
raft := bootstrapRaft(cfg, cluster, s.wal)
return &bootstrappedServer{
prt: prt,
ss: ss,
storage: s,
cluster: cluster,
raft: raft,
}, nil
}
上面代码逻辑大致如下 :
- 检查数据目录是否可访问
- 删除快照目录下的 tmp文件
- 恢复最新快照数据
快照恢复
代码语言:javascript复制func recoverSnapshot(cfg config.ServerConfig, st v2store.Store, be backend.Backend, beExist bool, beHooks *serverstorage.BackendHooks, ci cindex.ConsistentIndexer, ss *snap.Snapshotter) (*raftpb.Snapshot, backend.Backend, error) {
// Find a snapshot to start/restart a raft node
walSnaps, err := wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
if err != nil {
return nil, be, err
}
// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
// bwal log entries
snapshot, err := ss.LoadNewestAvailable(walSnaps)
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
return nil, be, err
}
if snapshot != nil {
if err = st.Recovery(snapshot.Data); err != nil {
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
}
if err = serverstorage.AssertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
cfg.Logger.Error("illegal v2store content", zap.Error(err))
return nil, be, err
}
cfg.Logger.Info(
"recovered v2 store from snapshot",
zap.Uint64("snapshot-index", snapshot.Metadata.Index),
zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
)
if be, err = serverstorage.RecoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
}
// A snapshot db may have already been recovered, and the old db should have
// already been closed in this case, so we should set the backend again.
ci.SetBackend(be)
s1, s2 := be.Size(), be.SizeInUse()
cfg.Logger.Info(
"recovered v3 backend from snapshot",
zap.Int64("backend-size-bytes", s1),
zap.String("backend-size", humanize.Bytes(uint64(s1))),
zap.Int64("backend-size-in-use-bytes", s2),
zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
)
if beExist {
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
// etcd from pre-3.0 release.
kvindex := ci.ConsistentIndex()
if kvindex < snapshot.Metadata.Index {
if kvindex != 0 {
return nil, be, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", cfg.BackendPath(), kvindex, snapshot.Metadata.Index)
}
cfg.Logger.Warn(
"consistent index was never saved",
zap.Uint64("snapshot-index", snapshot.Metadata.Index),
)
}
}
} else {
cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
}
return snapshot, be, nil
}
上面代码逻辑大致如下:
- 通过wal文件取出快照目录下的最新快照
- 将最新快照数据加载到内存中:从快照数据中可以反序列化出:快照数据、对应的任期号、索引号
- 检查内存快照数据是否合法
- 使用最新快照恢复db数据
WAL恢复
代码语言:javascript复制// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
// after the position of the given snap in the WAL.
// The snap must have been previously saved to the WAL, or this call will panic.
func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
repaired := false
for {
w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
if err != nil {
cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
}
if cfg.UnsafeNoFsync {
w.SetUnsafeNoFsync()
}
wmetadata, st, ents, err := w.ReadAll()
if err != nil {
w.Close()
// we can only repair ErrUnexpectedEOF and we never repair twice.
if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
}
if !wal.Repair(cfg.Logger, cfg.WALDir()) {
cfg.Logger.Fatal("failed to repair WAL", zap.Error(err))
} else {
cfg.Logger.Info("repaired WAL", zap.Error(err))
repaired = true
}
continue
}
var metadata etcdserverpb.Metadata
pbutil.MustUnmarshal(&metadata, wmetadata)
id := types.ID(metadata.NodeID)
cid := types.ID(metadata.ClusterID)
meta := &snapshotMetadata{clusterID: cid, nodeID: id}
return w, &st, ents, snapshot, meta
}
}
上面代码逻辑大致如下:
- 根据上面拿到的快照数据,到wal目录中拿到日志索引号在快照数据索引号之后的日志,遍历满足条件的记录进行数据恢复。 具体查看ReadAll方法