6.824 raft Lab 3 kvRaft

2022-10-17 10:06:11 浏览数 (2)

一、背景

前面实现了raft协议,本文实现一个单机键-值数据库,并通过raft建立主从架构,使得能够容错,但是没有分片。

1 基本功能

这个键-值数据库需要实现以下几点功能:

server端:

  1. 能够支持Get、Put、Append三种操作。
  2. 实现一个状态机存放数据。
  3. 支持客户端重试,实现幂等性。
  4. 实现线性读。
  5. 实现快照压缩数据。

client端:

  1. 能够定位到Leader,并且支持重试。
  2. 单个client实例串行调用server端。

2 测试结果

Lab 3A 测试结果Lab 3A 测试结果
Lab 3B 测试结果Lab 3B 测试结果

二、交互流程和详细设计

交互图交互图

client端:

  1. 客户端需要负载均衡并记录上一次server id。
  2. 将读请求按照写请求的形式执行,实现线性读。

server端:

  1. 写请求处理过程是异步的,需要向raft写入日志,然后异步等待日志提交到状态机。 a. 先记录上下文,然后通过raft.Start写入日志,最后阻塞在waitCh上等待唤醒。
  2. 日志应用协程会不断的从applyCh中读取已提交的日志,此时需要考虑幂等性,相同的请求不要处理两次。 a. 添加去重表,如果日志的Request Id小于等于该client的lastRequestId,表示已经执行过,则过滤掉,达到幂等性效果,否则就写入到状态机,并更新lastRequestId。然后唤醒写协程,让其响应client。
  3. 根据日志数据量触发主动压缩日志。 a. 需要考虑持久化的字段,包括状态机、压缩点、去重表。去重表是:向状态机应用已经提交的日志时过滤已经执行过的请求。如果server宕掉了,就会先读快照,然后重新向状态机apply已经提交的日志进行,此时还是需要去重,所以去重表需要持久化。

三、代码实现

1 client实现

1.1 定义

代码语言:go复制
var clientGerarator int32
//UniqueRequestId=clientId<<32 nextRequestId
type Clerk struct {
	mu              sync.Mutex
	servers         []*labrpc.ClientEnd
	lastRpcServerId int //上次请求的server
	clientId        int //唯一
	nextRequestId   uint64 //递增
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	ck.mu = sync.Mutex{}
	ck.lastRpcServerId = 0
	ck.clientId = int(atomic.AddInt32(&clientGerarator, 1))
	ck.nextRequestId = 0
	return ck
}

func (ck *Clerk) currentRpcServerId() int {
	//ck.mu.Lock()
	//defer ck.mu.Unlock()
	return ck.lastRpcServerId
}
func (ck *Clerk) setRpcServerId(rpcServerId int) {
	//ck.mu.Lock()
	//defer ck.mu.Unlock()
	ck.lastRpcServerId = rpcServerId
	ck.lastRpcServerId %= len(ck.servers)
}

1.2 实现

代码语言:go复制
func (ck *Clerk) Get(key string) string {

	start := time.Now()
	defer func() {
		DPrintf("client Get cost: %v", time.Now().Sub(start).Milliseconds())
	}()
	ck.mu.Lock()
	defer ck.mu.Unlock()
	args := &GetArgs{
		ClientId:  ck.clientId,
		RequestId: atomic.AddUint64(&ck.nextRequestId, 1), //
		Key:       key,
	}
	rpcServerId := ck.currentRpcServerId()

	for {
		reply := &GetReply{}
		ok := ck.servers[rpcServerId].Call("KVServer.Get", args, reply)
		//DPrintf("client Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		if !ok {
			rpcServerId  
			rpcServerId %= len(ck.servers)
		} else if reply.Err == OK {
			ck.setRpcServerId(rpcServerId) //记录调用成功的server,便于下次调用
			return reply.Value
		} else {
			rpcServerId  
			rpcServerId %= len(ck.servers)
		}
		time.Sleep(time.Millisecond * 1)
	}
}

func (ck *Clerk) PutAppend(key string, value string, op string) {

	start := time.Now()

	ck.mu.Lock()
	defer ck.mu.Unlock()
	args := &PutAppendArgs{
		ClientId:  ck.clientId,
		RequestId: atomic.AddUint64(&ck.nextRequestId, 1),
		Key:       key,
		Value:     value,
		Op:        op,
	}
	defer func() {
		DPrintf("client Put cost: %v,op: %v, key: %v, value: %v, args: %v", time.Now().Sub(start).Milliseconds(), op, key, value, mr.Any2String(args))
	}()
	rpcServerId := ck.currentRpcServerId()

	for {
		reply := &PutAppendReply{}
		ok := ck.servers[rpcServerId].Call("KVServer.PutAppend", args, reply)
		//DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		if !ok {
			DPrintf("client Put, ok: %v, rpcServerId: %d, args: %v, reply: %v", ok, rpcServerId, mr.Any2String(args), mr.Any2String(reply))
			rpcServerId  
			rpcServerId %= len(ck.servers)
		} else if reply.Err == OK {
			ck.setRpcServerId(rpcServerId)
			DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
			return
		} else {
			DPrintf("client Put, rpcServerId: %d, args: %v, reply: %v", rpcServerId, mr.Any2String(args), mr.Any2String(reply))
			rpcServerId  
			rpcServerId %= len(ck.servers)
		}
		time.Sleep(time.Millisecond * 1)
	}
}

