6.824 raft Lab 2D 日志压缩

2022-10-08 09:37:23 浏览数 (2)

一、背景

书接上文6.824 raft Lab 2C 持久化与恢复,本文继续往下讲解日志压缩。

raft通过日志来实现多副本的数据一致,但是日志会不断膨胀,带来两个缺点:数据量大、恢复时间长,因此需要定期压缩一下,生成snapshot。本文实现的源码:6.824 raft Lab 2D 日志压缩

1 何时压缩?

触发压缩的时机一般是以下两种:

  1. 日志的数据量达到阈值(推荐)。
  2. 日志的数量达到阈值。 测试用例是根据日志数量来触发压缩的。

2 谁触发压缩?

snapshot是状态机某一时刻的副本,具体格式依赖存储引擎的实现,比如说:B 树、LSM、哈希表等,6.824是实现一个键值数据库,所以我们采用的是哈希表,在Lab 3可以看到实现。

3 snapshot是否要一致?

目前有两种方案:

  1. Follower的snapshot需要和Leader保持一致,优点是简单,完全向Leader看齐,但是会传输更多的数据,受Leader网络带宽限制。
  2. Follower独立压缩日志,各个peer的checkpoint可能不一样,但Follower数据传输量小,除非很落后才有可能需要拉取Leader的snapshot。 本文选择第二种,Follower只对已经达成一致、已经提交的的日志进行压缩,并不会影响数据一致性,只是达成一致的数据在不同peer上的呈现形式有所不同,由snapshot log组成,比例可能不一样。

二、设计思想

1 接口

日志压缩的实现涉及到三个接口:

  1. Snapshot:raft提供给应用层调用的接口,用于生成snapshot,并截断自己的log,每个peer都可以调用。
  2. InstallSnapshot:当Follower太过落后而Leader已经将这个Follower需要的日志压缩了,此时就需要传输snapshot,本接口接收Leader的snapshot。
  3. CondInstallSnapshot:Follower接收到snapshot后不能够立刻应用并截断日志,raft和状态机都需要应用snapshot,这需要考虑原子性。如果raft应用成功但状态机应用snapshot失败,那么在接下来的时间里客户端读到的数据是不完整的。如果状态机应用snapshot成功但raft应用失败,那么raft会要求重传,状态机应用成功也没啥意义。因此CondInstallSnapshot是异步于raft的,并由应用层调用。

注:raft是被应用层调用的,不能反过来调用应用层,raft可以单独作为一个库支持各种应用层。

2 交互流程

snapshot流程snapshot流程

任意peer的应用层都可以独立压缩已经提交的日志,这个操作不涉及到其他peer,需要持久化。

如果某个peer的日志太落后,缺少的日志已经被Leader压缩了,此时Leader就需要通过InstallSnapshot接口将snapshot直接传输给这个peer,而这个peer会将snapshot通过applyCh上传给自己的应用层,此时不能截断日志,否则万一应用snapshot失败就出事了。最后由应用层先给状态机先snapshot,然后调用CondInstallSnapshot来给raft应用。

三、代码实现

1 接口定义

InstallSnapshotInstallSnapshot
代码语言:go复制
type Raft struct {

	...

	//提交情况
	//log         *RaftLog
	logs        []*LogEntry
	nextIndex   []int
	matchIndex  []int
	commitIndex int
	lastApplied int

	lastIncludeIndex int //快照的最大logIndex
	lastIncludeTerm  int //最后一条压缩日志的term,不是压缩时peer's term
	snapshotOffset   int //快照可能分批次传输
	snapshot         []byte

	//状态机
	applyCond *sync.Cond
	applyChan chan ApplyMsg
}

type InstallSnapshotArgs struct {
	Term             int //leader's term
	LeaderId         int
	LastIncludeIndex int //snapshot中最后一条日志的index
	LastIncludeTerm  int
	Data             []byte //快照
	//Offset           int //此次传输chunk在快照文件的偏移量,快照文件可能很大,因此需要分chunk,此次不分片
	//Done             bool //是否最后一块
}
type InstallSnapshotReply struct {
	Term int
}

分片有点麻烦,下次优化?。

使用snapshot后,logs的下标就有问题了,需要修改,幸好一开始封装了方法,切勿直接使用rf.logsi来操作。另外,为了减少判断,rf.logs0是一个dummy log,起到哨兵的作用,并且rf.logs0.LogTerm是snapshot的最后一条日志的LogTerm,千万别设置成压缩时的peer's term,否则日志匹配时会出问题。

代码语言:go复制
func (rf *Raft) lastLogTermAndLastLogIndex() (int, int) {
	logIndex := rf.lastLogIndex()
	logTerm := rf.logs[logIndex-rf.lastIncludeIndex].LogTerm
	return logTerm, logIndex
}

func (rf *Raft) lastLogIndex() int {
	return len(rf.logs) - 1   rf.lastIncludeIndex
}
func (rf *Raft) logTerm(logIndex int) int {
	return rf.logs[logIndex].LogTerm
}
func (rf *Raft) logEntry(logIndex int) *LogEntry {
	//越界
	if logIndex > rf.lastLogIndex() {
		return rf.logs[0]
	}
	//该日志已被压缩
	logIndex = logIndex - rf.lastIncludeIndex
	if logIndex <= 0 {
		return rf.logs[0]
	}
	return rf.logs[logIndex]
}

2 InstallSnapshot

