6.824 lab2 raft A、B实验

2022-09-28 17:57:32 浏览数 (2)

一、背景

为突破单机的容量和性能瓶颈,现在都采用分布式存储系统,而分布式存储系统就是采用分片扩大容量、多副本提供容错和性能的能力,可以简单、自动地横向扩展。

1 一致性算法介绍

raft就是非常著名的维护多副本之间一致性的算法,中心化并且强一致。强一致性一般满足过半原则,能够保证集群可用时至少有一个节点拥有最新数据。而中心化有以下特点:

  1. 受限于leader节点的性能,其他节点均分leader的网络带宽,资源利用不充分。后来产生了一种流水线推送数据的方式,将控制流和数据流分离,客户端可以将数据推送给任意一个副本,然后请求leader发起提交命令,可以充分利用网络带宽,提高系统吞吐量。
  2. 主从一致容易控制,可以保证副本的更新顺序一致。
  3. 去中心化的一致性算法,虽然没有单机性能瓶颈,但是达成一致的时间窗口难以控制,而且需要检测冲突、解决冲突,一般追求极致可用性而不是一致性的系统会采用这种算法,例如redis、dynamo。

2 两阶段提交

raft的两阶段提交和分布式事务的两阶段提交针对的对象不一样,分布式事务是将多个不同分片的节点耦合在一起,而raft的两阶段提交只是针对同一个分片的多个副本。但是它们面临的问题是一样的,两阶段提交随时都可能有节点下线,导致了各种安全性问题。

第一阶段发起预提交请求:复制日志,过半成功后发起第二阶段提交请求去更新follower的commitIndex,使得能够向状态机应用日志。注意,第二阶段的提交请求可以携带下一批次日志,用来减少往返时延,也就是每个请求既可以是上一条日志的提交请求,又是下一条日志的预提交请求。

二、实验搭建

  1. 源码下载https://pdos.csail.mit.edu/6.824/labs/lab-raft.html
  2. 初始化go mod init
  3. 替换依赖
  4. 本人仓库6.824 2022版https://github.com/yutianneng/6.824/

三、raft架构

Lab 2A、2B主要包括选举循环、心跳循环、RequestVote及AppendEntries实现。这些循环都是定时循环,之前打算通过条件变量去控制,能够更实时一些,但发现rpc调用次数过多以及重复调用,所以最终还是定时循环,至少能够能够批量传送,有待改进?。

1 状态转换

状态转换状态转换

一个节点只能身处一个状态,Follower和Candidate处于选主循环,一旦超时立刻变为Candidate发起投票,Leader身处心跳循环,一旦超时会立刻向其他节点发送心跳或者日志,心跳时间比选举超时时间要短一些。

2 日志复制

日志应用日志应用

针对日志复制增加了一个日志应用循环,定时将已提交的日志上传给应用层。

3 安全性问题

针对安全性问题,我分析了以下几点:

  1. 提交的日志不能丢失,丢失则会影响客户端体验,必须保证。
  2. 没有成功提交的日志可能被认为是提交的,因为过半原则 两阶段提交也并非尽善尽美。
  3. 过半原则能够保证,在集群可用时,至少有一个节点拥有最新数据,但是投票时节点并不能判断这条日志是已经提交的,还是没有提交成功的,所以一律按照提交成功对待,避免丢失数据。
  4. 两阶段提交的诸多可能性: 4.1 可能第一阶段预提交后leader下线了,新选出的Leader可能会包含这条未提交的日志,会强行复制给其他节点。 4.2 可能leader发送了部分第二阶段提交请求后下线了,导致部分节点提交,但是并没有达到过半,而部分节点就认为这条日志成功提交了。 4.3 可能提交成功了。

Leader上一定包括所有已经成功提交的日志,但也可能包含没有成功提交的日志,以Leader为主,Leader拥有的日志就算没有成功提交,也要按照提交成功来对待,强行复制给其他节点,其他节点上的日志可能被覆盖,但这种日志必然没有被成功提交,覆盖掉也没有关系。这两种原则就导致了没提交成功的数据也有可能被认为提交成功了,需要客户端能够保证幂等性。

实验中通过current_term区分每一轮选票,确保一轮最多一个Leader,通过lastLogTerm和lastLogIndex,避免丢失提交的数据。

四、源码

1 数据结构定义