func (ck *Clerk) Put(key string, value string) {
	ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
	ck.PutAppend(key, value, "Append")
}

客户端实现比较简单,两者都不断的重试,直至请求成功。

2 server实现

2.1 定义

代码语言:go复制
type OpType string

const (
	OpTypeGet    = "Get"
	OpTypePut    = "Put"
	OpTypeAppend = "Append"
)

type Op struct {
	ClientId       int
	RequestId      uint64
	OpType         OpType
	Key            string
	Value          string
	StartTimestamp int64
}

type OpContext struct {
	ClientId        int
	RequestId       uint64
	UniqueRequestId uint64 //两者结合才是唯一ID
	Op              *Op
	Term            int
	WaitCh          chan string //用于实现写协程和日志应用循环协程的交互
}

func NewOpContext(op *Op, term int) *OpContext {
	return &OpContext{
		ClientId:        op.ClientId,
		RequestId:       op.RequestId,
		UniqueRequestId: UniqueRequestId(op.ClientId, op.RequestId),
		Op:              op,
		Term:            term,
		WaitCh:          make(chan string, 1), //缓冲区1是防止阻塞日志应用协程
	}
}

type KVServer struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	kvStore          map[string]string     //k-v对,状态机
	opContextMap     map[uint64]*OpContext //用于每个请求的上下文
	lastRequestIdMap map[int]uint64        //clientId-->lastRequestId,维持幂等性,需要客户端能够保证串行
}

2.2 PutAppend实现

代码语言:go复制
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {

	op := &Op{
		ClientId:       args.ClientId,
		RequestId:      args.RequestId,
		OpType:         OpType(args.Op),
		Key:            args.Key,
		Value:          args.Value,
		StartTimestamp: time.Now().UnixMilli(),
	}
	term := 0
	isLeader := false
	reply.Err = ErrWrongLeader
	if term, isLeader = kv.rf.GetState(); !isLeader {
		return
	}

	start := time.Now()
	defer func() {
		DPrintf("server PutAppend cost: %v, requestId: %d, node: %d, leaderId: %d", time.Now().Sub(start).Milliseconds(), op.RequestId, kv.me, kv.rf.LeaderId())
	}()
	kv.mu.Lock()
	//可能存在前一次请求超时,但是这个请求实际上执行成功了,那么就直接return掉
	if lastRequestId, ok := kv.lastRequestIdMap[op.ClientId]; ok && lastRequestId >= op.RequestId {
		reply.Err = OK
		kv.mu.Unlock()
		return
	}
	opContext := NewOpContext(op, term)
	kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)] = opContext
	kv.mu.Unlock()
	_, _, ok := kv.rf.Start(*op)
	defer func() {
		//DPrintf("server PutAppend, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		kv.mu.Lock()
		delete(kv.opContextMap, UniqueRequestId(op.ClientId, op.RequestId))
		kv.mu.Unlock()
	}()
	if !ok {
		return
	}
	//阻塞等待
	select {
	case <-opContext.WaitCh:
		reply.Err = OK
	case <-time.After(time.Millisecond * 1000):
		reply.Err = ErrTimeout
	}
}

请求执行前需要判断是否已经执行过了,如果已经执行,就直接返回OK。如果没执行过,则通过raft同步到其他节点上,然后阻塞在waitCh上等待该条日志提交,需要有超时机制,不要一直阻塞。最后执行结束时需要删除掉上下文,要不然会有内存泄漏。

2.3 Get

代码语言:go复制
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	op := &Op{
		ClientId:       args.ClientId,
		RequestId:      args.RequestId,
		OpType:         OpTypeGet,
		Key:            args.Key,
		StartTimestamp: time.Now().UnixMilli(),
	}
	//Append不能先append然后将日志传给raft
	term := 0
	isLeader := false
	if term, isLeader = kv.rf.GetState(); !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	start := time.Now()
	defer func() {
		DPrintf("server Get cost: %v, node: %v, leaderId: %d", time.Now().Sub(start).Milliseconds(), kv.me, kv.rf.LeaderId())
	}()
	kv.mu.Lock()
	opContext := NewOpContext(op, term)
	kv.opContextMap[opContext.UniqueRequestId] = opContext
	kv.mu.Unlock()
	_, _, ok := kv.rf.Start(*op)

	defer func() {
		//DPrintf("server Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		kv.mu.Lock()
		delete(kv.opContextMap, opContext.UniqueRequestId)
		kv.mu.Unlock()
	}()
	if !ok {
		reply.Err = ErrWrongLeader
		return
	}
	//阻塞等待
	select {
	case c := <-opContext.WaitCh:
		reply.Err = OK
		reply.Value = c
	case <-time.After(time.Millisecond * 1000):
		reply.Err = ErrTimeout
	}
}

