etcd-raft 模块如何实现分布式一致性?

2022-06-23 14:48:04 浏览数 (1)

你好,我是 aoho,大家周末快乐。今天我和你分享的主题是:etcd-raft 模块如何实现分布式一致性?

我们在上一篇介绍了 etcd 读写操作的底层实现,但至于 etcd 集群如何实现分布式数据一致性并没有详细介绍。在分布式环境中,常用数据复制来避免单点故障,实现多副本,提高服务的高可用性以及系统的吞吐量。etcd 集群中的多个节点不可避免地会出现相互之间数据不一致的情况。但不管是同步复制、异步复制还是半同步复制,会存在可用性或者一致性的问题。解决多个节点数据一致性的方案其实就是共识算法,常见的共识算法有 Paxos 和 Raft。Zookeeper 使用的 Zab 协议,etcd 使用的共识算法就是 Raft。

本课时将会首先介绍如何使用 raftexample,接着介绍 etcd-raft 模块的实现。etcd-raft 模块是 etcd 中解决分布式一致性的模块,我们结合源码分析下 raft 在 etcd 中的实现。

使用 raftexample

etcd 项目中包含了 Raft 库使用的示例。raftexample 基于 etcd-raft 库实现了键值对存储服务器。

raftexample 的入口方法实现代码如下所示:

代码语言:javascript复制
func main() {
 cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
 id := flag.Int("id", 1, "node ID")
 kvport := flag.Int("port", 9121, "key-value server port")
 join := flag.Bool("join", false, "join an existing cluster")
 flag.Parse()
  // 构建 propose
 proposeC := make(chan string)
 defer close(proposeC)
 confChangeC := make(chan raftpb.ConfChange)
 defer close(confChangeC)

 // raft 为来自http api的提案提供 commit 流
 var kvs *kvstore
 getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
 commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

 kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

  // 键值对的处理器将会向 raft 发起提案来更新
  serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

在入口函数中创建了两个 channel:proposeC 用于提交写入的数据;confChangeC 用于提交配置改动数据。

然后分别启动如下核心的 goroutine:

  • 启动 HTTP 服务器,用于接收用户的请求数据,最终会将用户请求的数据写入前面的 proposeC/confChangeC channel 中。
  • 启动 raftNode 结构体,该结构体中有上面提到的 raft/node.go 中的 node 结构体,也就是通过该结构体实现的 Node 接口与 raft 库进行交互。同时,raftNode 还会启动协程监听前面的两个 channel,收到数据之后通过 Node 接口函数调用 raft 库对应的接口。
  • HTTP 服务负责接收用户数据,再写入到两个核心 channel 中,而 raftNode 负责监听这两个 channel:如果收到 proposeC channel 的消息,说明有数据提交,则调用 Node.Propose 函数进行数据的提交;如果收到 confChangeC channel 的消息,说明有配置变更,则调用 Node.ProposeConfChange 函数进行配置变更。
  • 设置定时器 tick,到时间后调用 Node.Tick 函数。
  • 监听 Node.Ready 函数返回的 Ready 结构体 channel,有数据变更时,根据 Ready 结构体的不同数据类型进行相应的操作,之后需要调用 Node.Advance 函数进行收尾。

到了这里,已经对 raft 的使用有一个基本的概念了,即通过 node 结构体实现的 Node 接口与 raft 库进行交互,涉及数据变更的核心数据结构就是 Ready 结构体,接下来可以进一步来分析该库的实现了。

etcd raft 实现

raft 库对外提供一个 Node 的 interface,由 raft/node.go 中的 node 结构体实现,这也是应用层唯一需要与这个 raft 库直接打交道的结构体, Node 接口需要实现的函数包括:Tick、Propose、Ready、Step 等。

我们重点需要了解 Ready,这是一个核心函数,将返回 Ready 对应的 channel,该通道表示当前时间点的 channel。应用层需要关注该 channel,当发生变更时,其中的数据也将会进行相应的操作。其他的函数对应的功能如下:

  • Tick:滴答时钟,最终会触发发起选举或者心跳;
  • Campaign:向 raft StateMachine 提交本地选举 MsgHup;
  • Propose:通过 channel 向 raft StateMachine 提交一个 Op,提交的是本地 MsgProp Msg;
  • ProposeConfChange:通过 propc channel 向 raft StateMachine 提交一个配置变更的请求,提交的也是本地 MsgProp Msg;
  • Step:节点收到 Peer 节点发送的 Msg 的时候会通过这个接口提交给 raft - StateMachine,Step 接口通过 recvc channel 向 raft StateMachine 传递这个 Msg;
  • TransferLeadership:提交 Transfer Leader 的 Msg;
  • ReadIndex:提交 read only Msg。

接着是 raft 算法的实现,node 结构体实现了 Node 接口,其定义如下:

代码语言:javascript复制
type node struct {
 propc      chan msgWithResult
 recvc      chan pb.Message
 confc      chan pb.ConfChangeV2
 confstatec chan pb.ConfState
 readyc     chan Ready
 advancec   chan struct{}
 tickc      chan struct{}
 done       chan struct{}
 stop       chan struct{}
 status     chan chan Status

 rn *RawNode
}

这个结构体会在后面经常用到。在 raft/raft.go 中还有两个核心数据结构:

  • Config:与 raft 算法相关的配置参数都包装在该结构体中。该结构体的命名是大写字母开头,用于提供给外部调用。
  • raft:具体实现 raft 算法的结构体。

节点状态

我们来看看 raft StateMachine 的状态机转换,实际上就是 raft 算法中各种角色的转换。每个 raft 节点,可能具有以下三种状态中的一种:

  • Candidate:候选人状态,节点切换到这个状态时,意味着将进行一次新的选举。
  • Follower:跟随者状态,节点切换到这个状态时,意味着选举结束。
  • Leader:领导者状态,所有数据提交都必须先提交到 Leader 上。

每一个状态都有其对应的状态机,每次收到一条提交的数据时,都会根据其不同的状态将消息输入到不同状态的状态机中。同时,在进行 tick 操作时,每种状态对应的处理函数也是不一样的。

因此 raft 结构体中将不同的状态及其不同的处理函数,独立出来几个成员变量:

  • state 保存当前节点状态;
  • tick 函数,每个状态对应的 tick 函数不同;
  • step,状态机函数,同样每个状态对应的状态机也不相同。

状态转换

etcd-raft StateMachine 封装在 raft 机构体中,其状态转换如下图:

raft-StateMachine.png

raft state 转换的调用接口都在 raft.go 中,定义如下:

代码语言:javascript复制
func (r *raft) becomeFollower(term uint64, lead uint64)
func (r *raft) becomePreCandidate()
func (r *raft) becomeCandidate()
func (r *raft) becomeLeader()

raft 在各种状态下,如何驱动 raft StateMachine 状态机运转?etcd 将 raft 相关的所有处理都抽象为了 Msg,通过 Step 接口处理:

代码语言:javascript复制
func (r *raft) Step(m pb.Message) error {
    r.step(r, m)
}

其中 step 是一个回调函数,在不同的 state 会设置不同的回调函数来驱动 raft,这个回调函数 stepFunc 就是在 becomeXX() 函数完成的设置

代码语言:javascript复制
type raft struct {
    ...
    step stepFunc
}

step 回调函数有如下几个值,其中 stepCandidate 会处理 PreCandidate 和 Candidate 两种状态:

代码语言:javascript复制
func stepFollower(r *raft, m pb.Message) error
func stepCandidate(r *raft, m pb.Message) error
func stepLeader(r *raft, m pb.Message) error

这里以 stepCandidate 为例说明:

代码语言:javascript复制
func stepCandidate(r *raft, m pb.Message) error {
    ...
    switch m.Type {
    case pb.MsgProp:
        r.logger.Infof("%x no Leader at term %d; dropping proposal", r.id, r.Term)
        return ErrProposalDropped
    case pb.MsgApp:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    case pb.MsgHeartbeat:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)
    case pb.MsgSnap:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleSnapshot(m)
    case myVoteRespType:
        ...
    case pb.MsgTimeoutNow:
        r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
    }
    return nil
}