代码语言:c复制
//上传给状态机
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
	CommandTerm  int

	//For 2D:
	SnapshotValid bool
	Snapshot      []byte
	SnapshotTerm  int
	SnapshotIndex int
}

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()
	nPeers    int //所有节点数量

	timeoutInterval time.Duration //follower leader candidate
	lastActiveTime  time.Time     //超时开始计算时间,收到心跳时会更新

	//选举
	term     int
	role     MemberRole
	leaderId int
	votedFor int

	//提交情况
	//log         *RaftLog
	logs        []*LogEntry
	nextIndex   []int
	matchIndex  []int
	commitIndex int 
	lastApplied int

	//状态机
	applyCond *sync.Cond
	applyChan chan ApplyMsg //应用层与raft交互的channel,用于上传数据给应用层
}

type MemberRole int

const (
	Leader    MemberRole = 1
	Follower  MemberRole = 2
	Candidate MemberRole = 3

	RoleNone = -1
	None     = 0
)

func (m MemberRole) String() string {
	switch m {
	case Leader:
		return "Leader"
	case Follower:
		return "Follower"
	case Candidate:
		return "Candidate"
	}
	return "Unknown"
}

type LogType int

const (
	HeartBeatLogType   LogType = 1
	AppendEntryLogType LogType = 2
)

func (l LogType) String() string {
	switch l {
	case HeartBeatLogType:
		return "HeartBeatLogType"
	case AppendEntryLogType:
		return "AppendEntryLogType"
	}
	return "Unknown"
}

type LogEntry struct {
	LogTerm int
	Command interface{}
}

const (
	ElectionTimeout = 200 * time.Millisecond
	HeatBeatTimeout = 150 * time.Millisecond
)

//选举时需要传递自己拥有的最后一条log的term和index
type RequestVoteArgs struct {
	// Your data here (2A, 2B).
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}

//
// 心跳或者日志追加
//
type AppendEntriesArgs struct {
	LogType  LogType
	LeaderId int
	Term     int //leader currentTerm
	//用于日志复制,确保前面日志能够匹配
	PrevLogTerm         int
	PrevLogIndex        int
	LeaderCommitedIndex int
	LogEntries          []*LogEntry
}

//
// 心跳或者日志追加
//
type AppendEntriesReply struct {
	Success bool
	Term    int
	Msg     string
}

2 初始化raft节点

代码语言:c复制
//初始化raft, 所有raft的任务都要另起协程,测试文件采用的是协程模拟rpc
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {

	rf := &Raft{
		mu:        sync.Mutex{},
		peers:     peers,
		persister: persister,
		me:        me,
		dead:      -1,
		nPeers:    len(peers),

		leaderId:       RoleNone,
		term:           None,
		votedFor:       RoleNone,
		role:           Follower,
		lastActiveTime: time.Now(),
		//lastHeartBeatTime: time.Now(),
		timeoutInterval: randElectionTimeout(),
		commitIndex:     None,
		lastApplied:     None,
		applyChan:       applyCh,
	}
	rf.applyCond = sync.NewCond(&rf.mu)
	DPrintf("starting new raft node, id[%d], lastActiveTime[%v], timeoutInterval[%d]", me, rf.lastActiveTime.UnixMilli(), rf.timeoutInterval.Milliseconds())

	rf.logs = make([]*LogEntry, 0)
	rf.nextIndex = make([]int, rf.nPeers)
	rf.matchIndex = make([]int, rf.nPeers)
	for i := 0; i < rf.nPeers; i   {
		rf.nextIndex[i] = 1
		rf.matchIndex[i] = 0
	}
    //有效logIndex、logTerm从1开始,刚开始写入一条dummy log是为了减少判空
	rf.logs = append(rf.logs, &LogEntry{
		LogTerm: 0,
	})

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
	// start ticker goroutine to start elections
	go rf.electionLoop()
	go rf.heartBeatLoop()
	go rf.applyLogLoop(applyCh)

	DPrintf("starting raft node[%d]", rf.me)

	return rf
}

3 选主循环

3.1 electionLoop