代码语言:go复制
//接收来自leader的快照,并上传给应用层,通过applyCh写入
//这个函数只是follower为了赶上leader状态的,
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {

	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer func() {
		DPrintf("process InstallSnapshot node[%v] term[%v] lastLogIndex[%d] lastLogTerm[%d] lastIncludeIndex[%d] commitIndex[%d]  received InstallSnapshot, args: %v, reply: %v", rf.me, rf.term, rf.lastLogIndex(), rf.logTerm(rf.lastLogIndex()-rf.lastIncludeIndex), rf.lastIncludeIndex, rf.commitIndex, mr.Any2String(args), mr.Any2String(reply))
	}()
	reply.Term = rf.term
	if rf.term > args.Term || args.Data == nil {
		DPrintf("InstallSnapshot node[%d] term[%d] from node[%d] term[%d], rf.term > args.Term delined, args: %v, reply: %v ", rf.me, rf.term, args.LeaderId, args.Term, mr.Any2String(args), mr.Any2String(reply))
		return
	}
	if rf.term < args.Term {
		rf.role = Follower
		rf.term = args.Term
		rf.votedFor = RoleNone
		rf.leaderId = RoleNone
		rf.persist()
	}
	rf.lastActiveTime = time.Now()
	//只有缺少的数据在快照点之前时才需要快照
	if rf.commitIndex >= args.LastIncludeIndex {
		DPrintf("InstallSnapshot node[%d] term[%d] from node[%d] term[%d], commitIndex[%d], rf.commitIndex >= args.LastIncludeIndex delined, args: %v, reply: %v ", rf.me, rf.term, args.LeaderId, args.Term, rf.commitIndex, mr.Any2String(args), mr.Any2String(reply))

		return
	}
	//接收快照并持久化,至于应用到状态机可以异步做,就算意外下线,也是从日志和快照恢复
	//
	//不能立刻应用快照,需要保证raft和状态机都应用快照成功,放到CondInstallSnapshot中应用
	applyMsg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Data,
		SnapshotTerm:  args.LastIncludeTerm,
		SnapshotIndex: args.LastIncludeIndex,
	}
	//异步做,及早返回,就算失败raft也会回退,凡是和应用层交互并有可能阻塞的地方都异步做。
	go func() {
		rf.applyChan <- applyMsg
	}()
}

3 Snapshot

代码语言:go复制
//snapshot是应用层状态机的序列化,index表示checkpoint
//peer独立压缩日志
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
    //lastApplied是已经应用到状态机的最后一条日志,也是压缩点。
	if index <= rf.lastIncludeIndex || index != rf.lastApplied || index > rf.lastLogIndex() {
		return
	}
	DPrintf("node[%d] role[%d] term[%d] snapshoting, index[%d] commitIndex[%d] lastApplied[%d]", rf.me, rf.role, rf.term, index, rf.commitIndex, rf.lastApplied)
	logs := rf.logs[0:1]
    //dummy log的LogTerm一定要是压缩的最后一条日志的Term
	logs[0].LogTerm = rf.logs[index-rf.lastIncludeIndex].LogTerm
	//本结点最后一条日志在快照点之前,太落后,清空,应用快照,否则截断
	logs = append(logs, rf.logs[index-rf.lastIncludeIndex 1:]...)
	rf.logs = logs
	rf.snapshot = snapshot
	rf.lastIncludeIndex = index
	rf.lastIncludeTerm = logs[0].LogTerm
	rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), snapshot)
}

独立压缩时checkpoint是lastApplied。从Leader同步snapshot时,snapshot的LastIncludedIndex是commit Index。

4 CondInstallSnapshot

代码语言:go复制
//应用层调用,询问raft是否需要安装这个snapshot,在InstallSnapshot时并不会安装
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {

	rf.mu.Lock()
	defer rf.mu.Unlock()
	//异步应用快照,如果此时commitIndex已经追上来了,就不需要再应用快照了
	if rf.commitIndex > lastIncludedIndex {
		return false
	}
	logs := rf.logs[0:1]
	logs[0].LogTerm = lastIncludedTerm
	//本结点最后一条日志在快照点之前,太落后,清空,应用快照,否则截断
	if rf.lastLogIndex() > lastIncludedIndex {
		logs = append(logs, rf.logs[lastIncludedIndex-rf.lastIncludeIndex 1:]...)
	}
	rf.logs = logs
	rf.snapshot = snapshot
	rf.lastIncludeIndex = lastIncludedIndex
	rf.lastIncludeTerm = lastIncludedTerm

    //给raft安装snapshot,需要更新新的rf.lastApplied、rf.commitIndex
	rf.lastApplied = lastIncludedIndex
	rf.commitIndex = lastIncludedIndex

	rf.persister.SaveStateAndSnapshot(rf.encodeRaftState(), snapshot)
	return true
}

四、测试和总结

1 测试

测试结果测试结果

2 小结

本文简要介绍了日志压缩的实现,减少了数据量和恢复时间。日志压缩由应用层根据日志数量触发,生成snapshot并截断日志,每个peer都可以独立进行。如果某个peer的日志太过于落后就需要复制Leader的snapshot,而peer接收到snapshot后不能立刻应用和截断日志,应该由状态机先应用,raft后应用snapshot。倘若先应用到raft成功了,后应用到状态机失败了,就会导致状态机数据不完整,最好是两者保持原子性。

0 人点赞