即对各种 Msg 进行处理,这里就不展开详细展开。我们来看下 raft 消息的类型及其定义。

raft 消息

raft 算法本质上是一个大的状态机,任何的操作例如选举、提交数据等,最后封装成一个消息结构体,输入到 raft 算法库的状态机中。

在 raft/raftpb/raft.proto 文件中,定义了 raft 算法中传输消息的结构体。熟悉 raft 论文的都知道,raft 算法其实由好几个协议组成,但是在这里,统一定义在了 Message 这个结构体之中,以下总结了该结构体的成员用途。

代码语言:javascript复制
// 位于 raft/raftpb/raft.pb.go:295
type Message struct {
 Type             MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"` // 消息类型
 To               uint64      `protobuf:"varint,2,opt,name=to" json:"to"` // 消息接收者的节点ID
 From             uint64      `protobuf:"varint,3,opt,name=from" json:"from"` // 消息发送者的节点ID
 Term             uint64      `protobuf:"varint,4,opt,name=term" json:"term"` // 任期ID
 LogTerm          uint64      `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"` // 日志所处的任期ID
 Index            uint64      `protobuf:"varint,6,opt,name=index" json:"index"` // 日志索引ID,用于节点向Leader汇报自己已经commit的日志数据ID
 Entries          []Entry     `protobuf:"bytes,7,rep,name=entries" json:"entries"` // 日志条目数组
 Commit           uint64      `protobuf:"varint,8,opt,name=commit" json:"commit"` // 提交日志索引
 Snapshot         Snapshot    `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"` // 快照数据
 Reject           bool        `protobuf:"varint,10,opt,name=reject" json:"reject"` // 是否拒绝
 RejectHint       uint64      `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"` // 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置
 Context          []byte      `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"` // 上下文数据
 XXX_unrecognized []byte      `json:"-"`
}