代码语言:c复制
func (rf *Raft) electionLoop() {
	for rf.killed() == false {
		time.Sleep(time.Millisecond * 1)
		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()
            //Follower、Candidate在这个循环,Leader在heartBeatLoop
			if rf.role == Leader {
				return
			}
			if time.Now().Sub(rf.lastActiveTime) < rf.timeoutInterval {
				//不超时不需要进入下一步,只需要接收RequestVote和AppendEntries请求即可
				return
			}
			//超时处理逻辑
			if rf.role == Follower {
				rf.role = Candidate
			}
			DPrintf("become candidate... node[%v] term[%v] role[%v] lastActiveTime[%v], timeoutInterval[%d], now[%v]", rf.me, rf.term, rf.role, rf.lastActiveTime.UnixMilli(), rf.timeoutInterval.Milliseconds(), time.Now().Sub(rf.lastActiveTime).Milliseconds())
			rf.lastActiveTime = time.Now()
			rf.timeoutInterval = randElectionTimeout()
			rf.votedFor = rf.me
			rf.term  
			rf.persist()
			lastLogTerm, lastLogIndex := rf.lastLogTermAndLastLogIndex()
			rf.mu.Unlock()
            
			maxTerm, voteGranted := rf.becomeCandidate(lastLogIndex, lastLogTerm)
			rf.mu.Lock()
			//DPrintf("node[%d] get vote num[%d]", rf.me, totalVote)

			//在这过程中接收到更大term的请求,导致退化为follower
			if rf.role != Candidate {
				//DPrintf("node[%d] role[%v] failed to leader, voteGranted[%d], totalVote[%d]", rf.me, rf.role, voteGranted, totalVote)
				return
			}
            //出现更大term后就退回到Follower
			if maxTerm > rf.term {
				rf.role = Follower
				rf.term = maxTerm
				rf.votedFor = RoleNone
				rf.leaderId = RoleNone
				rf.persist()
			} else if voteGranted > rf.nPeers/2 {
                //成为Leader
				rf.leaderId = rf.me
				rf.role = Leader
                //立刻发送心跳
				rf.lastActiveTime = time.Unix(0, 0)
			}
			DPrintf("node[%d] role[%v] maxTerm[%d] voteGranted[%d] nPeers[%d]", rf.me, rf.role, maxTerm, voteGranted, rf.nPeers)
		}()
	}
}

//并发去请求投票,并返回投票结果
func (rf *Raft) becomeCandidate(lastLogIndex, lastLogTerm int) (int, int) {

	type RequestVoteResult struct {
		peerId int
		resp   *RequestVoteReply
	}
	voteChan := make(chan *RequestVoteResult, rf.nPeers-1)
	args := &RequestVoteArgs{
		Term:         rf.term,
		CandidateId:  rf.me,
		LastLogIndex: lastLogIndex,
		LastLogTerm:  lastLogTerm,
	}
	for i := 0; i < rf.nPeers; i   {
		if rf.me == i {
			continue
		}
		go func(server int, args *RequestVoteArgs) {
			reply := &RequestVoteReply{}
			ok := rf.sendRequestVote(server, args, reply)
			if ok {
				voteChan <- &RequestVoteResult{
					peerId: server,
					resp:   reply,
				}
			} else {
				voteChan <- &RequestVoteResult{
					peerId: server,
					resp:   nil,
				}
			}
		}(i, args)
	}

	maxTerm := rf.term
	voteGranted := 1
	totalVote := 1
	for i := 0; i < rf.nPeers-1; i   {
		select {
		case vote := <-voteChan:
			totalVote  
			if vote.resp != nil {
				if vote.resp.VoteGranted {
					voteGranted  
				}
				//出现更大term就退回follower
				if vote.resp.Term > maxTerm {
					maxTerm = vote.resp.Term
				}
			}
		}
        //一旦过半投票就立刻返回,否则会被部分故障节点拖慢达成一致的时间导致超时
		if voteGranted > rf.nPeers/2 || totalVote == rf.nPeers {
			return maxTerm, voteGranted
		}
	}
	return maxTerm, voteGranted
}

3.2 RequestVote实现

代码语言:c复制
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	rf.mu.Lock()
	defer func() {
		DPrintf("node[%d] role[%v] received vote from node[%d], now[%d], args: %v, reply: %v", rf.me, rf.role, args.CandidateId, time.Now().UnixMilli(), mr.Any2String(args), mr.Any2String(reply))
	}()
	defer rf.mu.Unlock()
	reply.Term = rf.term
	reply.VoteGranted = false
	//不接收小于自己term的请求
	if rf.term > args.Term {
		return
	}

	if args.Term > rf.term {
		rf.role = Follower //leader转换为follower
		rf.term = args.Term
		//需要比较最新一条日志的情况再决定要不要投票
		rf.votedFor = RoleNone
		rf.leaderId = RoleNone
		//rf.lastActiveTime = time.Now()
		//rf.lastHeartBeatTime = time.Now()
		//rf.timeoutInterval = randElectionTimeout()
		rf.persist()
	}
	//避免重复投票
	if rf.votedFor == RoleNone || rf.votedFor == args.CandidateId {
		lastLogTerm, lastLogIndex := rf.lastLogTermAndLastLogIndex()
		//最后一条日志任期更大或者任期一样但是更长
		if args.LastLogTerm > lastLogTerm || (lastLogTerm == args.LastLogTerm && lastLogIndex <= args.LastLogIndex) {
			rf.role = Follower
			rf.votedFor = args.CandidateId
			rf.leaderId = args.CandidateId
			rf.lastActiveTime = time.Now()
			rf.timeoutInterval = randElectionTimeout()
			reply.VoteGranted = true
			rf.persist()
		}
	}
}

