一、背景
前面实现了raft协议,本文实现一个单机键-值数据库,并通过raft建立主从架构,使得能够容错,但是没有分片。
1 基本功能
这个键-值数据库需要实现以下几点功能:
server端:
- 能够支持Get、Put、Append三种操作。
- 实现一个状态机存放数据。
- 支持客户端重试,实现幂等性。
- 实现线性读。
- 实现快照压缩数据。
client端:
- 能够定位到Leader,并且支持重试。
- 单个client实例串行调用server端。
2 测试结果
二、交互流程和详细设计
client端:
- 客户端需要负载均衡并记录上一次server id。
- 将读请求按照写请求的形式执行,实现线性读。
server端:
- 写请求处理过程是异步的,需要向raft写入日志,然后异步等待日志提交到状态机。 a. 先记录上下文,然后通过raft.Start写入日志,最后阻塞在waitCh上等待唤醒。
- 日志应用协程会不断的从applyCh中读取已提交的日志,此时需要考虑幂等性,相同的请求不要处理两次。 a. 添加去重表,如果日志的Request Id小于等于该client的lastRequestId,表示已经执行过,则过滤掉,达到幂等性效果,否则就写入到状态机,并更新lastRequestId。然后唤醒写协程,让其响应client。
- 根据日志数据量触发主动压缩日志。 a. 需要考虑持久化的字段,包括状态机、压缩点、去重表。去重表是:向状态机应用已经提交的日志时过滤已经执行过的请求。如果server宕掉了,就会先读快照,然后重新向状态机apply已经提交的日志进行,此时还是需要去重,所以去重表需要持久化。
三、代码实现
1 client实现
1.1 定义
代码语言:go复制var clientGerarator int32
//UniqueRequestId=clientId<<32 nextRequestId
type Clerk struct {
mu sync.Mutex
servers []*labrpc.ClientEnd
lastRpcServerId int //上次请求的server
clientId int //唯一
nextRequestId uint64 //递增
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
ck := new(Clerk)
ck.servers = servers
ck.mu = sync.Mutex{}
ck.lastRpcServerId = 0
ck.clientId = int(atomic.AddInt32(&clientGerarator, 1))
ck.nextRequestId = 0
return ck
}
func (ck *Clerk) currentRpcServerId() int {
//ck.mu.Lock()
//defer ck.mu.Unlock()
return ck.lastRpcServerId
}
func (ck *Clerk) setRpcServerId(rpcServerId int) {
//ck.mu.Lock()
//defer ck.mu.Unlock()
ck.lastRpcServerId = rpcServerId
ck.lastRpcServerId %= len(ck.servers)
}
1.2 实现
代码语言:go复制func (ck *Clerk) Get(key string) string {
start := time.Now()
defer func() {
DPrintf("client Get cost: %v", time.Now().Sub(start).Milliseconds())
}()
ck.mu.Lock()
defer ck.mu.Unlock()
args := &GetArgs{
ClientId: ck.clientId,
RequestId: atomic.AddUint64(&ck.nextRequestId, 1), //
Key: key,
}
rpcServerId := ck.currentRpcServerId()
for {
reply := &GetReply{}
ok := ck.servers[rpcServerId].Call("KVServer.Get", args, reply)
//DPrintf("client Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
if !ok {
rpcServerId
rpcServerId %= len(ck.servers)
} else if reply.Err == OK {
ck.setRpcServerId(rpcServerId) //记录调用成功的server,便于下次调用
return reply.Value
} else {
rpcServerId
rpcServerId %= len(ck.servers)
}
time.Sleep(time.Millisecond * 1)
}
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
start := time.Now()
ck.mu.Lock()
defer ck.mu.Unlock()
args := &PutAppendArgs{
ClientId: ck.clientId,
RequestId: atomic.AddUint64(&ck.nextRequestId, 1),
Key: key,
Value: value,
Op: op,
}
defer func() {
DPrintf("client Put cost: %v,op: %v, key: %v, value: %v, args: %v", time.Now().Sub(start).Milliseconds(), op, key, value, mr.Any2String(args))
}()
rpcServerId := ck.currentRpcServerId()
for {
reply := &PutAppendReply{}
ok := ck.servers[rpcServerId].Call("KVServer.PutAppend", args, reply)
//DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
if !ok {
DPrintf("client Put, ok: %v, rpcServerId: %d, args: %v, reply: %v", ok, rpcServerId, mr.Any2String(args), mr.Any2String(reply))
rpcServerId
rpcServerId %= len(ck.servers)
} else if reply.Err == OK {
ck.setRpcServerId(rpcServerId)
DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
return
} else {
DPrintf("client Put, rpcServerId: %d, args: %v, reply: %v", rpcServerId, mr.Any2String(args), mr.Any2String(reply))
rpcServerId
rpcServerId %= len(ck.servers)
}
time.Sleep(time.Millisecond * 1)
}
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}
客户端实现比较简单,两者都不断的重试,直至请求成功。
2 server实现
2.1 定义
代码语言:go复制type OpType string
const (
OpTypeGet = "Get"
OpTypePut = "Put"
OpTypeAppend = "Append"
)
type Op struct {
ClientId int
RequestId uint64
OpType OpType
Key string
Value string
StartTimestamp int64
}
type OpContext struct {
ClientId int
RequestId uint64
UniqueRequestId uint64 //两者结合才是唯一ID
Op *Op
Term int
WaitCh chan string //用于实现写协程和日志应用循环协程的交互
}
func NewOpContext(op *Op, term int) *OpContext {
return &OpContext{
ClientId: op.ClientId,
RequestId: op.RequestId,
UniqueRequestId: UniqueRequestId(op.ClientId, op.RequestId),
Op: op,
Term: term,
WaitCh: make(chan string, 1), //缓冲区1是防止阻塞日志应用协程
}
}
type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
maxraftstate int // snapshot if log grows this big
// Your definitions here.
kvStore map[string]string //k-v对,状态机
opContextMap map[uint64]*OpContext //用于每个请求的上下文
lastRequestIdMap map[int]uint64 //clientId-->lastRequestId,维持幂等性,需要客户端能够保证串行
}
2.2 PutAppend实现
代码语言:go复制func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
op := &Op{
ClientId: args.ClientId,
RequestId: args.RequestId,
OpType: OpType(args.Op),
Key: args.Key,
Value: args.Value,
StartTimestamp: time.Now().UnixMilli(),
}
term := 0
isLeader := false
reply.Err = ErrWrongLeader
if term, isLeader = kv.rf.GetState(); !isLeader {
return
}
start := time.Now()
defer func() {
DPrintf("server PutAppend cost: %v, requestId: %d, node: %d, leaderId: %d", time.Now().Sub(start).Milliseconds(), op.RequestId, kv.me, kv.rf.LeaderId())
}()
kv.mu.Lock()
//可能存在前一次请求超时,但是这个请求实际上执行成功了,那么就直接return掉
if lastRequestId, ok := kv.lastRequestIdMap[op.ClientId]; ok && lastRequestId >= op.RequestId {
reply.Err = OK
kv.mu.Unlock()
return
}
opContext := NewOpContext(op, term)
kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)] = opContext
kv.mu.Unlock()
_, _, ok := kv.rf.Start(*op)
defer func() {
//DPrintf("server PutAppend, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
kv.mu.Lock()
delete(kv.opContextMap, UniqueRequestId(op.ClientId, op.RequestId))
kv.mu.Unlock()
}()
if !ok {
return
}
//阻塞等待
select {
case <-opContext.WaitCh:
reply.Err = OK
case <-time.After(time.Millisecond * 1000):
reply.Err = ErrTimeout
}
}
请求执行前需要判断是否已经执行过了,如果已经执行,就直接返回OK。如果没执行过,则通过raft同步到其他节点上,然后阻塞在waitCh上等待该条日志提交,需要有超时机制,不要一直阻塞。最后执行结束时需要删除掉上下文,要不然会有内存泄漏。
2.3 Get
代码语言:go复制func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
op := &Op{
ClientId: args.ClientId,
RequestId: args.RequestId,
OpType: OpTypeGet,
Key: args.Key,
StartTimestamp: time.Now().UnixMilli(),
}
//Append不能先append然后将日志传给raft
term := 0
isLeader := false
if term, isLeader = kv.rf.GetState(); !isLeader {
reply.Err = ErrWrongLeader
return
}
start := time.Now()
defer func() {
DPrintf("server Get cost: %v, node: %v, leaderId: %d", time.Now().Sub(start).Milliseconds(), kv.me, kv.rf.LeaderId())
}()
kv.mu.Lock()
opContext := NewOpContext(op, term)
kv.opContextMap[opContext.UniqueRequestId] = opContext
kv.mu.Unlock()
_, _, ok := kv.rf.Start(*op)
defer func() {
//DPrintf("server Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
kv.mu.Lock()
delete(kv.opContextMap, opContext.UniqueRequestId)
kv.mu.Unlock()
}()
if !ok {
reply.Err = ErrWrongLeader
return
}
//阻塞等待
select {
case c := <-opContext.WaitCh:
reply.Err = OK
reply.Value = c
case <-time.After(time.Millisecond * 1000):
reply.Err = ErrTimeout
}
}
为了实现线性读,将读请求当成写请求执行,流程大体和写请求一样,但不需要幂等性逻辑。
2.4 日志应用循环
代码语言:go复制//串行写状态机
func (kv *KVServer) applyStateMachineLoop() {
for !kv.killed() {
select {
case applyMsg := <-kv.applyCh:
if applyMsg.CommandValid {
func() {
kv.mu.Lock()
defer kv.mu.Unlock()
op := applyMsg.Command.(Op)
//保证幂等性
if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
return
}
switch op.OpType {
case OpTypePut:
kv.kvStore[op.Key] = op.Value
kv.lastRequestIdMap[op.ClientId] = op.RequestId
case OpTypeAppend:
kv.kvStore[op.Key] = op.Value
kv.lastRequestIdMap[op.ClientId] = op.RequestId
case OpTypeGet:
//Get请求不需要更新lastRequestId
}
DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
val := kv.kvStore[op.Key]
//使得写入的client能够响应
if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
c.WaitCh <- val
}
}()
}
}
}
}
另起协程不断的从applyCh中读取已提交的日志。首先,幂等性逻辑,如果op.RequestId <= kv.lastRequestIdMapop.ClientId,则表示该请求已经执行过,不能再次执行。然后更新状态机,最后唤醒写协程。
3 snapshot
在Lab 2D中已经讲解过raft如何进行压缩、同步snapshot等,而在本实验主要考虑:
- 应用层主动压缩snapshot。
- apply从leader拉取到的snapshot。
3.1 压缩字段
上文讲过需要保存压缩点、状态机、去重表。
代码语言:go复制func (kv *KVServer) maybeSnapshot(index int) {
if kv.maxraftstate == -1 {
return
}
if kv.persister.RaftStateSize() > kv.maxraftstate {
DPrintf("maybeSnapshot starting, index: %v", index)
kv.rf.Snapshot(index, kv.encodeSnapshot(index))
}
}
//上层加锁
func (kv *KVServer) encodeSnapshot(lastIncludedIndex int) []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.kvStore)
e.Encode(lastIncludedIndex)
e.Encode(kv.lastRequestIdMap) //持久化每个client的最大已执行过的写请求
return w.Bytes()
}
//上层加锁
func (kv *KVServer) decodeSnapshot(snapshot []byte) bool {
if len(snapshot) == 0 {
return true
}
r := bytes.NewBuffer(snapshot)
d := labgob.NewDecoder(r)
if err := d.Decode(&kv.kvStore); err != nil {
return false
}
if err := d.Decode(&kv.lastIncludedIndex); err != nil {
return false
}
//持久化每个client的最大已执行过的写请求
if err := d.Decode(&kv.lastRequestIdMap); err != nil {
return false
}
return true
}
3.2 apply快照
代码语言:go复制//串行写状态机
func (kv *KVServer) applyStateMachineLoop() {
for !kv.killed() {
select {
case applyMsg := <-kv.applyCh:
if applyMsg.CommandValid {
//...省略
} else if applyMsg.SnapshotValid {
func() {
kv.mu.Lock()
defer kv.mu.Unlock()
//将snapshot apply到状态机
if kv.decodeSnapshot(applyMsg.Snapshot) {
//截断日志
kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
}
}()
}
}
DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
}
}
3.3 主动触发压缩
在将日志apply到状态机时根据日志数据量决定是否日志压缩。
代码语言:go复制func (kv *KVServer) applyStateMachineLoop() {
for !kv.killed() {
select {
case applyMsg := <-kv.applyCh:
if applyMsg.CommandValid {
func() {
kv.mu.Lock()
defer kv.mu.Unlock()
op := applyMsg.Command.(Op)
//保证幂等性
if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
return
}
//过滤掉snapshot前的日志
if applyMsg.CommandIndex <= kv.lastIncludedIndex && op.OpType != OpTypeGet {
if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
c.WaitCh <- "0"
}
return
}
switch op.OpType {
case OpTypePut:
kv.kvStore[op.Key] = op.Value
kv.lastRequestIdMap[op.ClientId] = op.RequestId
//尝试触发日志压缩
kv.maybeSnapshot(applyMsg.CommandIndex)
case OpTypeAppend:
kv.kvStore[op.Key] = op.Value
kv.lastRequestIdMap[op.ClientId] = op.RequestId
//尝试触发日志压缩
kv.maybeSnapshot(applyMsg.CommandIndex)
case OpTypeGet:
//Get请求不需要更新lastRequestId
}
DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
val := kv.kvStore[op.Key]
//使得写入的client能够响应
if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
c.WaitCh <- val
}
}()
} else if applyMsg.SnapshotValid {
func() {
kv.mu.Lock()
defer kv.mu.Unlock()
if kv.decodeSnapshot(applyMsg.Snapshot) {
kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
}
}()
}
}
DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
}
}
四、优化和小结
1 优化
1.1 线性读
线性读方案有很多,本文以写请求形式处理读请求,简单但性能不是很好。本文推荐另外一种方案的实现,读Follower,并且从Leader复制此时已经提交过的日志,性能比前者要好一些,但多了一次与Leader的交互,该方案可以留到日后优化。
1.2 性能优化
每个请求的处理时间比预期高一些,后续打算采用成组提交机制来批量处理写操作。
2 小结
这个实验在6.824 Lab2D raft上实现一个single group的键值数据库,支持Get、Put、Append三种操作,能够保证客户端幂等性和线性读。本实验通过记录上下文和每个client的requestId来保证幂等性,以写请求的逻辑处理读请求来实现线性读。