Message 结构体相关的数据类型为 MessageType,MessageType 有十九种。当然,并不是所有的消息类型都会用到上面定义的 Message 结构体中的所有字段,因此其中有些字段是 optinal 的,我其中常用的协议(即不同的消息类型)的用途总结成如下的表格:

type

功能

to

from

MsgHup

不用于节点间通信,仅用于发送给本节点让本节点进行选举

消息接收者的节点ID

本节点 ID

MsgBeat

不用于节点间通信,仅用于 Leader 节点在 heartbeat 定时器到期时向集群中其他节点发送心跳消息

消息接收者的节点 ID

本节点 ID

MsgProp

raft 库使用者提议(propose)数据

消息接收者的节点 ID

本节点 ID

MsgApp

用于 Leader 向集群中其他节点同步数据的消息

消息接收者的节点 ID

本节点 ID

MsgSnap

用于 Leader 向 Follower 同步数据用的快照消息

消息接收者的节点 ID

本节点 ID

MsgAppResp

集群中其他节点针对 Leader 的 MsgApp/MsgSnap 消息的应答消息

消息接收者的节点 ID

本节点 ID

MsgVote/MsgPreVote 消息

节点投票给自己以进行新一轮的选举

消息接收者的节点 ID

本节点 ID

MsgVoteResp/MsgPreVoteResp 消息

投票应答消息

消息接收者的节点ID

本节点 ID

MsgUnreachable

用于应用层向 raft 库汇报某个节点当前已不可达

消息接收者的节点 ID

节点 ID

MsgSnapStatus

用于应用层向 raft 库汇报某个节点当前接收快照状态

消息接收者的节点 ID

本节点 ID

MsgTransferLeader

用于迁移 Leader

消息接收者的节点 ID

注意这里不是发送者的 ID 了,而是准备迁移过去成为新 Leader 的节点 ID

MsgCheckQuorum

消息接收者的节点 ID

消息接收者的节点 ID

节点 ID

MsgTimeoutNow

Leader 迁移时,当新旧 Leader 的日志数据同步后,旧 Leader 向新 Leader 发送该消息通知可以进行迁移了

新的 Leader ID

旧的 Leader 的节点 ID

MsgReadIndex 和 MsgReadIndexResp 消息

用于读一致性的消息

接收者节点 ID

发送者节点 ID

上表列出了消息的类型对应的功能、消息接收者的节点 ID 和 消息发送者的节点 ID。在收到消息之后,根据消息类型检索本表,帮助我们理解 raft 算法的操作。

选举流程

raft 一致性算法实现的关键有 Leader 选举、日志复制和安全性限制。Leader 故障后集群能快速选出新 Leader;日志复制, 集群只有 Leader 能写入日志, Leader 负责复制日志到 Follower 节点,并强制 Follower 节点与自己保持相同;安全性

raft 算法的第一步是首先选举出 Leader 出来,在 Leader 出现故障后也需要快速选出新 Leader,所以我们来关注下选举的流程。

发起选举的节点

只有在 Candidate 或者 Follower 状态下的节点,才有可能发起一个选举流程,而这两种状态的节点,其对应的 tick 函数为 raft.tickElection 函数,用来发起选举和选举超时控制。选举流程如下所示:

  • 节点启动时都以 Follower 启动,同时随机生成自己的选举超时时间。之所以每个节点随机选择自己的超时时间,是为了避免同时有两个节点同时进行选举,这种情况下会出现没有任何一个节点赢得半数以上的投票从而这一轮选举失败,继续再进行下一轮选举。
  • 在 Follower 的 tick 函数 tickElection 函数中,当选举超时,节点向自己发送 MsgHup 消息。
  • 在状态机函数 raft.Step 函数中,在收到 MsgHup 消息之后,节点首先判断当前有没有没有 apply 的配置变更消息,如果有就忽略该消息。其原因在于,当有配置更新的情况下不能进行选举操作,即要保证每一次集群成员变化时只能同时变化一个,不能同时有多个集群成员的状态发生变化。
  • 否则进入 campaign 函数中进行选举:首先将任期号 1,然后广播给其他节点选举消息,带上的其它字段包括:节点当前的最后一条日志索引(Index 字段),最后一条日志对应的任期号(LogTerm 字段),选举任期号(Term 字段,即前面已经进行 1 之后的任期号),Context 字段(目的是为了告知这一次是否是 Leader 转让类需要强制进行选举的消息)。
  • 如果在一个选举超时之内,该发起新的选举流程的节点,得到了超过半数的节点投票,那么状态就切换到 Leader 状态,成为 Leader 的同时,Leader 将发送一条 dummy 的 append 消息,目的是为了提交该节点上在此任期之前的值。