投票原则:

  1. 拒绝给term比自己小的节点投票。
  2. 如果term比自己大,则提升自己的term且重置参数,在这个term自己还未投票。
  3. 在自己未投过票的前提下: 3.1 如果对方最后一条日志的logTerm更大,表示自己缺少某一轮的数据,对方比自己新,给其投票。 3.2 如果对方最后一条日志的logTerm和自己一样,并且日志长度不比自己短,为其投票。

4 心跳循环

4.1 heartBeatLoop

代码语言:c复制
func (rf *Raft) heartBeatLoop() {
	for rf.killed() == false {
		//不要太频繁了,否则会导致rpc太多
		time.Sleep(time.Millisecond * 20)
		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()
            //只有Leader才能进入心跳循环
			if rf.role != Leader {
				return
			}
			//如果没有超时,则直接返回
			if time.Now().Sub(rf.lastActiveTime) < HeatBeatTimeout-50 {
				return
			}
			rf.lastActiveTime = time.Now()
			for i := 0; i < rf.nPeers; i   {
				if rf.me == i {
					rf.matchIndex[i] = rf.lastLogIndex()
					rf.nextIndex[i] = rf.matchIndex[i]   1
					continue
				}
				//DPrintf("lastLogIndex: %v, logs: %v", lastLogIndex, mr.Any2String(rf.logs))
				//记录每个node本次发送日志的前一条日志
				prevLogIndex := rf.matchIndex[i]
				argsI := &AppendEntriesArgs{
					LogType:             HeartBeatLogType,
					Term:                rf.term,
					LeaderId:            rf.me,
					PrevLogIndex:        prevLogIndex,
					PrevLogTerm:         rf.logTerm(prevLogIndex),
					LeaderCommitedIndex: rf.commitIndex, //对上一次日志复制请求的二阶段
				}

				//本次复制的最后一条日志
				lastLogIndex := rf.lastLogIndex()
				if rf.matchIndex[i] < lastLogIndex {
					argsI.LogType = AppendEntryLogType
					argsI.LogEntries = make([]*LogEntry, 0)
					//因为此时没有加锁,担心有新日志写入,必须保证每个节点复制的最后一条日志一样才能起到过半提交的效果
					argsI.LogEntries = append(argsI.LogEntries, rf.logs[rf.nextIndex[i]:lastLogIndex 1]...)
				}

				go func(server int, args *AppendEntriesArgs) {
					reply := &AppendEntriesReply{}
					ok := rf.sendAppendEntries(server, args, reply)
					if !ok {
						return
					}
					rf.mu.Lock()
					defer rf.mu.Unlock()
					//如果term变了,表示该结点不再是leader,什么也不做
					if rf.term != args.Term {
						return
					}
					//发现更大的term,本结点是旧leader
					if reply.Term > rf.term {
						rf.term = reply.Term
						rf.votedFor = RoleNone
						rf.leaderId = RoleNone
						rf.role = Follower
						rf.persist()
						return
					}
					if reply.Success {
						rf.nextIndex[server]  = len(args.LogEntries)
						rf.matchIndex[server] = rf.nextIndex[server] - 1
						//提交到哪个位置需要根据中位数来判断,中位数表示过半提交的日志位置,
						//每次提交日志向各结点发送的日志并不完全一样,不能光靠是否发送成功来判断
						matchIndexSlice := make([]int, rf.nPeers)
						for index, matchIndex := range rf.matchIndex {
							matchIndexSlice[index] = matchIndex
						}
						sort.Slice(matchIndexSlice, func(i, j int) bool {
							return matchIndexSlice[i] < matchIndexSlice[j]
						})
						//fmt.Printf("matchIndexSlice: %v, newcommitIndex: %vn", mr.Any2String(matchIndexSlice), matchIndexSlice[rf.nPeers/2])
						newCommitIndex := matchIndexSlice[rf.nPeers/2]
						//不能提交不属于当前term的日志
						if newCommitIndex > rf.commitIndex && rf.logs[newCommitIndex].LogTerm == rf.term {
							DPrintf("id[%d] role[%v] commitIndex %v update to newcommitIndex %v, command: %v", rf.me, rf.role, rf.commitIndex, newCommitIndex, rf.logs[newCommitIndex])
							rf.commitIndex = newCommitIndex
						}
					} else {
						//follower缺少的之前的日志,探测缺少的位置
						//后退策略,可以按term探测,也可以二分,此处采用线性探测,简单一些
						rf.nextIndex[server] -= 1
						if rf.nextIndex[server] < 1 {
							rf.nextIndex[server] = 1
						}
					}
				}(i, argsI)
			}
		}()
	}
}

