一、简要
接上文6.824 raft lab 2A、2B,本文只是简单的添加编码和解码的功能,至于持久化是上层提供的接口,并不属于raft核心逻辑。另外,对raft探测进行了优化,采用按term来探测。
lab 2C代码
二、编码/解码
代码语言:go复制//外层加锁,内层不能够再加锁了
func (rf *Raft) persist() {
// Your code here (2C).
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
//持久化当前term以及是否给其他结点投过票,避免同一个term多次投票的情况
e.Encode(rf.term)
e.Encode(rf.votedFor)
e.Encode(rf.leaderId)
e.Encode(rf.logs)
data := w.Bytes()
//persister是初始化时从应用层传进来的
rf.persister.SaveRaftState(data)
}
//
// restore previously persisted state.
//
//一般刚刚启动时执行
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
// Your code here (2C).
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
rf.mu.Lock()
d.Decode(&rf.term)
d.Decode(&rf.votedFor)
d.Decode(&rf.leaderId)
d.Decode(&rf.logs)
rf.mu.Unlock()
}
目前只持久化了term、votedFor、leaderId和logs这四个字段。term和logs是必须持久化的,votedFor则是为了判断当前term是否已经投票过,避免同一term投两次票。
三、日志探测
目前我采用的思想是由follower来决定下次传输日志的起点,通过在reply中nextIndex字段告诉Leader下次从哪里开始传。
探测过程分为以下几种情况:
- 如果当前结点缺少前一条日志,则nextIndex=len(rf.logs) 1。
- 如果前一条日志term不匹配,则找到改term的第一次出现的位置作为nextIndex,按term来探测。
- 如果前一条日志能够匹配上,则将各节点后续的日志截断,以Leader为主。
1 数据结构
代码语言:go复制//
type AppendEntriesReply struct {
Success bool
Term int
//用于探测日志匹配点
NextIndex int
Msg string
}
2 Leader发送日志
代码语言:go复制// The ticker go routine starts a new election if this peer hasn't received
// heartsbeats recently.
func (rf *Raft) heartBeatLoop() {
for rf.killed() == false {
//改成10ms一次就通过了?
time.Sleep(time.Millisecond * 20)
func() {
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.role != Leader {
return
}
//如果没有超时或者没有需要发送的数据,则直接返回
if time.Now().Sub(rf.lastActiveTime) < HeatBeatTimeout-50 {
return
}
rf.lastActiveTime = time.Now()
for i := 0; i < rf.nPeers; i {
if rf.me == i {
rf.matchIndex[i] = rf.lastLogIndex()
rf.nextIndex[i] = rf.matchIndex[i] 1
continue
}
//DPrintf("lastLogIndex: %v, logs: %v", lastLogIndex, mr.Any2String(rf.logs))
//记录每个node本次发送日志的前一条日志
prevLogIndex := rf.matchIndex[i]
if prevLogIndex > rf.lastLogIndex() {
prevLogIndex = rf.lastLogIndex()
}
//有可能follower的matchIndex比leader还大,此时要担心是否越界
//fmt.Printf("node[%d] role[%v] term[%d] lastLogIndex[%d] matchIndex[%d], log: %vn", rf.me, rf.role, rf.term, rf.lastLogIndex(), rf.matchIndex[i], mr.Any2String(rf.logs))
//fmt.Printf("node[%d] role[%v] term[%d] matchIndex: %vn", rf.me, rf.role, rf.term, mr.Any2String(rf.matchIndex))
argsI := &AppendEntriesArgs{
LogType: HeartBeatLogType,
Term: rf.term,
LeaderId: rf.me,
PrevLogIndex: prevLogIndex,
PrevLogTerm: rf.logTerm(prevLogIndex),
LeaderCommitedIndex: rf.commitIndex, //对上一次日志复制请求的二阶段
}
//本次复制的最后一条日志
lastLogIndex := rf.lastLogIndex()
if rf.matchIndex[i] < lastLogIndex {
argsI.LogType = AppendEntryLogType
argsI.LogEntries = make([]*LogEntry, 0)
//因为此时没有加锁,担心有新日志写入,必须保证每个节点复制的最后一条日志一样才能起到过半提交的效果
argsI.LogEntries = append(argsI.LogEntries, rf.logs[rf.nextIndex[i]:]...)
}
go func(server int, args *AppendEntriesArgs) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if !ok {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
//如果term变了,表示该结点不再是leader,什么也不做
if rf.term != args.Term {
return
}
//发现更大的term,本结点是旧leader
if reply.Term > rf.term {
rf.term = reply.Term
rf.votedFor = RoleNone
rf.leaderId = RoleNone
rf.role = Follower
rf.persist()
return
}
if reply.Success {
rf.nextIndex[server] = reply.NextIndex
rf.matchIndex[server] = rf.nextIndex[server] - 1
//提交到哪个位置需要根据中位数来判断,中位数表示过半提交的日志位置,
//每次提交日志向各结点发送的日志并不完全一样,不能光靠是否发送成功来判断
matchIndexSlice := make([]int, rf.nPeers)
for index, matchIndex := range rf.matchIndex {
matchIndexSlice[index] = matchIndex
}
sort.Slice(matchIndexSlice, func(i, j int) bool {
return matchIndexSlice[i] < matchIndexSlice[j]
})
//fmt.Printf("matchIndexSlice: %v, newcommitIndex: %v, lastLogIndex: %vn", mr.Any2String(matchIndexSlice), matchIndexSlice[rf.nPeers/2], rf.lastLogIndex())
newCommitIndex := matchIndexSlice[rf.nPeers/2]
//不能提交不属于当前term的日志
if newCommitIndex > rf.commitIndex && rf.logs[newCommitIndex].LogTerm == rf.term {
DPrintf("id[%d] role[%v] commitIndex %v update to newcommitIndex %v, command: %v", rf.me, rf.role, rf.commitIndex, newCommitIndex, rf.logs[newCommitIndex])
//如果commitIndex比自己实际的日志长度还大,这时需要减小
if newCommitIndex > rf.lastLogIndex() {
rf.commitIndex = rf.lastLogIndex()
} else {
rf.commitIndex = newCommitIndex
}
}
} else {
//follower缺少的之前的日志,探测缺少的位置
//后退策略,可以按term探测,也可以二分,此处采用线性探测,简单一些
rf.nextIndex[server] = reply.NextIndex
rf.matchIndex[server] = reply.NextIndex - 1
}
}(i, argsI)
}
}()
}
}
3 Follower接收日志
代码语言:go复制//如果收到term比自己大的AppendEntries请求,则表示发生过新一轮的选举,此时拒绝掉,等待超时选举
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
defer func() {
DPrintf("id[%d] role[%v] args: %v, reply: %v", rf.me, rf.role, mr.Any2String(args), mr.Any2String(reply))
}()
reply.Term = rf.term
reply.Success = false
//拒绝旧leader请求
if args.Term < rf.term {
return
}
//发现一个更大的任期,转变成这个term的follower,leader、follower--> follower
if args.Term > rf.term {
rf.term = args.Term
rf.role = Follower
//发现term大于等于自己的日志复制请求,则认其为主
rf.votedFor = RoleNone
rf.leaderId = RoleNone
rf.persist()
}
rf.leaderId = args.LeaderId
rf.votedFor = args.LeaderId
rf.lastActiveTime = time.Now()
//还缺少前面的日志或者前一条日志匹配不上
if args.PrevLogIndex > rf.lastLogIndex() {
reply.NextIndex = rf.lastLogIndex() 1
return
}
//前一条日志的任期不匹配,找到冲突term首次出现的地方
if args.PrevLogTerm != rf.logTerm(args.PrevLogIndex) {
index := args.PrevLogIndex
term := rf.logTerm(index)
for ; index > 0 && rf.logTerm(index) == term; index-- {
}
reply.NextIndex = index 1
return
}
//args.PrevLogIndex<=lastLogIndex,有可能发生截断的情况
if rf.lastLogIndex() > args.PrevLogIndex {
rf.logs = rf.logs[:args.PrevLogIndex 1]
}
rf.logs = append(rf.logs, args.LogEntries...)
if args.LeaderCommitedIndex > rf.commitIndex {
rf.commitIndex = args.LeaderCommitedIndex
if rf.lastLogIndex() < rf.commitIndex {
rf.commitIndex = rf.lastLogIndex()
}
}
rf.matchIndex[rf.me] = rf.lastLogIndex()
rf.nextIndex[rf.me] = rf.matchIndex[rf.me] 1
rf.persist()
reply.Success = true
reply.NextIndex = rf.nextIndex[rf.me]
}
四、测试
分布式场景下测试是真的头疼,只能够打印日志,最好结合测试代码来定位问题。