收到选举消息的节点

当收到任期号大于当前节点任期号的消息,同时该消息类型如果是选举类的消息(类型为 prevote 或者 vote)时,会做以下判断:

  • 首先判断该消息是否为强制要求进行选举的类型(context 为 campaignTransfer,context 为这种类型时表示在进行 Leader 转让,流程见下面的 Leader 转让流程)
  • 判断当前是否在租约期以内,判断的条件包括:checkQuorum 为 true,当前节点保存的 Leader 不为空,没有到选举超时,前面这三个条件同时满足。

如果不是强制要求选举,同时又在租约期以内,那么就忽略该选举消息返回不进行处理,这么做是为了避免出现那些离开集群的节点,频繁发起新的选举请求。

  • 如果不是前面的忽略选举消息的情况,那么除非是 prevote 类的选举消息,在收到其他消息的情况下,该节点都切换为 Follower 状态。
  • 此时需要针对投票类型中带来的其他字段进行处理了,需要同时满足以下两个条件:
    • 只有在没有给其他节点进行过投票,或者消息的 term 任期号大于当前节点的任期号,或者之前的投票给的就是这个发出消息的节点
    • 进行选举的节点,它的日志是更新的,条件为:logterm 比本节点最新日志的任期号大,在两者相同的情况下,消息的 index 大于等于当前节点最新日志的 index,即总要保证该选举节点的日志比自己的大。

只有在同时满足以上两个条件的情况下,才能同意该节点的选举,否则都会被拒绝。这么做的原因是:保证最后能胜出来当新的 Leader 的节点,它上面的日志都是最新的。

日志复制

选举好 Leader 之后,Leader 在收到 put 提案时,如何将提案复制给其他 Follower 呢?

我们回顾下前面课时所讲的 etcd 读写请求的处理流程。以下面的图示来说明日志复制的流程。

follower-state-2.png

  • 收到客户端请求之后,etcd Server 的 KVServer 模块会向 Raft 模块提交一个类型为 MsgProp 的提案消息。
  • Leader 节点在本地添加一条日志,其对应的命令为 put foo bar。这里涉及到两个索引值,committedIndex 存储的最后一条提交(commit)日志的索引,appliedIndex 存储的是最后一条应用到状态机中的日志索引值,一条日志只有被提交了才能应用到状态机中,因此总有 committedIndex >= appliedIndex 不等式成立。在这里只是添加一条日志还并没有提交,两个索引值还指向上一条日志。
  • Leader 节点向集群中其他节点广播 AppendEntries 消息,带上 put 命令。

接着看看 Leader 怎么将日志数据复制到 Follower 节点。

follower-state-3.png

  • 收到 AppendEntries 请求的 Follower 节点,同样在本地添加了一条新的日志,也还没有提交。
  • Follower 节点向 Leader 节点应答 AppendEntries 消息。
  • 当 Leader 节点收到集群半数以上节点的 AppendEntries 请求的应答消息时,认为 put foo bar 命令成功复制,可以进行提交,于是修改了本地 committed 日志的索引指向最新的存储 put foo bar 的日志,而 appliedIndex 还是保持着上一次的值,因为还没有应用该命令到状态机中。

当这个命令提交完成了之后,命令就可以提交给应用层了。

  • 提交命令完成,给应用层说明这条命令已经提交。此时修改 appliedIndex 与 committedIndex 一样了。
  • Leader 节点在下一次给 Follower 的 AppendEntries 请求中,会带上当前最新的 committedIndex 索引值,Follower 收到之后同样会修改本地日志的 committedIndex 索引。

小结

本文主要介绍了 etcd-raft 模块实现分布式一致性的原理,通过 raftexample 了解 raft 模块的使用方式和过程。接着重点介绍了选举流程和日志复制的过程。除此之外,etcd 还有安全性限制保证日志选举和日志复制的正确性,比如 raft 算法中,并不是所有节点都能成为 Leader。一个节点要成为 Leader,需要得到集群中半数以上节点的投票,而一个节点会投票给一个节点,其中一个充分条件是:这个进行选举的节点的日志,比本节点的日志更新。其他还有判断日志的新旧以及提交前面任期的日志条目等措施。

最后,留一个问题,什么情况会选举超时到来时没有任何一个节点成为 Leader,后续会怎么处理呢?欢迎你在留言区提出。

0 人点赞