4.2 AppendEntries

代码语言:c复制
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {

	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer func() {
		DPrintf("id[%d] role[%v] args: %v, reply: %v", rf.me, rf.role, mr.Any2String(args), mr.Any2String(reply))
	}()

	reply.Term = rf.term
	reply.Success = false
	//拒绝旧leader请求
	if args.Term < rf.term {
		return
	}
	//发现一个更大的任期,转变成这个term的follower,leader、follower--> follower
	if args.Term > rf.term {
		rf.term = args.Term
		rf.role = Follower
		//发现term大于等于自己的日志复制请求,则认其为主
		rf.votedFor = RoleNone
		rf.leaderId = RoleNone
		rf.persist()
	}
	rf.leaderId = args.LeaderId
    rf.votedFor = args.LeaderId
	rf.lastActiveTime = time.Now()
	//还缺少前面的日志或者前一条日志匹配不上
	if args.PrevLogIndex > rf.lastLogIndex() || args.PrevLogTerm != rf.logTerm(args.PrevLogIndex) {
		return
	}
	//args.PrevLogIndex<=lastLogIndex,有可能发生截断的情况
	if rf.lastLogIndex() > args.PrevLogIndex {
		rf.logs = rf.logs[:args.PrevLogIndex 1]
	}
	rf.logs = append(rf.logs, args.LogEntries...)
	rf.persist()
	if args.LeaderCommitedIndex > rf.commitIndex {
		rf.commitIndex = args.LeaderCommitedIndex
		if rf.lastLogIndex() < rf.commitIndex {
			rf.commitIndex = rf.lastLogIndex()
		}
	}
	reply.Success = true
}

处理AppendEntries请求的策略:

  1. args.term比自己小,直接拒绝掉。
  2. args.term比自己大,则本节点落后了,需要更新term并为此节点投票。
  3. 如果前一条日志的logIndex或者logTerm匹配不上,拒绝掉,需要进行探测。
  4. 如果前一条日志匹配上了,但是在本节点在匹配点后还有日志,那么就截断,以Leader为主。
  5. 更新本结点commitIndex,其实这是对上一批日志的commitIndex。

5 应用层向raft层写入数据

代码语言:go复制
//写入数据
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if rf.role != Leader {
		return -1, -1, false
	}
	entry := &LogEntry{
		LogTerm: rf.term,
		Command: command,
	}
	rf.logs = append(rf.logs, entry)
	index := rf.lastLogIndex()
	term := rf.logTerm(index)
	//写入后立刻持久化
	rf.persist()
	DPrintf("node[%d] term[%d] role[%v] add entry: %v, logIndex[%d]", rf.me, rf.term, rf.role, mr.Any2String(entry), index)
	return index, term, true
}

6 上传日志给应用层

代码语言:c复制
//日志提交循环,不采用定期唤醒,而是条件变量
func (rf *Raft) applyLogLoop(applyCh chan ApplyMsg) {

	for !rf.killed() {
		time.Sleep(time.Millisecond * 10)
		applyMsgs := make([]ApplyMsg, 0)
		func() {
			rf.mu.Lock()
			defer rf.mu.Unlock()

			//没有数据需要上传给应用层
			if rf.lastApplied >= rf.commitIndex {
				return
			}
			for rf.lastApplied < rf.commitIndex {
				rf.lastApplied  
				applyMsgs = append(applyMsgs, ApplyMsg{
					CommandValid: true,
					Command:      rf.logs[rf.lastApplied].Command,
					CommandIndex: rf.lastApplied,
				})
			}
		}()
		go func() {
			//锁外提交给应用
			for i := 0; i < len(applyMsgs); i   {
				DPrintf("id[%v] role[%v] upload log to application, lastApplied[%d], commitIndex[%d]", rf.me, rf.role, applyMsgs[i].CommandIndex, rf.commitIndex)

				applyCh <- applyMsgs[i]
			}
		}()
	}
}