为了实现线性读,将读请求当成写请求执行,流程大体和写请求一样,但不需要幂等性逻辑。

2.4 日志应用循环

代码语言:go复制
//串行写状态机
func (kv *KVServer) applyStateMachineLoop() {

	for !kv.killed() {

		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					op := applyMsg.Command.(Op)
					//保证幂等性
					if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
						return
					}
					switch op.OpType {
					case OpTypePut:
						kv.kvStore[op.Key] = op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
					case OpTypeAppend:
						kv.kvStore[op.Key]  = op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
					case OpTypeGet:
						//Get请求不需要更新lastRequestId
					}
					DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
					val := kv.kvStore[op.Key]
					//使得写入的client能够响应
					if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
						c.WaitCh <- val
					}
				}()
			}
		}
	}
}

另起协程不断的从applyCh中读取已提交的日志。首先,幂等性逻辑,如果op.RequestId <= kv.lastRequestIdMapop.ClientId,则表示该请求已经执行过,不能再次执行。然后更新状态机,最后唤醒写协程。

3 snapshot

在Lab 2D中已经讲解过raft如何进行压缩、同步snapshot等,而在本实验主要考虑:

  1. 应用层主动压缩snapshot。
  2. apply从leader拉取到的snapshot。

3.1 压缩字段

上文讲过需要保存压缩点、状态机、去重表。

代码语言:go复制
func (kv *KVServer) maybeSnapshot(index int) {
	if kv.maxraftstate == -1 {
		return
	}
	if kv.persister.RaftStateSize() > kv.maxraftstate {
		DPrintf("maybeSnapshot starting, index: %v", index)
		kv.rf.Snapshot(index, kv.encodeSnapshot(index))
	}
}

//上层加锁
func (kv *KVServer) encodeSnapshot(lastIncludedIndex int) []byte {
	
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.kvStore)
	e.Encode(lastIncludedIndex)
	e.Encode(kv.lastRequestIdMap) //持久化每个client的最大已执行过的写请求
	return w.Bytes()
}

//上层加锁
func (kv *KVServer) decodeSnapshot(snapshot []byte) bool {

	if len(snapshot) == 0 {
		return true
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)

	if err := d.Decode(&kv.kvStore); err != nil {
		return false
	}
	if err := d.Decode(&kv.lastIncludedIndex); err != nil {
		return false
	}
	//持久化每个client的最大已执行过的写请求
	if err := d.Decode(&kv.lastRequestIdMap); err != nil {
		return false
	}
	return true
}

3.2 apply快照

代码语言:go复制
//串行写状态机
func (kv *KVServer) applyStateMachineLoop() {

	for !kv.killed() {

		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				//...省略
			} else if applyMsg.SnapshotValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					//将snapshot apply到状态机
					if kv.decodeSnapshot(applyMsg.Snapshot) {
						//截断日志
						kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
					}
				}()
			}
		}
		DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
	}
}

3.3 主动触发压缩

在将日志apply到状态机时根据日志数据量决定是否日志压缩。

代码语言:go复制
func (kv *KVServer) applyStateMachineLoop() {

	for !kv.killed() {

		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					op := applyMsg.Command.(Op)
					//保证幂等性
					if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
						return
					}
					//过滤掉snapshot前的日志
					if applyMsg.CommandIndex <= kv.lastIncludedIndex && op.OpType != OpTypeGet {
						if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
							c.WaitCh <- "0"
						}
						return
					}
					switch op.OpType {
					case OpTypePut:
						kv.kvStore[op.Key] = op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
						//尝试触发日志压缩
						kv.maybeSnapshot(applyMsg.CommandIndex)
					case OpTypeAppend:
						kv.kvStore[op.Key]  = op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
						//尝试触发日志压缩
						kv.maybeSnapshot(applyMsg.CommandIndex)
					case OpTypeGet:
						//Get请求不需要更新lastRequestId
					}
					DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
					val := kv.kvStore[op.Key]
					//使得写入的client能够响应
					if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
						c.WaitCh <- val
					}
				}()
			} else if applyMsg.SnapshotValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					if kv.decodeSnapshot(applyMsg.Snapshot) {
						kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
					}
				}()
			}
		}
		DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
	}
}

四、优化和小结

1 优化

1.1 线性读

线性读方案有很多,本文以写请求形式处理读请求,简单但性能不是很好。本文推荐另外一种方案的实现,读Follower,并且从Leader复制此时已经提交过的日志,性能比前者要好一些,但多了一次与Leader的交互,该方案可以留到日后优化。

1.2 性能优化

每个请求的处理时间比预期高一些,后续打算采用成组提交机制来批量处理写操作。

2 小结

这个实验在6.824 Lab2D raft上实现一个single group的键值数据库,支持Get、Put、Append三种操作,能够保证客户端幂等性和线性读。本实验通过记录上下文和每个client的requestId来保证幂等性,以写请求的逻辑处理读请求来实现线性读。

0 人点赞