存储设计
Etcd中跟存储部分相关的模块主要有3块,Raft状态机中存储的日志条目、持久化到文件的日志条目以及后端的KV存储。
Raft状态机存储
回顾下第一篇中讲到的Etcd整体架构,raft模块只负责算法实现,所以所有收到的日志条目都是存在内存中。数据结构如下:
EtcdServer
上图中,所有日志条目都是存储在一个raftLog的结构中.
代码语言:txt复制type raftLog struct {
代码语言:txt复制 // 自从上次快照后已经持久化的日志
代码语言:txt复制 storage Storage
代码语言:txt复制 // 还未持久化的日志
代码语言:txt复制 unstable unstable
代码语言:txt复制 // 集群中已提交的日志index
代码语言:txt复制 committed uint64
代码语言:txt复制 // 本节点已经应用到状态机的日志index
代码语言:txt复制 applied uint64
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制}
- raftLog中通过两个字段来存储日志,storage存储了已经持久化到磁盘的日志和最近一次快照的信息,也就是上图中已经写到了WAL中的数据。这部分日志即使在节点重启的情况下也不会丢失,重启时etcd会从wal中读取出这部分数据写到raft的内存中。undefined为啥是上次快照之后的呢?因为raft节点的内存毕竟是有限的,etcd中会定期对KV做快照,快照结束之后,storage就只需要存储快照的信息和在快照之后接收的日志就可以了,这在raft协议中也有定义。
- unstable结构中存储了尚未持久化的日志条目和快照,当日志持久化之后就会从unstable中移到storage中。
- raft协议的committed和applied属性也存在raftLog中,因为根据raft协议的规定,这两个属性也是需要持久化存储的。
Storage
raft状态机Storage接口定义如下:
代码语言:txt复制type Storage interface {
代码语言:txt复制 // 已经持久化的HardState和ConfState信息
代码语言:txt复制 InitialState() (pb.HardState, pb.ConfState, error)
代码语言:txt复制 // 返回日志条目
代码语言:txt复制 Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
代码语言:txt复制 // 当前的选举周期
代码语言:txt复制 Term(i uint64) (uint64, error)
代码语言:txt复制 // 最后一条日志的index
代码语言:txt复制 LastIndex() (uint64, error)
代码语言:txt复制 // 第一条日志的index
代码语言:txt复制 FirstIndex() (uint64, error)
代码语言:txt复制 // 返回最近一次Snapshot
代码语言:txt复制 Snapshot() (pb.Snapshot, error)
代码语言:txt复制}
Storage接口定义了所有Raft协议中要求的需要持久化的信息接口,比如HardState中的term、commitIndex,以及日志条目等。
etcd中对于该接口的默认实现是MemoryStorage
,从名字可以看出来数据是存在内存中的,看起来这么做跟raft的要求不符。这是因为MemoryStorage
中存的日志和状态信息都在WAL中,所以这里只需要内存就够了,重启的时候,etcd会从WAL中恢复数据写道Storage中。MemoryStorage
的定义如下:
type MemoryStorage struct {
代码语言:txt复制 // 读写锁
代码语言:txt复制 sync.Mutex
代码语言:txt复制 // term,commitIndex,vote封装在HardState中
代码语言:txt复制 hardState pb.HardState
代码语言:txt复制 //最近一次Snapshot
代码语言:txt复制 snapshot pb.Snapshot
代码语言:txt复制 //snapshot之后的日志条目,第一条日志条目的index为snapshot.Metadata.Index
代码语言:txt复制 ents []pb.Entry
代码语言:txt复制}
unstable
Raft模块已经收到还没有持久化到WAL的日志条目存在unstable中
代码语言:txt复制type unstable struct {
代码语言:txt复制 // 从leader收到的snapshot
代码语言:txt复制 snapshot *pb.Snapshot
代码语言:txt复制 // 新收到还未持久化的日志条目
代码语言:txt复制 entries []pb.Entry
代码语言:txt复制 //第一条日志的偏移量
代码语言:txt复制 offset uint64
代码语言:txt复制}
日志持久化存储
Raft模块日志数据持久化通过WAL实现,WAL通过追加写的方式来将数据写入磁盘以提高性能。Etcd在如下几种情况下会在WAL追加一条记录:
- 节点启动时记录节点和集群信息,对应的记录类型是metadataType;
- 收到新的日志条目,对应的记录类型是entryType;
- 状态变化时,比如新的选举周期,commitIndex变化等,对应的记录类型是stateType;
- 做数据快照时,对应的记录类型是snapshotType;
- 生成新的wal文件时,wal文件达到一定大小时,etcd就会生成一个新的文件,新的文件第一条记录会记录上一个文件的crc,以备数据校验。对应的记录类型是crcType
type WAL struct {
代码语言:txt复制 lg *zap.Logger
代码语言:txt复制 // wal文件的存储目录
代码语言:txt复制 dir string
代码语言:txt复制 dirFile *os.File
代码语言:txt复制 // wal文件构建后会写的第一个metadata记录
代码语言:txt复制 metadata []byte
代码语言:txt复制 // wal文件构建后会写的第一个state记录
代码语言:txt复制 state raftpb.HardState
代码语言:txt复制 // wal开始的snapshot,代表读取wal时从这个snapshot的记录之后开始
代码语言:txt复制 start walpb.Snapshot
代码语言:txt复制 //wal记录的反序列化器
代码语言:txt复制 decoder *decoder
代码语言:txt复制 ...
代码语言:txt复制 //底层数据文件列表
代码语言:txt复制 locks []*fileutil.LockedFile
代码语言:txt复制 //
代码语言:txt复制}
WAL底层对应着磁盘上一系列文件,当收到需要持久化的日志条目时就会追加到文件的末尾,文件达到一定大小时,WAL会主动创建一个新的磁盘文件,防止单个WAL文件过大。
etcd
会定期对数据做快照,快照时会在WAL中追加一条记录。在etcd节点重启恢复时,会查找wal中最后一次快照的记录,将快照后的日志条目重新给到raft模块恢复内存数据。
KV数据库存储
Etcd最终生效的数据存在KV数据库中,并对后端存储抽象了一个Backend接口,Backend的实现需要支持事务和多版本管理。Backend接口的定义如下:
代码语言:txt复制type Backend interface {
代码语言:txt复制 // 开启读事务.
代码语言:txt复制 ReadTx() ReadTx
代码语言:txt复制 //开启写事务
代码语言:txt复制 BatchTx() BatchTx
代码语言:txt复制 // 开启并发读事务,互相之间不阻塞
代码语言:txt复制 ConcurrentReadTx() ReadTx
代码语言:txt复制 // 对db做快照
代码语言:txt复制 Snapshot() Snapshot
代码语言:txt复制 Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
代码语言:txt复制 // DB占用的物理磁盘大小,空间可以预分配,所以不是实际数据大小
代码语言:txt复制 Size() int64
代码语言:txt复制 // 实际使用的磁盘空间
代码语言:txt复制 SizeInUse() int64
代码语言:txt复制 // 返回当前读事务个数
代码语言:txt复制 OpenReadTxN() int64
代码语言:txt复制 // 数据文件整理,会回收已删除key和已更新的key旧版本占用的磁盘
代码语言:txt复制 Defrag() error
代码语言:txt复制 ForceCommit()
代码语言:txt复制 Close() error
代码语言:txt复制}
该接口的默认实现如下:
代码语言:txt复制type backend struct {
代码语言:txt复制 // 已经占用的磁盘大小
代码语言:txt复制 size int64
代码语言:txt复制 // 实际使用的大小
代码语言:txt复制 sizeInUse int64
代码语言:txt复制 // 已提交事务数
代码语言:txt复制 commits int64
代码语言:txt复制 // 当前开启的读事务数
代码语言:txt复制 openReadTxN int64
代码语言:txt复制 // 读写锁
代码语言:txt复制 mu sync.RWMutex
代码语言:txt复制 //底层存储为boltDB
代码语言:txt复制 db *bolt.DB
代码语言:txt复制 // 批量写提交间隔
代码语言:txt复制 batchInterval time.Duration
代码语言:txt复制 // 批量写最大事务数
代码语言:txt复制 batchLimit int
代码语言:txt复制 // 写事务缓冲队列
代码语言:txt复制 batchTx *batchTxBuffered
代码语言:txt复制 // 写事务
代码语言:txt复制 readTx *readTx
代码语言:txt复制 stopc chan struct{}
代码语言:txt复制 donec chan struct{}
代码语言:txt复制 lg *zap.Logger
代码语言:txt复制}
从上面的实现中可以看出,etcd的默认底层存储使用的是boltDB。为了提高读写效率,etcd会维护一个写事务的缓存队列,当队列大小达到一定数或者离上次已经过了一定的时间后,才会真正将数据写到磁盘上。
存储总结
数据从客户端提交到Etcd后,会经过3个存储的地方。首先会进入Raft算法模块,raft将日志保存在内存中,然后通知etcd持久化。为了提高效率,etcd会将数据写到WAL中,因为wal底层文件只追加不更新和删除,所以完成这一步数据就不会丢了。之后etcd的leader节点将日志分发到集群中,当收到超过半数节点响应后,就会提交数据,将数据存入后端KV存储中。
日志同步
了解了etcd中的存储设计,可以更好的理解一条数据变更请求的整个流转过程,下面通过源码看一下。
请求处理
当客户端提交一条数据变更请求时,比如put hello 为
world的写请求,v3版本中会调用EtcdServer的Put()方法,最终都会调用到processInternalRaftRequestOnce()。
代码语言:txt复制func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
代码语言:txt复制 //判断已提交未apply的记录是否超过限制
代码语言:txt复制 ai := s.getAppliedIndex()
代码语言:txt复制 ci := s.getCommittedIndex()
代码语言:txt复制 if ci > ai maxGapBetweenApplyAndCommitIndex {
代码语言:txt复制 return nil, ErrTooManyRequests
代码语言:txt复制 }
代码语言:txt复制 //生成一个requestID
代码语言:txt复制 r.Header = &pb.RequestHeader{
代码语言:txt复制 ID: s.reqIDGen.Next(),
代码语言:txt复制 }
代码语言:txt复制 authInfo, err := s.AuthInfoFromCtx(ctx)
代码语言:txt复制 if err != nil {
代码语言:txt复制 return nil, err
代码语言:txt复制 }
代码语言:txt复制 if authInfo != nil {
代码语言:txt复制 r.Header.Username = authInfo.Username
代码语言:txt复制 r.Header.AuthRevision = authInfo.Revision
代码语言:txt复制 }
代码语言:txt复制 //反序列化请求数据
代码语言:txt复制 data, err := r.Marshal()
代码语言:txt复制 if err != nil {
代码语言:txt复制 return nil, err
代码语言:txt复制 }
代码语言:txt复制 if len(data) > int(s.Cfg.MaxRequestBytes) {
代码语言:txt复制 return nil, ErrRequestTooLarge
代码语言:txt复制 }
代码语言:txt复制 id := r.ID
代码语言:txt复制 if id == 0 {
代码语言:txt复制 id = r.Header.ID
代码语言:txt复制 }
代码语言:txt复制 //注册一个channel,等待处理完成
代码语言:txt复制 ch := s.w.Register(id)
代码语言:txt复制 //设置请求超时
代码语言:txt复制 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
代码语言:txt复制 defer cancel()
代码语言:txt复制 start := time.Now()
代码语言:txt复制 // 调用raft模块的Propose处理请求
代码语言:txt复制 err = s.r.Propose(cctx, data)
代码语言:txt复制 if err != nil {
代码语言:txt复制 proposalsFailed.Inc()
代码语言:txt复制 s.w.Trigger(id, nil) // GC wait
代码语言:txt复制 return nil, err
代码语言:txt复制 }
代码语言:txt复制 proposalsPending.Inc()
代码语言:txt复制 defer proposalsPending.Dec()
代码语言:txt复制 select {
代码语言:txt复制 // 等待收到apply结果返回给客户端
代码语言:txt复制 case x := <-ch:
代码语言:txt复制 return x.(*applyResult), nil
代码语言:txt复制 case <-cctx.Done():
代码语言:txt复制 proposalsFailed.Inc()
代码语言:txt复制 s.w.Trigger(id, nil) // GC wait
代码语言:txt复制 return nil, s.parseProposeCtxErr(cctx.Err(), start)
代码语言:txt复制 case <-s.done:
代码语言:txt复制 return nil, ErrStopped
代码语言:txt复制 }
代码语言:txt复制}
上面的方法中,etcd对请求做了基本的校验之后,会通过调用Propose()方法提交给Raft处理,然后等待反馈。在etcd实现中,会一直到数据apply到状态机之后,才会返回结果给客户端。在Propose()方法中,raft会将请求封装成一个MsgProp消息并调用Step函数。
代码语言:txt复制func (rn *RawNode) Propose(data []byte) error {
代码语言:txt复制 return rn.raft.Step(pb.Message{
代码语言:txt复制 Type: pb.MsgProp,
代码语言:txt复制 From: rn.raft.id,
代码语言:txt复制 Entries: []pb.Entry{
代码语言:txt复制 {Data: data},
代码语言:txt复制 }})
代码语言:txt复制}
etcd中只允许Leader处理数据变更请求,所以如果是Follower收到客户端的命令,会直接转给leader处理,然后等待Leader的反馈后将结果返回给客户端。所以,这里只需要看Leader的处理逻辑,上面的Step()函数最终调用的是raft模块的stepLeader(*raft,
pb.Message) 函数。
代码语言:txt复制对于为什么进到stepLeader方法,前一篇文章里面已经讲过了,印象不深的话可以回看一下
func stepLeader(r *raft, m pb.Message) error {
代码语言:txt复制 // These message types do not require any progress for m.From.
代码语言:txt复制 switch m.Type {
代码语言:txt复制 case pb.MsgBeat:
代码语言:txt复制 ...
代码语言:txt复制 case pb.MsgCheckQuorum:
代码语言:txt复制 ...
代码语言:txt复制 case pb.MsgProp:
代码语言:txt复制 if len(m.Entries) == 0 {
代码语言:txt复制 r.logger.Panicf("%x stepped empty MsgProp", r.id)
代码语言:txt复制 }
代码语言:txt复制 if r.prs.Progress[r.id] == nil {
代码语言:txt复制 // 判断当前节点是不是已经被从集群中移除了
代码语言:txt复制 return ErrProposalDropped
代码语言:txt复制 }
代码语言:txt复制 if r.leadTransferee != None {
代码语言:txt复制 // 如果正在进行leader切换,拒绝写入
代码语言:txt复制 return ErrProposalDropped
代码语言:txt复制 }
代码语言:txt复制 for i := range m.Entries {
代码语言:txt复制 //判断是否有配置变更的日志,有的话做一些特殊处理
代码语言:txt复制 }
代码语言:txt复制 //将日志追加到raft状态机中
代码语言:txt复制 if !r.appendEntry(m.Entries...) {
代码语言:txt复制 return ErrProposalDropped
代码语言:txt复制 }
代码语言:txt复制 // 发送日志给集群其它节点
代码语言:txt复制 r.bcastAppend()
代码语言:txt复制 return nil
代码语言:txt复制 case pb.MsgReadIndex:
代码语言:txt复制 ...
代码语言:txt复制 return nil
代码语言:txt复制 }
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 return nil
代码语言:txt复制}
Raft协议是一个基于日志复制的协议,所以客户端数据变更请求会封装成一条日志条目。上面的逻辑中首先做了一些基本的校验,通过后将Message中的日志条目追加到raft的日志列表中,追加成功后就会将日志广播给所有Follower。
Raft日志新增
上面讲存储的时候讲到,raft算法实现模块只是将日志存在内存中,所以appendEntry的逻辑也很简单。
代码语言:txt复制func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
代码语言:txt复制 //1. 获取raft节点当前最后一条日志条目的index
代码语言:txt复制 li := r.raftLog.lastIndex()
代码语言:txt复制 //2. 给新的日志条目设置term和index
代码语言:txt复制 for i := range es {
代码语言:txt复制 es[i].Term = r.Term
代码语言:txt复制 es[i].Index = li 1 uint64(i)
代码语言:txt复制 }
代码语言:txt复制 // 3. 判断未提交的日志条目是不是超过限制,是的话拒绝并返回失败
代码语言:txt复制 if !r.increaseUncommittedSize(es) {
代码语言:txt复制 return false
代码语言:txt复制 }
代码语言:txt复制 // 4. 将日志条目追加到raftLog中
代码语言:txt复制 li = r.raftLog.append(es...)
代码语言:txt复制 // 5. 检查并更新日志进度
代码语言:txt复制 r.prs.Progress[r.id].MaybeUpdate(li)
代码语言:txt复制 // 6. 判断是否做一次commit
代码语言:txt复制 r.maybeCommit()
代码语言:txt复制 return true
代码语言:txt复制}
- 获取raft当前日志中最后一条日志条目的index
- raft的日志条目index是单调递增的
- etcd限制了leader上最多有多少未提交的条目,防止因为leader和follower之间出现网络问题时,导致条目一直累积。
- 将日志条目追加到raftLog内存队列中,并且返回最大一条日志的index,对于leader追加日志的情况,这里返回的li肯定等于方法第1行中获取的li
- raft的leader节点保存了所有节点的日志同步进度,这里面也包括它自己
- 这里忽略maybeCommit()结果,直接返回true,开始广播日志。
同步给Follower
Leader节点将日志条目存到raftLog的内存中后,调用bcastAppend()方法触发一次广播操作,同步日志给Follower。
代码语言:txt复制func (r *raft) bcastAppend() {
代码语言:txt复制 //遍历所有节点,给除自己外的节点发送日志Append消息
代码语言:txt复制 r.prs.Visit(func(id uint64, _ *tracker.Progress) {
代码语言:txt复制 if id == r.id {
代码语言:txt复制 return
代码语言:txt复制 }
代码语言:txt复制 r.sendAppend(id)
代码语言:txt复制 })
代码语言:txt复制}
代码语言:txt复制func (r *raft) sendAppend(to uint64) {
代码语言:txt复制 r.maybeSendAppend(to, true)
代码语言:txt复制}
代码语言:txt复制func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
代码语言:txt复制 //1. 获取对端节点当前同步进度
代码语言:txt复制 pr := r.prs.Progress[to]
代码语言:txt复制 if pr.IsPaused() {
代码语言:txt复制 return false
代码语言:txt复制 }
代码语言:txt复制 m := pb.Message{}
代码语言:txt复制 m.To = to
代码语言:txt复制 //2. 注意这里带的term是本次发送给follower的第一条日志条目的term
代码语言:txt复制 term, errt := r.raftLog.term(pr.Next - 1)
代码语言:txt复制 ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
代码语言:txt复制 if len(ents) == 0 && !sendIfEmpty {
代码语言:txt复制 return false
代码语言:txt复制 }
代码语言:txt复制 if errt != nil || erre != nil {
代码语言:txt复制 //3. 如果获取term或日志失败,说明follower落后太多,raftLog内存中日志已经做过快照后被删除了
代码语言:txt复制 if !pr.RecentActive {
代码语言:txt复制 r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
代码语言:txt复制 return false
代码语言:txt复制 }
代码语言:txt复制 //4. 改为发送Snapshot消息
代码语言:txt复制 m.Type = pb.MsgSnap
代码语言:txt复制 snapshot, err := r.raftLog.snapshot()
代码语言:txt复制 if err != nil {
代码语言:txt复制 if err == ErrSnapshotTemporarilyUnavailable {
代码语言:txt复制 r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
代码语言:txt复制 return false
代码语言:txt复制 }
代码语言:txt复制 panic(err) // TODO(bdarnell)
代码语言:txt复制 }
代码语言:txt复制 if IsEmptySnap(snapshot) {
代码语言:txt复制 panic("need non-empty snapshot")
代码语言:txt复制 }
代码语言:txt复制 m.Snapshot = snapshot
代码语言:txt复制 sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
代码语言:txt复制 pr.BecomeSnapshot(sindex)
代码语言:txt复制 } else {
代码语言:txt复制 //5. 发送Append消息
代码语言:txt复制 m.Type = pb.MsgApp
代码语言:txt复制 m.Index = pr.Next - 1
代码语言:txt复制 m.LogTerm = term
代码语言:txt复制 m.Entries = ents
代码语言:txt复制 //6. 每次发送日志或心跳都会带上最新的commitIndex
代码语言:txt复制 m.Commit = r.raftLog.committed
代码语言:txt复制 if n := len(m.Entries); n != 0 {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 //7. 发送消息
代码语言:txt复制 r.send(m)
代码语言:txt复制 return true
代码语言:txt复制}
上面的逻辑中,leader在收到新的更新日志后,会遍历集群中所有follower节点,触发一次日志同步。
- 按照raft协议规定,leader需要缓存当前所有Follower的日志同步进度
- 根据日志进度去取日志条目的时候发现,follower日志落后太多,这通常出现在新节点刚加入或者网络连接出现故障的情况下。那么在这种情况下,leader改为发送最近一次快照给Follower,从而提高同步效率
- 正常情况下会发送新的日志给Follower,消息类型为MsgApp,最终调用r.send(m)提交消息。
日志写WAL
在上一篇讲心跳消息发送的时候已经讲过,EtcdServer中会有一个goroutine监听raft的channel是不是有新的Ready数据过来,收到后就会将里面的msgs发送给接收端。这个MsgApp类型的消息也是一样提交的,这里就不在重复了。
日志发送给Follower的同时,Leader会将日志落盘,即写到WAL中,这是通过调用WAL.Save()方法实现的。
代码语言:txt复制func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
代码语言:txt复制 //获取wal的写锁
代码语言:txt复制 w.mu.Lock()
代码语言:txt复制 defer w.mu.Unlock()
代码语言:txt复制 // HardState变化或者新的日志条目则需要写wal
代码语言:txt复制 if raft.IsEmptyHardState(st) && len(ents) == 0 {
代码语言:txt复制 return nil
代码语言:txt复制 }
代码语言:txt复制 mustSync := raft.MustSync(st, w.state, len(ents))
代码语言:txt复制 // 写日志条目
代码语言:txt复制 for i := range ents {
代码语言:txt复制 if err := w.saveEntry(&ents[i]); err != nil {
代码语言:txt复制 return err
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 // 写state变化
代码语言:txt复制 if err := w.saveState(&st); err != nil {
代码语言:txt复制 return err
代码语言:txt复制 }
代码语言:txt复制 // 判断文件大小是否超过最大值
代码语言:txt复制 curOff, err := w.tail().Seek(0, io.SeekCurrent)
代码语言:txt复制 if err != nil {
代码语言:txt复制 return err
代码语言:txt复制 }
代码语言:txt复制 if curOff < SegmentSizeBytes {
代码语言:txt复制 if mustSync {
代码语言:txt复制 return w.sync()
代码语言:txt复制 }
代码语言:txt复制 return nil
代码语言:txt复制 }
代码语言:txt复制 // 文件切分
代码语言:txt复制 return w.cut()
代码语言:txt复制}
WAL文件结构上面已经讲过了,对于新增日志的情况,wal中新增entryType的记录。
Follower日志处理
Leader节点处理完命令后,发送日志和持久化操作都是异步进行的,但是这不代表客户端已经收到回复。Raft协议要求在返回客户端成功的时候,日志一定已经提交了,所以Leader需要等待超过半数的Follower节点处理完日志并反馈,下面先看一下Follower的日志处理。
日志消息到达Follower后,也是由EtcdServer.Process()方法来处理,最终会进到Raft模块的stepFollower()函数中。
代码语言:txt复制func stepFollower(r *raft, m pb.Message) error {
代码语言:txt复制 switch m.Type {
代码语言:txt复制 ...
代码语言:txt复制 case pb.MsgApp:
代码语言:txt复制 // 重置心跳计数
代码语言:txt复制 r.electionElapsed = 0
代码语言:txt复制 // 设置Leader
代码语言:txt复制 r.lead = m.From
代码语言:txt复制 // 处理日志条目
代码语言:txt复制 r.handleAppendEntries(m)
代码语言:txt复制 ...
代码语言:txt复制 }
代码语言:txt复制 ...
代码语言:txt复制}
Follower收到消息后首先跟心跳消息的处理逻辑一样,重置心跳计数和leader,然后再处理日志条目。
代码语言:txt复制func (r *raft) handleAppendEntries(m pb.Message) {
代码语言:txt复制 // 判断是否是过时的消息
代码语言:txt复制 if m.Index < r.raftLog.committed {
代码语言:txt复制 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
代码语言:txt复制 return
代码语言:txt复制 }
代码语言:txt复制 if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
代码语言:txt复制 // 处理成功,发送MsgAppResp给Leader
代码语言:txt复制 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
代码语言:txt复制 } else {
代码语言:txt复制 // 日志的index和Follower的lastIndex不匹配,返回reject消息
代码语言:txt复制 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
代码语言:txt复制 }
代码语言:txt复制}
调用raftLog存储日志,并返回结果给Leader。这里follower失败可能有2种情况造成的,一种是日志条目中带的term和follower的term不一致,还有一种是日志列表中最小的index大于follower的最大的日志index。
上面的maybeAppend()
方法只会将日志存储到RaftLog维护的内存队列中,日志的持久化是异步进行的,这个和Leader节点的存储WAL逻辑基本相同。有一点区别就是follower节点正式发送MsgAppResp消息会在wal保存成功后,而leader节点是先发送消息,后保存的wal。
提交(Commit)
Leader节点在向Follower广播日志后,就一直在等待follower的MsgAppResp消息,收到后还是会进到stepLeader函数。
代码语言:txt复制func stepLeader(r *raft, m pb.Message) error {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 pr := r.prs.Progress[m.From]
代码语言:txt复制 switch m.Type {
代码语言:txt复制 case pb.MsgAppResp:
代码语言:txt复制 pr.RecentActive = true
代码语言:txt复制 if m.Reject {
代码语言:txt复制 //如果收到的是reject消息,则根据follower反馈的index重新发送日志
代码语言:txt复制 if pr.MaybeDecrTo(m.Index, m.RejectHint) {
代码语言:txt复制 if pr.State == tracker.StateReplicate {
代码语言:txt复制 pr.BecomeProbe()
代码语言:txt复制 }
代码语言:txt复制 r.sendAppend(m.From)
代码语言:txt复制 }
代码语言:txt复制 } else {
代码语言:txt复制 oldPaused := pr.IsPaused()
代码语言:txt复制 //更新缓存的日志同步进度
代码语言:txt复制 if pr.MaybeUpdate(m.Index) {
代码语言:txt复制 switch {
代码语言:txt复制 case pr.State == tracker.StateProbe:
代码语言:txt复制 pr.BecomeReplicate()
代码语言:txt复制 case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
代码语言:txt复制 pr.BecomeProbe()
代码语言:txt复制 pr.BecomeReplicate()
代码语言:txt复制 case pr.State == tracker.StateReplicate:
代码语言:txt复制 pr.Inflights.FreeLE(m.Index)
代码语言:txt复制 }
代码语言:txt复制 //如果进度有更新,判断并更新commitIndex
代码语言:txt复制 if r.maybeCommit() {
代码语言:txt复制 //commitIndex有变化则立即发送日志
代码语言:txt复制 r.bcastAppend()
代码语言:txt复制 } else if oldPaused {
代码语言:txt复制 r.sendAppend(m.From)
代码语言:txt复制 }
代码语言:txt复制 // 循环发送所有剩余的日志给follower
代码语言:txt复制 for r.maybeSendAppend(m.From, false) {
代码语言:txt复制 }
代码语言:txt复制 // 是否正在进行leader转移
代码语言:txt复制 if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
代码语言:txt复制 r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
代码语言:txt复制 r.sendTimeoutNow(m.From)
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 return nil
代码语言:txt复制}
代码语言:txt复制func (r *raft) maybeCommit() bool {
代码语言:txt复制 //获取最大的超过半数确认的index
代码语言:txt复制 mci := r.prs.Committed()
代码语言:txt复制 //更新commitIndex
代码语言:txt复制 return r.raftLog.maybeCommit(mci, r.Term)
代码语言:txt复制}
收到Follower的回复以后,如果是reject的,leader会根据返回的index重新发送日志。如果是成功的消息,则更新缓存中的日志同步进度,并判断超过半数确认的index是否有变化。有变化则通知raftLog更新commitIndex。到此为止,客户端的这条数据更新命令,就正式提交了。下面就看一下,数据是怎样写到DB中的。
数据更新(Apply)
前面已经讲过,EtcdServer在启动时会启动一个goroutine监听raft模块是否有Ready消息过来。当上一步的commitIndex发生变化后,Ready中的HardState就会有值了。Etcd会获取ready结构中的committedEntries,提交给Apply模块应用到后端存储中。
代码语言:txt复制func (r *raftNode) start(rh *raftReadyHandler) {
代码语言:txt复制 internalTimeout := time.Second
代码语言:txt复制 go func() {
代码语言:txt复制 defer r.onStop()
代码语言:txt复制 islead := false
代码语言:txt复制 for {
代码语言:txt复制 ...
代码语言:txt复制 case rd := <-r.Ready():
代码语言:txt复制 if rd.SoftState != nil {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 }
代码语言:txt复制 if len(rd.ReadStates) != 0 {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 }
代码语言:txt复制 // 生成apply请求
代码语言:txt复制 notifyc := make(chan struct{}, 1)
代码语言:txt复制 ap := apply{
代码语言:txt复制 entries: rd.CommittedEntries,
代码语言:txt复制 snapshot: rd.Snapshot,
代码语言:txt复制 notifyc: notifyc,
代码语言:txt复制 }
代码语言:txt复制 // 更新etcdServer缓存的commitIndex为最新值
代码语言:txt复制 updateCommittedIndex(&ap, rh)
代码语言:txt复制 // 将已提交日志应用到状态机
代码语言:txt复制 select {
代码语言:txt复制 case r.applyc <- ap:
代码语言:txt复制 case <-r.stopped:
代码语言:txt复制 return
代码语言:txt复制 }
代码语言:txt复制 if islead {
代码语言:txt复制 // 如果有新的日志条目
代码语言:txt复制 r.transport.Send(r.processMessages(rd.Messages))
代码语言:txt复制 }
代码语言:txt复制 // 如果有snapshot
代码语言:txt复制 if !raft.IsEmptySnap(rd.Snapshot) {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 }
代码语言:txt复制 //将hardState和日志条目保存到WAL中
代码语言:txt复制 if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 }
代码语言:txt复制 if !raft.IsEmptyHardState(rd.HardState) {
代码语言:txt复制 proposalsCommitted.Set(float64(rd.HardState.Commit))
代码语言:txt复制 }
代码语言:txt复制 if !raft.IsEmptySnap(rd.Snapshot) {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 }
代码语言:txt复制 r.raftStorage.Append(rd.Entries)
代码语言:txt复制 if !islead {
代码语言:txt复制 ...
代码语言:txt复制 ...
代码语言:txt复制 } else {
代码语言:txt复制 notifyc <- struct{}{}
代码语言:txt复制 }
代码语言:txt复制 //更新raft模块的applied index和将日志从unstable转到stable中
代码语言:txt复制 r.Advance()
代码语言:txt复制 case <-r.stopped:
代码语言:txt复制 return
代码语言:txt复制 }
代码语言:txt复制 }
代码语言:txt复制 }()
代码语言:txt复制}
这里需要注意的是,在将已提交日志条目应用到状态机的操作是异步完成的,在Apply完成后,会将结果写到客户端调用进来时注册的channel中。这样一次完整的写操作就完成了。