7 其他

代码语言:go复制
func randElectionTimeout() time.Duration {
	return ElectionTimeout   time.Duration(rand.Uint32())%ElectionTimeout
}

// return currentTerm and whether this server
// believes it is the leader.
//上层用来检测节点的状态
func (rf *Raft) GetState() (int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	return rf.term, rf.role == Leader
}

func (rf *Raft) lastLogTermAndLastLogIndex() (int, int) {
	logIndex := len(rf.logs) - 1
	logTerm := rf.logs[logIndex].LogTerm
	return logTerm, logIndex
}

func (rf *Raft) lastLogIndex() int {
	return len(rf.logs) - 1
}
func (rf *Raft) logTerm(logIndex int) int {
	return rf.logs[logIndex].LogTerm
}

五、调试

测试用例是创建一组raft节点,然后这些节点底层通过channel来通信,实际上是协程之间通信,并非真正的RPC。

调试时会出现各种问题,我们只能通过日志的形式来排查,最好是能够去阅读测试用例,看它想要检测什么问题,也好对症下药,锁定范围。

1 检测Leader

代码语言:go复制
func (cfg *config) checkOneLeader() int {
	for iters := 0; iters < 10; iters   {
		ms := 450   (rand.Int63() % 100)
		time.Sleep(time.Duration(ms) * time.Millisecond)

		leaders := make(map[int][]int)
		for i := 0; i < cfg.n; i   {
            //connected表示这个节点是否在网络中
			if cfg.connected[i] {
				if term, leader := cfg.rafts[i].GetState(); leader {
					leaders[term] = append(leaders[term], i)
				}
			}
		}

		lastTermWithLeader := -1
		for term, leaders := range leaders {
            //Leader节点超过1则Panic
			if len(leaders) > 1 {
				cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders))
			}
			if term > lastTermWithLeader {
				lastTermWithLeader = term
			}
		}

		if len(leaders) != 0 {
			return leaders[lastTermWithLeader][0]
		}
	}
    //重复十次,如果都没有leader,表示选主有问题
	cfg.t.Fatalf("expected one leader, got none")
	return -1
}

2 写入日志

代码语言:go复制
//写入日志cmd,10秒内成功
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
	t0 := time.Now()
	starts := 0
	for time.Since(t0).Seconds() < 10 && cfg.checkFinished() == false {
		// try all the servers, maybe one is the leader.
		index := -1
		for si := 0; si < cfg.n; si   {
			starts = (starts   1) % cfg.n
			var rf *Raft
			cfg.mu.Lock()
			if cfg.connected[starts] {
				rf = cfg.rafts[starts]
			}
			cfg.mu.Unlock()
			if rf != nil {
				//fmt.Printf("connected: %v, raft: %vn", mr.Any2String(cfg.connected), mr.Any2String(rf))
                //写入日志
				index1, _, ok := rf.Start(cmd)
				if ok {
					index = index1
					break
				}
			}
		}
		//fmt.Printf("cfg index: %vn", index)
		if index != -1 {
			// somebody claimed to be the leader and to have
			// submitted our command; wait a while for agreement.
			t1 := time.Now()
            //判断2s内是否能够将日志复制到全部结点
			for time.Since(t1).Seconds() < 2 {
				nd, cmd1 := cfg.nCommitted(index)
				if nd > 0 && nd >= expectedServers {
					// committed
					if cmd1 == cmd {
						// and it was the command we submitted.
						return index
					}
				}
				time.Sleep(20 * time.Millisecond)
			}
			if retry == false {
				cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
			}
		} else {
			time.Sleep(50 * time.Millisecond)
		}
	}
	//fmt.Printf("cfg.logs: %vn", mr.Any2String(cfg.logs))
	if cfg.checkFinished() == false {
		cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
	}
	return -1
}

3 测试结果

lab 2Alab 2A
lab 2Blab 2B

六、改进点

1 日志探测

目前采用的是线性探测,而优化的方式是以下两种:

  1. 按term探测,如果匹配失败,则Follower返回上一term的第一条日志。
  2. 二分探测。

0 人点赞